from __future__ import annotations

import asyncio
import time
import traceback
from typing import Dict, List

from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import generate_gzh_id, show_desc_to_sta, timestamp_to_str


class CrawlerGzhConst:
    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    # 2025-01-01 00:00 (UTC+8); assumed fallback watermark for accounts
    # that have never been crawled before
    DEFAULT_TIMESTAMP = 1735660800


class CrawlerGzhStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """Fetch the accounts to crawl for the given category and strategy."""
        match strategy:
            case "V1":
                # V1: every active account flagged for daily scraping
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
            case "V2":
                # V2: top 100 active accounts ranked by score confidence lower bound
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc
                    limit %s;
                """
                return await self.pool.async_fetch(
                    query=query, params=(method, 1, 100)
                )
            case _:
                raise Exception(f"strategy not supported: {strategy}")

    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """Return the newest publish_time already stored for this account, if any."""
        query = """
            select max(publish_time) as publish_time
            from crawler_meta_article
            where out_account_id = %s;
        """
        latest_timestamp_obj = await self.pool.async_fetch(
            query=query, params=(account_id,)
        )
        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None

    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id
    ):
        """Normalize one raw article payload and persist it."""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "category": account_method,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
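
# For reference, an "account"-mode item passed to `save_item_to_database`
# looks roughly like the sketch below. The keys mirror `crawl_each_article`
# above; the values are illustrative placeholders, not real data.
#
#   {
#       "platform": "weixin",
#       "mode": "account",
#       "crawler_time": 1735660800,
#       "read_cnt": 10000,
#       "like_cnt": 200,
#       "title": "...",
#       "category": "<account_method>",
#       "out_account_id": "<gh_id>",
#       "article_index": 1,
#       "link": "https://mp.weixin.qq.com/s/...",
#       "description": "...",
#       "unique_index": "<id derived from the link by generate_gzh_id>",
#       "publish_time": 1735660800,
#   }
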
class CrawlerGzhAccountArticles(CrawlerGzhStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """Persist every article in one page of messages into the meta table."""
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """Update the account's latest_update_time after a crawl."""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        dt_str = timestamp_to_str(latest_timestamp)
        query = """
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """Crawl one account page by page until already-seen articles appear."""
        current_cursor = None
        gh_id = account["gh_id"]
        # guard against accounts that were never crawled: latest_update_time
        # may be NULL, in which case DEFAULT_TIMESTAMP is assumed as watermark
        latest_update_time = account["latest_update_time"]
        latest_timestamp = (
            latest_update_time.timestamp()
            if latest_update_time
            else self.DEFAULT_TIMESTAMP
        )
        while True:
            # fetch one page of messages from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break

            # process current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)

            # pages are returned newest first; once the oldest message on this
            # page is no newer than the stored watermark, everything beyond it
            # has already been crawled, so record the new watermark and stop
            # (the original comparison was inverted and stopped on new articles)
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg <= latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break

            # otherwise advance the cursor to the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in account_list:
            try:
                await self.crawler_single_account(method, account)
            except Exception as e:
                # accounts carry gh_id (not account_id/trace_id); the trace id
                # belongs to this crawler run
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )


class CrawlerGzhSearchArticles(CrawlerGzhStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def deal(self):
        return {"mode": "search", "message": "still developing"}
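

# --- usage sketch -----------------------------------------------------------
# A minimal, hypothetical entry point showing how this module is expected to
# be driven. `pool` and `log_client` are assumed to be constructed by the host
# application (their factories are not part of this module), and "history" is
# an illustrative account_category value, not one confirmed by this code.
def run_account_crawler_demo(pool, log_client) -> None:
    """Hypothetical synchronous wrapper around the async account crawler."""
    asyncio.run(
        CrawlerGzhAccountArticles(pool, log_client, trace_id="demo-trace").deal(
            method="history", strategy="V1"
        )
    )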