import json
import traceback
from datetime import datetime, timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.utils import (
    extract_root_source_id,
    get_beijing_date,
    handle_spider_exception,
    transform_to_beijing_date,
)


class Const:
    """Status codes used when recording root_source_id details."""

    # article detail API returned successfully
    ARTICLE_SUCCESS_CODE = 0
    # default status for a new record
    DEFAULT_STATUS = 0
    # the detail API request failed
    REQUEST_FAIL_STATUS = -1
    # the article has been deleted
    DELETE_STATUS = -2
    # no information returned, reason unknown
    UNKNOWN_STATUS = -3
    # the article violates platform rules
    ILLEGAL_STATUS = -4


class RecycleRootSourceIdDetail(Const):
    """Recycle mini-program root_source_id details for articles published the previous day.

    Reads yesterday's published articles, resolves each article's mini-program
    card info (from the DB when root_source_id_list is stored, otherwise by
    crawling the article page), and records per-video detail rows.
    """

    def __init__(self, pool, log_client, trace_id, run_date):
        """
        :param pool: async DB connection pool exposing ``async_fetch``
        :param log_client: logging client passed to ``handle_spider_exception``
        :param trace_id: trace id attached to error logs
        :param run_date: Beijing date string to run for; defaults to today
        """
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id
        # default to "today" in Beijing time when no run date is supplied
        self.run_date = run_date or get_beijing_date()

    async def get_publish_articles_last_day(self):
        """Fetch all articles published during the previous (Beijing) day.

        Returns rows with ContentUrl, wx_sn, publish_timestamp, accountName,
        title and root_source_id_list.
        """
        query = """
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(self.run_date, self.run_date),
        )

    async def get_mini_program_info_by_root_source_id(self, root_source_id_list):
        """Look up (video_id, root_source_id) rows for the given ids.

        NOTE: callers must guarantee ``root_source_id_list`` is non-empty,
        otherwise the ``IN ()`` clause would be invalid SQL.
        """
        query = """
            select video_id, root_source_id
            from long_articles_root_source_id
            where root_source_id in %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(tuple(root_source_id_list),)
        )

    async def get_article_mini_program_detail(self, url, root_source_id_list):
        """Return the list of mini-program card dicts for one article.

        When ``root_source_id_list`` is empty the info is scraped from the
        article page itself; otherwise it is rebuilt from DB records.
        Returns ``[]`` on failure or when no mini-program info exists.
        """
        if not root_source_id_list:
            # no stored ids — fall back to crawling the article detail page
            try:
                article_detail = await get_article_detail(url)
                if article_detail["code"] == self.ARTICLE_SUCCESS_CODE:
                    return article_detail["data"]["data"]["mini_program"]
                return []
            except Exception as e:
                await handle_spider_exception(
                    log_client=self.log_client,
                    error=e,
                    traceback=traceback.format_exc(),
                    trace_id=self.trace_id,
                    task_name=self.__class__.__name__,
                )
                return []

        mini_program_info = await self.get_mini_program_info_by_root_source_id(
            root_source_id_list
        )
        if not mini_program_info:
            return []
        # rebuild the card payloads the page would have carried
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"],
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in mini_program_info
        ]

    async def record_single_article(self, article):
        """Record mini-program detail rows for one published article.

        :param article: a row from ``get_publish_articles_last_day``
        :return: ``{}`` on success or no-op; the original ``article`` row when
            an unexpected error occurred (so the caller can identify failures)
        """
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        publish_timestamp = article["publish_timestamp"]
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )

        # get article mini program info
        article_mini_program_detail = await self.get_article_mini_program_detail(
            url, root_source_id_list
        )
        if not article_mini_program_detail:
            return {}
        try:
            publish_date = transform_to_beijing_date(publish_timestamp)
            # generate T+0, T+1, T+2 date strings
            recall_dt_str_list = [
                (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                for i in range(3)
            ]
            for date_str in recall_dt_str_list:
                for video_index, mini_item in enumerate(
                    article_mini_program_detail, 1
                ):
                    image_url = mini_item["image_url"]
                    nick_name = mini_item["nike_name"]
                    # prefer explicit ids; fall back to parsing them from the path
                    if mini_item.get("root_source_id") and mini_item.get("video_id"):
                        root_source_id = mini_item["root_source_id"]
                        video_id = mini_item["video_id"]
                    else:
                        id_info = extract_root_source_id(mini_item["path"])
                        root_source_id = id_info["root_source_id"]
                        video_id = id_info["video_id"]
                    kimi_title = mini_item["title"]
                    # TODO: persist the row once insert_each_root_source_id exists
                    # self.insert_each_root_source_id(
                    #     wx_sn=wx_sn,
                    #     mini_title=kimi_title,
                    #     mini_name=nick_name,
                    #     cover_url=image_url,
                    #     video_index=video_index,
                    #     root_source_id=root_source_id,
                    #     video_id=video_id,
                    #     publish_dt=publish_date.strftime("%Y-%m-%d"),
                    #     recall_dt=date_str,
                    # )
            return {}
        except Exception as e:
            # FIX: route the error through the shared spider-exception handler
            # instead of print(); the formatted traceback was previously
            # assigned to an unused local and dropped.
            await handle_spider_exception(
                log_client=self.log_client,
                error=e,
                traceback=traceback.format_exc(),
                trace_id=self.trace_id,
                task_name=self.__class__.__name__,
            )
            return article

    async def deal(self):
        """deal function"""
        # step 1, record articles to detail table
        publish_articles_list = await self.get_publish_articles_last_day()
        for article in tqdm(publish_articles_list, desc="更新文章"):
            await self.record_single_article(article)
        # step2, update root_source_id detail info