import time
import json
import datetime
import traceback
import urllib.parse

from tqdm.asyncio import tqdm

from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
from .recycle_daily_publish_articles import Const
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_outside_article_into_recycle_pool
from applications.api import feishu_robot


class RecycleOutsideAccountArticlesTask(Const):
    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_outside_accounts(self):
        """fetch outside (agency-operated) accounts from the aigc database"""
        query = """
            select t2.group_source_name as account_source, t3.name as name, t3.gh_id as gh_id
            from wx_statistics_group_source t1
            join wx_statistics_group_source_account t2
                on t2.group_source_name = t1.account_source_name
            join publish_account t3
                on t3.id = t2.account_id
            where t1.mode_type = '代运营服务号'
                and (
                    t2.group_source_name like '%云誉%'
                    or t2.group_source_name like '%微小盟%'
                    or t2.group_source_name like '%阿雅达%'
                    or t2.group_source_name like '%创易%'
                )
                and t3.status = 1
                and t3.name != '';
        """
        return await self.pool.async_fetch(query=query, db_name="aigc")

    async def recycle_single_account(self, account):
        """recycle single account"""
        query = """
            select max(update_time) as publish_timestamp
            from outside_account_articles
            where gh_id = %s;
        """
        response = await self.pool.async_fetch(query=query, params=(account["gh_id"],))
        if response and response[0]["publish_timestamp"]:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = await get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case self.ACCOUNT_FORBIDDEN_CODE:
                    # await feishu_robot.bot(
                    #     title="发布账号封禁",
                    #     detail={
                    #         "账号名称": account["name"],
                    #         "账号id": account["gh_id"],
                    #     },
                    # )
                    return
                case self.ARTICLE_SUCCESS_CODE:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return

                    await insert_outside_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )

                    # stop paging once the last article is older than what we already recycled
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case self.CRAWL_CRASH_CODE:
                    await self.log_client.log(
                        contents={
                            "task": "recycle_daily_publish_articles",
                            "data": {
                                "gh_id": account["gh_id"],
                            },
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                case _:
                    return

    async def deal(self):
        subscription_accounts = await self.get_outside_accounts()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class UpdateOutsideRootSourceIdAndUpdateTimeTask(UpdateRootSourceIdAndUpdateTimeTask):
    def __init__(self, pool, log_client):
        super().__init__(pool, log_client)

    async def get_outside_article_list_v2(self) -> list[dict]:
        """fetch outside articles whose publish_timestamp has not been resolved yet"""
        query = """
            select content_url, wx_sn
            from outside_account_articles
            where publish_timestamp in %s
            order by update_time desc;
        """
        article_list = await self.pool.async_fetch(
            query=query, params=(tuple([0, -1]),)
        )
        return article_list

    async def check_each_article(self, article: dict):
        url = article["content_url"]
        wx_sn = article["wx_sn"]
        try:
            response = await get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                # publish_timestamp comes back in milliseconds; store seconds
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
                            "rootSourceId", [""]
                        )[0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )

        query = """
            update outside_account_articles
            set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            article["wx_sn"] = wx_sn
            return article
        else:
            return None

    async def fallback_mechanism(self):
        # fix publish_timestamp via app_msg_id: reuse the latest known
        # publish_timestamp within the same message group
        update_sql = """
            update outside_account_articles oaa
            join (
                select gh_id, app_msg_id, max(publish_timestamp) as publish_timestamp
                from outside_account_articles
                where publish_timestamp > %s
                group by gh_id, app_msg_id
            ) vv
                on oaa.app_msg_id = vv.app_msg_id and oaa.gh_id = vv.gh_id
            set oaa.publish_timestamp = vv.publish_timestamp
            where oaa.publish_timestamp <= %s;
        """
        affected_rows_1 = await self.pool.async_save(query=update_sql, params=(0, 0))

        # if publish_timestamp is still missing, fall back to update_time
        update_sql_2 = """
            update outside_account_articles
            set publish_timestamp = update_time
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(query=update_sql_2, params=(0,))

        if affected_rows_1 or affected_rows_2:
            await feishu_robot.bot(
                title="执行兜底修改发布时间戳",
                detail={
                    "通过msgId修改": affected_rows_1,
                    "通过create_timestamp修改": affected_rows_2,
                },
                mention=False,
            )

    async def deal(self):
        task_list = await self.get_outside_article_list_v2()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["content_url"],
                                    "wx_sn": task["wx_sn"].decode("utf-8"),
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        # process_failed_task_reproduce
        fail_tasks = await self.get_article_list()
        fail_list = []
        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
            try:
                res = await self.check_each_article(fail_task)
                if res:
                    fail_list.append(res)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "get_official_article_detail_step2",
                        "data": {
                            "detail": {
                                "url": fail_task["ContentUrl"],
                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
                            },
                            "error_msg": traceback.format_exc(),
                            "error": str(e),
                        },
                        "function": "check_each_article",
                        "status": "fail",
                    }
                )

        # if fail_list:
        #     await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)
        #
        # current_hour = datetime.datetime.now().hour
        # if current_hour >= 21:
        #     await self.fallback_mechanism()
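

# ---------------------------------------------------------------------------
# Usage sketch (assumption, not part of the production wiring): both tasks
# expose an async `deal()` entrypoint and share the same `pool` / `log_client`
# dependencies, so a scheduler could drive them roughly as below. The function
# name `run_outside_recycle` is hypothetical.
# ---------------------------------------------------------------------------
async def run_outside_recycle(pool, log_client, date_string: str) -> None:
    # recycle newly published outside-account articles for the given date
    await RecycleOutsideAccountArticlesTask(pool, log_client, date_string).deal()
    # then resolve publish_timestamp / root_source_id for the recycled rows
    await UpdateOutsideRootSourceIdAndUpdateTimeTask(pool, log_client).deal()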