import asyncio
import json
import time
import datetime
import urllib.parse
import traceback

from tqdm.asyncio import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_article_into_recycle_pool
from applications.utils import str_to_md5


class Const:
    # WeChat subscription-account type codes
    SUBSCRIBE_TYPE_SET = {0, 1}

    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    FORBIDDEN_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_6cfd1132df94",
        "gh_7f5075624a50",
        "gh_d4dffc34ac39",
    ]

    # article status codes
    # default status of a record
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for unknown reasons
    UNKNOWN_STATUS = -3
    # article flagged as violating platform rules
    ILLEGAL_STATUS = -4

    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_SUCCESS_CODE = 0
    ARTICLE_UNKNOWN_CODE = 10000
    ACCOUNT_FORBIDDEN_CODE = 25013
    CRAWL_CRASH_CODE = 20000

    STAT_PERIOD = 3 * 24 * 3600

    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99


class RecycleDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_publish_accounts(self):
        """get all publish accounts"""
        query = """
            select distinct t3.name, t3.gh_id, t3.follower_count,
                t3.create_timestamp as account_init_timestamp,
                t4.service_type_info as account_type,
                t4.verify_type_info as account_auth,
                t3.id as account_id,
                group_concat(distinct t5.remark) as account_remark
            from publish_plan t1
            join publish_plan_account t2 on t1.id = t2.plan_id
            join publish_account t3 on t2.account_id = t3.id
            left join publish_account_wx_type t4 on t3.id = t4.account_id
            left join publish_account_remark t5 on t3.id = t5.publish_account_id
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list = await self.pool.async_fetch(query, db_name="aigc")
        # drop auto-reply accounts
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_account_status(self):
        """get account experiment status"""
        query = """
            select t1.account_id, t2.status
            from wx_statistics_group_source_account t1
            join wx_statistics_group_source t2
                on t1.group_source_name = t2.account_source_name;
        """
        account_status_list = await self.pool.async_fetch(query, db_name="aigc")
        account_status_dict = {
            account["account_id"]: account["status"]
            for account in account_status_list
        }
        return account_status_dict

    async def recycle_single_account(self, account):
        """recycle single account"""
        query = """
            select max(publish_timestamp) as publish_timestamp
            from official_articles_v2
            where ghId = %s;
        """
        response = await self.pool.async_fetch(
            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
        )
        if response:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = await get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case self.ACCOUNT_FORBIDDEN_CODE:
                    await feishu_robot.bot(
                        title="发布账号封禁",
                        detail={
                            "账号名称": account["name"],
                            "账号id": account["gh_id"],
                        },
                    )
                    return
                case self.ARTICLE_SUCCESS_CODE:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return

                    await insert_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )
                    # check last article: stop paging once this batch reaches
                    # articles that have already been recycled
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case self.CRAWL_CRASH_CODE:
                    await self.log_client.log(
                        contents={
                            "task": "recycle_daily_publish_articles",
                            "data": {
                                "gh_id": account["gh_id"],
                            },
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                    # no return here: the loop retries the same cursor
                case _:
                    return

    async def get_task_list(self):
        """recycle all publish accounts articles"""
        binding_accounts = await self.get_publish_accounts()
        # filter out forbidden accounts
        binding_accounts = [
            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
        ]

        account_status = await self.get_account_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in binding_accounts
        ]

        # keep subscription accounts only
        subscription_accounts = [
            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
        ]
        return subscription_accounts

    async def deal(self):
        subscription_accounts = await self.get_task_list()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
    async def check_account(self, account: dict, date_string: str) -> bool:
        """check whether the account has published anything since date_string"""
        query = """
            select accountName, count(1) as publish_count
            from official_articles_v2
            where ghId = %s and from_unixtime(publish_timestamp) > %s;
        """
        response = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(account["gh_id"], date_string),
        )
        if response:
            today_publish_count = response[0]["publish_count"]
            return today_publish_count > 0
        else:
            return False

    async def deal(self):
        task_list = await self.get_task_list()
        for task in tqdm(task_list, desc="check each account step1: "):
            if await self.check_account(task, self.date_string):
                continue
            else:
                await self.recycle_single_account(task)

        # check again
        fail_list = []
        for second_task in tqdm(task_list, desc="check each account step2: "):
            if await self.check_account(second_task, self.date_string):
                continue
            else:
                second_task.pop("account_type", None)
                second_task.pop("account_auth", None)
                second_task.pop("account_id", None)
                second_task.pop("account_remark", None)
                fail_list.append(second_task)

        if fail_list:
            now = datetime.datetime.now()
            if now.hour < 20:
                return

            columns = [
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="name",
                    display_name="公众号名称",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="follower_count",
                    display_name="粉丝数",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="date",
                    sheet_name="account_init_timestamp",
                    display_name="账号接入系统时间",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="using_status",
                    display_name="利用状态",
                ),
            ]
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,存在未更新的账号",
                detail={"columns": columns, "rows": fail_list},
                table=True,
                mention=False,
            )
        else:
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,所有文章更新成功",
                detail={
                    "date_string": self.date_string,
                    "finish_time": str(datetime.datetime.now()),
                },
                mention=False,
            )


class UpdateRootSourceIdAndUpdateTimeTask(Const):
    """update publish_timestamp && root_source_id"""

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_article_list(self) -> list[dict]:
        # articles whose publish_timestamp is still 0 (default) or -1 (request failed)
        query = """
            select ContentUrl, wx_sn
            from official_articles_v2
            where publish_timestamp in %s;
        """
        article_list = await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
        )
        return article_list

    async def check_each_article(self, article: dict):
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        try:
            response = await get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
                            "rootSourceId", [""]
                        )[0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )

        query = """
            update official_articles_v2
            set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            article["wx_sn"] = wx_sn
            return article
        else:
            return None

    async def fallback_mechanism(self):
        # backfill publish_timestamp from rows sharing the same ghId + appMsgId
        update_sql = """
            update official_articles_v2 oav
            join (
                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp
                from official_articles_v2
                where publish_timestamp > %s
                group by ghId, appMsgId
            ) vv on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
            set oav.publish_timestamp = vv.publish_timestamp
            where oav.publish_timestamp <= %s;
        """
        affected_rows_1 = await self.pool.async_save(
            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
        )

        # if publish_timestamp is still missing, fall back to updateTime
        update_sql_2 = """
            update official_articles_v2
            set publish_timestamp = updateTime
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(
            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
        )

        if affected_rows_1 or affected_rows_2:
            await feishu_robot.bot(
                title="执行兜底修改发布时间戳",
                detail={
                    "通过msgId修改": affected_rows_1,
                    "通过create_timestamp修改": affected_rows_2,
                },
                mention=False,
            )

    async def deal(self):
        task_list = await self.get_article_list()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["ContentUrl"],
                                    "wx_sn": task["wx_sn"].decode("utf-8"),
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())
        # process_failed_task_reproduce
        fail_tasks = await self.get_article_list()
        fail_list = []
        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
            try:
                res = await self.check_each_article(fail_task)
                if res:
                    fail_list.append(res)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "get_official_article_detail_step2",
                        "data": {
                            "detail": {
                                "url": fail_task["ContentUrl"],
                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
                            },
                            "error_msg": traceback.format_exc(),
                            "error": str(e),
                        },
                        "function": "check_each_article",
                        "status": "fail",
                    }
                )

        if fail_list:
            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)

        current_hour = datetime.datetime.now().hour
        if current_hour >= 21:
            await self.fallback_mechanism()


class RecycleFwhDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    @staticmethod
    async def illegal_article_bot(
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
    ):
        await feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
            },
            env="server_account_publish_monitor",
        )

    async def save_data_to_database(self, article):
        """save data to db"""
        insert_query = """
            insert into official_articles_v2
                (ghId, accountName, appMsgId, title, Type, createTime, updateTime,
                 ItemIndex, ContentUrl, show_view_count, wx_sn, title_md5,
                 article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=insert_query, db_name="piaoquan_crawler", params=article
        )

    async def update_article_read_cnt(self, wx_sn, new_read_cnt):
        if new_read_cnt <= 0:
            return 0

        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return await self.pool.async_save(
            query=update_query, db_name="piaoquan_crawler", params=(new_read_cnt, wx_sn)
        )

    async def get_group_server_accounts(self):
        fetch_query = "select gzh_id from article_gzh_developer;"
        fetch_response = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler"
        )
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        return gh_id_list

    async def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        earliest_time = datetime.datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return await self.pool.async_fetch(
            query=fetch_query,
            params=(gh_id, self.SUCCESS_STATUS, earliest_time),
        )

    async def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]
            # get article detail info with spider
            try:
                article_detail_info = await get_article_detail(
                    url, is_count=True, is_cache=False
                )
                response_code = article_detail_info["code"]
                if response_code == self.ARTICLE_ILLEGAL_CODE:
                    await self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                    )

                await asyncio.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                # extract rootSourceId from each mini-program card's path
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    await self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception as e:
                    # insert failed (typically the row already exists);
                    # refresh the read count for the existing row instead
                    await self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    async def deal(self):
        account_id_list = await self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                await self.get_stat_published_articles(account_id),
                desc=f" {account_id}",
            )
            await self.process_each_account_data(publish_articles)
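
# --- usage sketch ----------------------------------------------------------
# A minimal illustration of how these tasks might be chained for one daily
# run. It assumes the caller already holds an async DB pool exposing
# async_fetch / async_save and a log client exposing log(); constructing
# those objects is project-specific, so `pool`, `log_client` and the
# function name below are placeholders, not part of this module's API.
async def run_daily_recycle(pool, log_client, date_string: str) -> None:
    # 1. recycle today's published articles for all subscription accounts
    await RecycleDailyPublishArticlesTask(pool, log_client, date_string).deal()
    # 2. re-check accounts and alert via feishu if some are still not updated
    await CheckDailyPublishArticlesTask(pool, log_client, date_string).deal()
    # 3. backfill publish_timestamp / root_source_id for unresolved rows
    await UpdateRootSourceIdAndUpdateTimeTask(pool, log_client).deal()
    # 4. recycle server-account (服务号) group-send articles
    await RecycleFwhDailyPublishArticlesTask(pool, log_client).deal()


# e.g. asyncio.run(run_daily_recycle(pool, log_client, date_string="2025-01-01"))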