from __future__ import annotations

import asyncio
import time
import traceback
from datetime import datetime, date, timedelta
from typing import Dict, List

from tqdm.asyncio import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import weixin_search
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta
from applications.utils import get_hot_titles, generate_gzh_id


class CrawlerGzhConst:
    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    MAX_DEPTH = 3  # maximum number of search-result pages to crawl
    SLEEP_SECONDS = 1  # polite delay between requests
    STAT_DURATION = 30  # look-back window, in days
    DEFAULT_TIMESTAMP = 1735660800
    DAILY_SCRAPE_POSITIVE = 1
    DAILY_SCRAPE_NEGATIVE = 0
    USING_STATUS = 1
    NOT_USING_STATUS = 0
    CRAWL_ACCOUNT_FIRST_LEVEL = 500


class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """Fetch the accounts to crawl for the given category and strategy."""
        match strategy:
            case "V1":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSITIVE),
                )
            case "V2":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc
                    limit %s;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL),
                )
            case _:
                raise Exception("strategy not supported")

    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """Return the latest publish_time already stored for this account, if any."""
        query = """
            select max(publish_time) as publish_time
            from crawler_meta_article
            where out_account_id = %s;
        """
        fetch_response = await self.pool.async_fetch(query=query, params=(account_id,))
        return next((item.get("publish_time") for item in fetch_response or []), None)
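    # Illustrative shapes of the raw payloads consumed by crawl_each_article
    # below -- a sketch inferred from the fields this module reads, not an
    # authoritative API contract:
    #
    #   mode="account": {"Title": "...", "ContentUrl": "https://...",
    #                    "ShowDesc": "...", "ItemIndex": 1, "Digest": "...",
    #                    "send_time": 1735660800}
    #   mode="search":  {"title": "...", "content_link": "https://...",
    #                    "item_index": 1, "view_count": 100, "like_count": 3,
    #                    "publish_timestamp": 1735660800000}  # milliseconds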
    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id, source_title=None
    ):
        """Normalize one raw article payload and persist it to the meta table."""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
            "category": account_method,
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case "search":
                new_item = {
                    **base_item,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["item_index"],
                    "title": article_raw_data["title"],
                    "link": article_raw_data["content_link"],
                    "like_cnt": article_raw_data.get(
                        "like_count", self.DEFAULT_LIKE_COUNT
                    ),
                    "read_cnt": article_raw_data.get(
                        "view_count", self.DEFAULT_VIEW_COUNT
                    ),
                    # the search API returns milliseconds; store seconds
                    "publish_time": article_raw_data["publish_timestamp"] // 1000,
                    "unique_index": generate_gzh_id(article_raw_data["content_link"]),
                    "source_article_title": source_title,
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
        await asyncio.sleep(self.SLEEP_SECONDS)

    async def update_account_read_avg_info(self, gh_id, account_name):
        """Recompute the per-position read average for an account and snapshot it."""
        today_dt = date.today().isoformat()
        for position in range(1, 9):
            # parameterized to avoid SQL injection; %% escapes the date format
            # characters from the driver's own %s substitution
            query = """
                select read_cnt, from_unixtime(publish_time, '%%Y-%%m-%%d') as publish_dt
                from crawler_meta_article
                where out_account_id = %s and article_index = %s
                order by publish_time desc
                limit %s;
            """
            fetch_response = await self.pool.async_fetch(
                query=query, params=(gh_id, position, self.STAT_DURATION)
            )
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                read_avg = sum(read_cnt_list) / len(read_cnt_list)
                max_publish_dt = fetch_response[0]["publish_dt"]
                remark = (
                    f"counted from {max_publish_dt}, "
                    f"looking back {len(fetch_response)} days"
                )
                insert_query = """
                    insert ignore into crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = await self.pool.async_save(
                    query=insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        self.USING_STATUS,
                        remark,
                    ),
                )
                if insert_rows:
                    # retire earlier snapshots for this account/position
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    await self.pool.async_save(
                        update_query,
                        (self.NOT_USING_STATUS, gh_id, position, today_dt),
                    )

    async def get_hot_titles_with_strategy(self, strategy):
        """Pick hot titles using per-strategy position/read-ratio/recency thresholds."""
        match strategy:
            case "V1":
                position = 3
                read_times_threshold = 1.21
                timedelta_days = 3
            case "V2":
                position = 2
                read_times_threshold = 1.1
                timedelta_days = 5
            case _:
                raise Exception(f"unknown strategy: {strategy}")
        date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
            "%Y%m%d"
        )
        return await get_hot_titles(
            self.pool,
            date_string=date_string,
            position=position,
            read_times_threshold=read_times_threshold,
        )
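# Incremental-crawl walkthrough (a worked example of the stop condition in
# CrawlerGzhAccountArticles.crawler_single_account below): pages arrive
# newest-first, so the last entry on a page is the oldest on that page.
# Suppose the account's stored latest_update_time is 2025-01-10 and a page
# covers 2025-01-20 .. 2025-01-12: the page's oldest entry (01-12) is still
# newer than 01-10, so deeper pages may hold unseen articles and the crawl
# continues. Once a page's oldest entry is <= 2025-01-10, everything deeper
# was crawled before, so the bookmark is advanced and the loop stops.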
class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """Persist every article in a page of account messages to the meta table."""
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """Advance the account's latest_update_time bookmark after a crawl."""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """Crawl one account page by page until previously seen articles are reached."""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
        while True:
            # fetch one page of articles from weixin
            response = await get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break

            # process current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)

            # decide whether to crawl the next page: pages are newest-first, so
            # once this page's oldest entry is no newer than the stored
            # bookmark, everything deeper has already been crawled
            last_article_in_this_page = msg_list[-1]
            last_timestamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_timestamp_in_this_msg <= latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break

            # advance the cursor to the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in tqdm(account_list, desc="crawling accounts one by one"):
            print(f"{datetime.now()}: start crawling account: {account}")
            try:
                await self.crawler_single_account(method, account)
                await self.update_account_read_avg_info(
                    gh_id=account["gh_id"], account_name=account["account_name"]
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
            print(f"{datetime.now()}: finished crawling account: {account}")


class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def crawl_search_articles_detail(
        self, article_list: List[Dict], source_title: str
    ):
        """Fetch the detail of each search hit and store it in the meta table."""
        for article in article_list:
            url = article["url"]
            detail_response = await get_article_detail(
                url, is_count=True, is_cache=False
            )
            if not detail_response:
                continue
            article_data = detail_response.get("data")
            if not isinstance(article_data, dict):
                continue
            article_detail = article_data.get("data")
            if not article_detail:
                continue
            await self.crawl_each_article(
                article_raw_data=article_detail,
                mode="search",
                account_method="search",
                account_id="search",
                source_title=source_title,
            )
            await asyncio.sleep(self.SLEEP_SECONDS)

    async def search_each_title(self, title: str, page: str = "1") -> None:
        """Search weixin for a title, paging through up to MAX_DEPTH result pages."""
        current_page = page
        while True:
            # do not page past MAX_DEPTH result pages
            if int(current_page) > self.MAX_DEPTH:
                break
            # call the search API for the current page
            search_response = await weixin_search(keyword=title, page=current_page)
            if not search_response:
                break
            article_list = search_response.get("data", {}).get("data")
            if not article_list:
                break
            # store the search results
            await self.crawl_search_articles_detail(article_list, title)
            # stop when the API reports no further pages
            has_more = search_response.get("data", {}).get("has_more")
            if not has_more:
                break
            # advance to the next page
            current_page = search_response.get("data", {}).get("next_cursor")
            if not current_page:
                break

    async def get_task_execute_result(self):
        """Count how many articles this trace stored, for the completion report."""
        query = """select count(*) as total_search_articles from crawler_meta_article where trace_id = %s;"""
        return await self.pool.async_fetch(query=query, params=(self.trace_id,))

    async def deal(self, strategy: str = "V1"):
        hot_titles = await self.get_hot_titles_with_strategy(strategy)
        for hot_title in tqdm(hot_titles, desc="searching weixin for hot titles"):
            print(f"{datetime.now()}: start searching hot title: {hot_title}")
            try:
                await self.search_each_title(hot_title)
            except Exception as e:
                print(
                    f"crawler_gzh_articles error: {e}\n"
                    f"exception: {traceback.format_exc()}"
                )
            print(f"{datetime.now()}: finished searching hot title: {hot_title}")
        await feishu_robot.bot(
            title="weixin account search task finished",
            detail={
                "strategy": strategy,
                "execute_detail": await self.get_task_execute_result(),
            },
        )
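
# ---------------------------------------------------------------------------
# Usage sketch. `create_pool` and `LogClient` are hypothetical stand-ins for
# however this project constructs its DB pool and log client (neither name
# appears in this module); the "association" category is likewise only an
# example value for `method`:
#
#     import uuid
#
#     async def main():
#         pool = await create_pool()    # hypothetical pool factory
#         log_client = LogClient()      # hypothetical log client
#         trace_id = uuid.uuid4().hex
#
#         # incremental crawl of whitelisted accounts, strategy V2
#         account_task = CrawlerGzhAccountArticles(pool, log_client, trace_id)
#         await account_task.deal(method="association", strategy="V2")
#
#         # keyword search seeded by recent hot titles, strategy V1
#         search_task = CrawlerGzhSearchArticles(pool, log_client, trace_id)
#         await search_task.deal(strategy="V1")
#
#     asyncio.run(main())
# ---------------------------------------------------------------------------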