import json import traceback from typing import Any from datetime import datetime, timedelta from applications.crawler.wechat import get_article_detail from applications.utils import extract_root_source_id from applications.utils import run_tasks_with_asyncio_task_group class MiniProgramConst: ARTICLE_SUCCESS_CODE = 0 # 记录默认状态 DEFAULT_STATUS = 0 # 请求接口失败状态 REQUEST_FAIL_STATUS = -1 # 文章被删除状态 DELETE_STATUS = -2 # 未知原因无信息返回状态 UNKNOWN_STATUS = -3 # 文章违规状态 ILLEGAL_STATUS = -4 TASK_NAME = "recycle_mini_program_detail_daily" ARTICLE_TABLE = "official_articles_v2" DETAIL_TABLE = "long_articles_detail_info" # 更新文章周期天数 ARTICLE_RECYCLE_DAYS = 1 # 更新root_source_id天数 ROOT_SOURCE_ID_UPDATE_DAYS = 3 class RecycleMiniProgramDetailBase(MiniProgramConst): def __init__(self, pool, log_client, trace_id): self.pool = pool self.log_client = log_client self.trace_id = trace_id # 构造小程序信息对象 @staticmethod async def create_mini_info(response: list[dict]) -> list[dict]: return [ { "app_id": "wx89e7eb06478361d7", "avatar": "https://rescdn.yishihui.com/0temp/logo.png", "image_url": "", "nike_name": "票圈 l 3亿人喜欢的视频平台", "root_source_id": item["root_source_id"], "video_id": item["video_id"], "service_type": "0", "title": "", "type": "card", } for item in response ] # 获取单个 root_source_id 的首层,裂变详情 async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]: query = f""" SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend FROM changwen_data_rootsourceid WHERE root_source_id = %s AND dt = %s; """ return await self.pool.async_fetch(query=query, params=(root_source_id, dt)) # 获取制定天发布的文章 async def fetch_published_articles( self, run_date: str, date_delta: int ) -> list[dict]: query = f""" SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list FROM official_articles_v2 WHERE FROM_UNIXTIME(publish_timestamp) BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND); """ return await self.pool.async_fetch( query=query, db_name="piaoquan_crawler", params=(run_date, date_delta, run_date), ) # 从数据库获取root_source_id信息 async def fetch_root_source_id_from_db( self, root_source_id_list: list[str] ) -> list[dict[str, str]]: placeholders = ", ".join(["%s"] * len(root_source_id_list)) query = f""" SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in ({placeholders}); """ return await self.pool.async_fetch( query=query, params=tuple(root_source_id_list) ) # 获取制定几天前的root_source_id async def fetch_root_source_id_in_last_days( self, run_date: str, date_delta: int ) -> list[dict[str, str]]: query = f""" SELECT recall_dt, root_source_id FROM {self.DETAIL_TABLE} WHERE publish_dt BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND); """ return await self.pool.async_fetch( query=query, db_name="piaoquan_crawler", params=(run_date, date_delta, run_date), ) # 插入新的root_source_id async def insert_each_root_source_id(self, params_list: list[tuple]) -> None: # 记录root_source_id信息 query = f""" INSERT INTO {self.DETAIL_TABLE} (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) values (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE video_id = VALUES(video_id); """ return await self.pool.async_save( query=query, db_name="piaoquan_crawler", params=params_list, batch=True ) # 更新root_source_id 首层 && 裂变信息 async def update_each_root_source_id(self, item: dict) -> None: recall_dt = item["recall_dt"] root_source_id = item["root_source_id"] mini_program_detail = await self.fetch_root_source_id_result( root_source_id, recall_dt ) if not mini_program_detail: return mini_program_detail = mini_program_detail[0] query = f""" UPDATE {self.DETAIL_TABLE} SET first_level = %s, fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s, fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s, fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s WHERE root_source_id = %s and recall_dt = %s; """ await self.pool.async_save( query=query, db_name="piaoquan_crawler", params=( mini_program_detail["first_uv"], mini_program_detail["split0"], mini_program_detail["split0_head"], mini_program_detail["split0_recommend"], mini_program_detail["split1"], mini_program_detail["split1_head"], mini_program_detail["split1_recommend"], mini_program_detail["split2"], mini_program_detail["split2_head"], mini_program_detail["split2_recommend"], root_source_id, recall_dt, ), ) class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase): # 业务层 def __init__(self, pool, log_client, trace_id): super().__init__(pool, log_client, trace_id) async def handle_single_task(self, article): """ record each article into long_articles_detail_info table """ url, publish_timestamp, wx_sn = ( article["ContentUrl"], article["publish_timestamp"], article["wx_sn"].decode(), ) root_source_id_list = ( json.loads(article["root_source_id_list"]) if article["root_source_id_list"] else [] ) mini_detail = [] # 获取小程序信息 if root_source_id_list: root_source_id_detail = await self.fetch_root_source_id_from_db( root_source_id_list ) mini_detail = await self.create_mini_info(root_source_id_detail) # no root_source_id_list in article try: article_detail = await get_article_detail(article_link=url) response_code = article_detail["code"] if response_code == self.ARTICLE_SUCCESS_CODE: mini_detail = article_detail["data"]["data"].get("mini_program", []) except Exception as e: await self.log_client.log( contents={ "task": self.TASK_NAME, "trace_id": article["trace_id"], "message": "get article detail error", "data": { "error": str(e), "traceback": traceback.format_exc(), }, } ) # 将root_source_id信息插入mysql表格 if not mini_detail: return publish_date = datetime.fromtimestamp(publish_timestamp) publish_dt = publish_date.strftime("%Y-%m-%d") # 记录T+0, T+1, T+2 三天的数据 recall_dt_list = [ (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3) ] params_list = [] for dt in recall_dt_list: for video_index, mini_item in enumerate(mini_detail, start=1): image_url = mini_item["image_url"] nickname = mini_item["nike_name"] if mini_item.get("root_source_id") and mini_item.get("video_id"): root_source_id, video_id = ( mini_item["root_source_id"], mini_item["video_id"], ) else: id_info = extract_root_source_id(mini_item["path"]) root_source_id, video_id = ( id_info["root_source_id"], id_info["video_id"], ) kimi_title = mini_item["title"] params_list.append( ( wx_sn, kimi_title, nickname, image_url, video_index, root_source_id, video_id, publish_dt, dt, ) ) await self.insert_each_root_source_id(params_list) # 记录每日发文 && root_source_id 信息 async def record_published_articles_job( self, date_delta: int, run_date: str, *, max_concurrency: int = 20, # 并发上限 fail_fast: bool = False, # 是否出现错误就退出 ) -> dict[str, Any]: # get tasks task_list = await self.fetch_published_articles(run_date, date_delta) return await run_tasks_with_asyncio_task_group( task_list=task_list, handler=self.handle_single_task, max_concurrency=max_concurrency, fail_fast=fail_fast, description="持久化文章&&rootSourceId", unit="article", ) # 更新root_source_id的首层,裂变信息 async def update_mini_program_detail_job( self, date_delta: int, run_date: str, *, max_concurrency: int = 20, fail_fast: bool = False, ) -> dict[str, Any]: task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta) return await run_tasks_with_asyncio_task_group( task_list=task_list, handler=self.update_each_root_source_id, max_concurrency=max_concurrency, fail_fast=fail_fast, description="更新rootSourceId裂变表现", unit="id", ) # 业务入口 async def deal(self, params: dict): # 解析请求参数 run_date = params.get("run_date") if not run_date: run_date = datetime.today().strftime("%Y-%m-%d") record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS) update_date_delta = params.get( "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS ) # 业务执行 await self.record_published_articles_job( date_delta=record_date_delta, run_date=run_date ) await self.update_mini_program_detail_job( date_delta=update_date_delta, run_date=run_date )