import json
import time
import urllib.parse
from datetime import datetime
from typing import Optional

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    """Shared connections and helpers for the server-account recycle tasks.

    On construction this opens three database connections (denet,
    long_articles, piaoquan_crawler) and creates a Feishu bot client that the
    subclasses use for alerts.
    """

    # Values written to / compared against
    # long_articles_group_send_result.recycle_status.
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    # Value of long_articles_group_send_result.status selected for recycling.
    PUBLISH_SUCCESS_STATUS = 2

    # Look-back window for statistics: three days, in seconds.
    STAT_PERIOD = 3 * 24 * 3600

    # Crawler response code that triggers the illegal-article alert.
    ARTICLE_ILLEGAL_CODE = 25012

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

        self.feishu_robot = FeishuBotApi()

    def get_group_server_accounts(self) -> list:
        """Return every gzh_id stored in article_gzh_developer."""
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return [row["gzh_id"] for row in fetch_response]

    def get_server_account_name(self, gh_id: str) -> Optional[str]:
        """Return the account name recorded for ``gh_id``.

        Returns ``None`` when long_articles_group_send_result has no row for
        that account.
        """
        fetch_query = """
            select account_name from long_articles_group_send_result
            where gh_id = %s limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        if not fetch_response:
            return None
        return fetch_response[0]["account_name"]

    def illegal_article_bot(
        self,
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
    ):
        """Send a Feishu alert describing an article flagged as illegal."""
        self.feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                # publish_date may not be a plain string; normalise it here.
                "publish_date": str(publish_date),
            },
            env="server_account_publish_monitor",
        )


class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Back-fill article urls on publish records and monitor daily publishing."""

    def get_published_articles(self):
        """Fetch published records whose recycle status is still INIT."""
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        return self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        """Look up the publish-stage url row for a content id / user group.

        Returns the first matching row (a dict with ``publish_stage_url``) or
        ``None`` when the query yields nothing.
        """
        sql = """
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        return article_info[0] if article_info else None

    def update_recycle_status(self, record_id, ori_status, new_status):
        """Move a record from ``ori_status`` to ``new_status``.

        The ``recycle_status = ori_status`` condition makes the update a no-op
        when the record is no longer in the expected state.
        """
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        """Store the article url and mark a PROCESSING record as SUCCESS."""
        update_query = """
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        """Resolve and store the article url of every pending publish record.

        Each record is switched to PROCESSING while its url is looked up. It
        ends in SUCCESS when a url is found and is put back to INIT otherwise,
        so that a later run retries it.
        """
        for publish_record in tqdm(self.get_published_articles()):
            record_id = publish_record["id"]
            publish_content_id = publish_record["publish_content_id"]
            group_id = publish_record["user_group_id"]

            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            try:
                publish_call_back_info = self.get_article_url_from_aigc_system(
                    publish_content_id, group_id
                )
            except Exception:
                # Do not leave the record stuck in PROCESSING: only INIT
                # records are selected by get_published_articles().
                self.update_recycle_status(
                    record_id,
                    self.RECYCLE_PROCESSING_STATUS,
                    self.RECYCLE_INIT_STATUS,
                )
                raise

            article_url = (
                publish_call_back_info["publish_stage_url"]
                if publish_call_back_info
                else None
            )
            if article_url:
                # set record and unlock
                self.set_article_url(record_id, article_url)
            else:
                # unlock
                self.update_recycle_status(
                    record_id,
                    self.RECYCLE_PROCESSING_STATUS,
                    self.RECYCLE_INIT_STATUS,
                )

    def monitor(self, date_string: Optional[str] = None):
        """Report the publish records of a day and flag accounts without any.

        Args:
            date_string: publish date in ``%Y-%m-%d`` format. Defaults to the
                current date, resolved when the method is called (not when the
                module is imported).

        The report is only produced when the current hour is before 12;
        otherwise the method returns without doing anything.
        """
        now = datetime.now()
        if date_string is None:
            date_string = now.strftime("%Y-%m-%d")

        if now.hour >= 12:
            return

        account_list = self.get_group_server_accounts()
        sql = """
            select account_name as '账号名称', gh_id, count(distinct user_group_id) as '发文组数'
            from long_articles_group_send_result
            where publish_date = %s
            group by account_name, gh_id;
        """
        publish_records = self.long_articles_client.fetch(
            query=sql, cursor_type=DictCursor, params=(date_string,)
        )
        self.feishu_robot.bot(
            title=f"{date_string}服务号发文记录",
            mention=False,
            detail=publish_records,
            env="server_account_publish_monitor",
        )

        # Accounts that are registered but have no record for the date.
        publish_account_id_set = {record["gh_id"] for record in publish_records}
        do_not_publish_account = [
            {
                "account_name": self.get_server_account_name(account_id),
                "gh_id": account_id,
            }
            for account_id in account_list
            if account_id not in publish_account_id_set
        ]
        if do_not_publish_account:
            self.feishu_robot.bot(
                title=f"{date_string}发现服务号存在未发文情况",
                detail=do_not_publish_account,
                env="server_account_publish_monitor",
            )


