from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime
from typing import List, Dict

from applications.api import feishu_robot
from applications.crawler.wechat import search
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta, generate_gzh_id


class CrawlerGzhConst:
    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    # 2025-01-01 00:00:00 (UTC+8)
    DEFAULT_TIMESTAMP = 1735660800


class CrawlerGzhStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """Fetch the accounts to crawl for the given category and strategy."""
        match strategy:
            case "V1":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
            case "V2":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc
                    limit %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 100))
            case _:
                raise ValueError(f"unsupported strategy: {strategy}")
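
    # Rows returned above are expected to be mappings with the keys "gh_id",
    # "account_name", and "latest_update_time" (a datetime); the account
    # crawler below relies on exactly these fields.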

    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """Return the newest recorded publish_time for an account, or None."""
        query = """
            select max(publish_time) as publish_time
            from crawler_meta_article
            where out_account_id = %s;
        """
        rows = await self.pool.async_fetch(query=query, params=(account_id,))
        return rows[0]["publish_time"] if rows else None
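
    # Assumption: publish_time is a unix timestamp in seconds. It is converted
    # with timestamp_to_str() when writing back to the accounts table and is
    # compared against datetime.timestamp() results, so both sides are taken
    # to share that unit.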

    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id
    ):
        """Normalize a single raw article payload and persist it."""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "category": account_method,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case _:
                raise ValueError(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
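
# For reference, a hypothetical sketch (not taken from the source) of the raw
# payload consumed by crawl_each_article() in "account" mode, based on the
# fields accessed above:
#
#   {
#       "Title": "...",
#       "ContentUrl": "https://mp.weixin.qq.com/s?...",
#       "ShowDesc": "...",          # parsed by show_desc_to_sta()
#       "Digest": "...",
#       "ItemIndex": 1,
#       "send_time": 1735660800,    # unix seconds
#   }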


class CrawlerGzhAccountArticles(CrawlerGzhStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """Write every article in the fetched message list into the meta table."""
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """Persist the account's latest_update_time after a crawl."""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        if latest_timestamp is None:
            # nothing has been crawled for this account yet
            return
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """Crawl one account page by page until already-seen articles are reached."""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
        while True:
            # fetch one page of articles from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break
            # process the current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)
            # pages are returned newest-first, so the last message on the page
            # is the oldest one; once it is no newer than the stored watermark,
            # everything past this point has already been crawled
            last_article_in_this_page = msg_list[-1]
            last_timestamp_in_this_page = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_timestamp_in_this_page <= latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break
            # move the cursor to the next (older) page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in account_list:
            try:
                await self.crawler_single_account(method, account)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )


class CrawlerGzhSearchArticles(CrawlerGzhStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def deal(self):
        return {"mode": "search", "message": "still in development"}
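

# --- Usage sketch (not part of the original module) ---------------------------
# A minimal, hypothetical harness showing how the account crawler is expected
# to be driven. The real `pool` and `log_client` come from the application
# runtime; the stubs below only satisfy the calls made in this module, and the
# account category "example-category" is a placeholder.
if __name__ == "__main__":

    class _StubPool:
        async def async_fetch(self, query, params=None):
            return []  # no matching accounts, so deal() returns immediately

        async def async_save(self, query, params=None):
            return None

    class _StubLogClient:
        async def log(self, contents):
            print(contents)

    async def _demo():
        crawler = CrawlerGzhAccountArticles(
            pool=_StubPool(), log_client=_StubLogClient(), trace_id="demo-trace"
        )
        await crawler.deal(method="example-category", strategy="V1")

    asyncio.run(_demo())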