from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime, date
from typing import List, Dict

from applications.api import feishu_robot
from applications.crawler.wechat import weixin_search
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta
from applications.utils import get_hot_titles, generate_gzh_id


class CrawlerGzhConst:
    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    STAT_DURATION = 30  # days
    DEFAULT_TIMESTAMP = 1735660800


class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """get crawler accounts"""
        match strategy:
            case "V1":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
            case "V2":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc
                    limit %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 500))
            case _:
                raise Exception("strategy not supported")

    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """get latest update time"""
        query = """
            select max(publish_time) as publish_time
            from crawler_meta_article
            where out_account_id = %s;
        """
        latest_timestamp_obj = await self.pool.async_fetch(
            query=query, params=(account_id,)
        )
        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None

    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id
    ):
        """crawl each article"""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "category": account_method,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )

    async def update_account_read_avg_info(self, gh_id, account_name):
        """update account read avg info"""
        position_list = list(range(1, 9))
        today_dt = date.today().isoformat()
        for position in position_list:
            query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc
                limit {self.STAT_DURATION};
            """
            fetch_response = await self.pool.async_fetch(query=query)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                n = len(read_cnt_list)
                read_avg = sum(read_cnt_list) / n
                max_publish_dt = fetch_response[0]["publish_dt"]
                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
                insert_query = """
                    insert ignore into crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = await self.pool.async_save(
                    query=insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        1,
                        remark,
                    ),
                )
                if insert_rows:
                    # mark older snapshots of this position as inactive
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    await self.pool.async_save(
                        query=update_query, params=(0, gh_id, position, today_dt)
                    )


class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """
        insert crawled articles into the meta table
        :return:
        """
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """update the latest timestamp after crawling"""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """crawl a single account"""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
        while True:
            # fetch one page of articles from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break

            # process current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)

            # decide whether to crawl the next page
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break

            # update cursor for next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in account_list:
            print(account)
            try:
                await self.crawler_single_account(method, account)
                await self.update_account_read_avg_info(
                    gh_id=account["gh_id"], account_name=account["account_name"]
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )


class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def search_each_title(self, title: str, page: str = "1") -> None:
        """search the title in weixin"""
        search_response = await weixin_search(keyword=title, page=page)

    async def deal(self, date_string: str, strategy: str = "V1"):
        hot_titles = await get_hot_titles(self.pool, date_string=date_string)
        for hot_title in hot_titles:
            await self.search_each_title(hot_title)


# if __name__ == "__main__":
#     import asyncio
#     response = asyncio.run(weixin_search(keyword="南京照相馆"))
#     print(json.dumps(response, ensure_ascii=False, indent=4))
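

# A minimal usage sketch (added for illustration, not part of the original module):
# it shows how the account crawler might be driven end to end. `AsyncMySQLPool`,
# `LogClient`, `init_pool`, and the `method` value are placeholder assumptions;
# substitute the project's real connection pool, log client, and account_category.
#
# if __name__ == "__main__":
#     async def run_account_crawl():
#         pool = AsyncMySQLPool()        # hypothetical async MySQL pool
#         await pool.init_pool()         # hypothetical pool initialisation
#         log_client = LogClient()       # hypothetical structured log client
#         crawler = CrawlerGzhAccountArticles(pool, log_client, trace_id="manual-test")
#         await crawler.deal(method="<account_category>", strategy="V1")
#
#     asyncio.run(run_account_crawl())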