class SaveFwhDataToDatabase(FwhDataRecycle):
    """Crawl recently published articles and persist them to official_articles_v2."""

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """Update show_view_count of the article identified by ``wx_sn``.

        Non-positive counts are ignored and ``0`` is returned without touching
        the database.
        """
        if new_read_cnt <= 0:
            return 0

        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """Insert one article row.

        ``article`` is a 16-item sequence ordered like the column list of the
        insert statement below.
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        """Fetch recycled records of ``gh_id`` created within STAT_PERIOD."""
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_time),
        )

    def process_each_account_data(self, account_published_article_list):
        """Crawl each published article and store (or refresh) its row.

        For every article the detail is fetched with the crawler. A response
        carrying ARTICLE_ILLEGAL_CODE triggers a Feishu alert. The detail is
        then inserted into official_articles_v2; when the insert raises, only
        the read count of the row with the same wx_sn is updated. Any other
        failure is printed and the article is skipped.
        """
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]

            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                if article_detail_info["code"] == self.ARTICLE_ILLEGAL_CODE:
                    self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                    )

                # NOTE(review): placed so that it runs after every crawler
                # call, not only after an alert -- confirm this is intended.
                time.sleep(1)

                detail = article_detail_info["data"]["data"]
                content_url = detail["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                # The crawler reports milliseconds; the table stores seconds.
                publish_timestamp = int(detail["publish_timestamp"] / 1000)
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = detail["item_index"]
                show_view_count = detail["view_count"]
                title = detail["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = detail["channel_content_id"]
                mini_program_info = detail["mini_program"]
                root_source_id_list = json.dumps(
                    [
                        urllib.parse.parse_qs(
                            urllib.parse.unquote(mini_program["path"])
                        )["rootSourceId"][0]
                        for mini_program in mini_program_info
                    ]
                )
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # Insert failed (presumably the row already exists --
                    # verify): fall back to refreshing its read count.
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        """Process the recently published articles of every server account."""
        for account_id in self.get_group_server_accounts():
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f" {account_id}",
            )
            self.process_each_account_data(publish_articles)


class FwhDataExportTemp(FwhDataRecycle):
    """One-off export of article, fission and fan statistics to a CSV file."""

    def get_publish_articles(self, gh_id):
        """Fetch aggregated article rows of ``gh_id`` for the fixed date range.

        NOTE(review): ``gh_id`` is interpolated into the statement; it comes
        from get_group_server_accounts() in deal(). The leading ``--`` line is
        an SQL comment and must stay on a line of its own.
        """
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
            and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """Fetch summed fission statistics for a JSON list of root source ids.

        ``root_source_id_list`` is the JSON-encoded list stored on the article
        row; it is decoded and used as the ``in`` filter of the query.
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        """Count non-deleted article_user_group rows of the given user groups."""
        sql = """
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
            and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        """Collect per-article statistics for every account into temp2.csv.

        Articles whose statistics cannot be gathered are reported on stdout
        and left out of the export.
        """
        import pandas as pd

        rows = []
        for gh_id in self.get_group_server_accounts():
            for article in self.get_publish_articles(gh_id):
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    fission_info = self.get_fission_info(
                        article["root_source_id_list"]
                    )[0]
                    article["uv"] = fission_info["uv"]
                    article["first_uv"] = fission_info["first_uv"]
                    article["split_uv"] = fission_info["split_uv"]
                    article["T+0_fission"] = fission_info["T+0_fission"]
                    article["fans_count"] = fans_count
                    rows.append(article)
                except Exception as e:
                    print(f"article {article['ContentUrl']} is not available, skip it")
                    # Surface the cause, as process_each_account_data does.
                    print(e)

        df = pd.DataFrame(rows)
        df.to_csv("temp2.csv", index=False)