from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime, date, timedelta
from typing import List, Dict

from tqdm.asyncio import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import (
    weixin_search,
    get_article_detail,
    get_article_list_from_account,
)
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta
from applications.utils import get_hot_titles, generate_gzh_id


class CrawlerGzhConst:
    """Constants shared by the gzh (WeChat official account) crawler tasks."""

    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    MAX_DEPTH = 3
    # crawl pacing and statistics window
    SLEEP_SECONDS = 1
    STAT_DURATION = 30  # days
    DEFAULT_TIMESTAMP = 1735660800
    DAILY_SCRAPE_POSTIVE = 1
    DAILY_SCRAPE_NEGATIVE = 0
    USING_STATUS = 1
    NOT_USING_STATUS = 0
    CRAWL_ACCOUNT_FIRST_LEVEL = 500


class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """get crawler accounts for the given category and strategy"""
        match strategy:
            case "V1":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSTIVE),
                )
            case "V2":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc limit %s;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL),
                )
            case _:
                raise Exception(f"unsupported strategy: {strategy}")

    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """get the newest publish_time already stored for this account"""
        query = """
            select max(publish_time) as publish_time
            from crawler_meta_article where out_account_id = %s;
        """
        fetch_response = await self.pool.async_fetch(query=query, params=(account_id,))
        return next((item.get("publish_time") for item in fetch_response or []), None)

    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id, source_title=None
    ):
        """crawl each article"""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
            "category": account_method,
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case "search":
                new_item = {
                    **base_item,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["item_index"],
                    "title": article_raw_data["title"],
                    "link": article_raw_data["content_link"],
                    "like_cnt": article_raw_data.get(
                        "like_count", self.DEFAULT_LIKE_COUNT
                    ),
                    "read_cnt": article_raw_data.get(
                        "view_count", self.DEFAULT_VIEW_COUNT
                    ),
                    "publish_time": int(article_raw_data["publish_timestamp"] / 1000),
                    "unique_index": generate_gzh_id(article_raw_data["content_link"]),
                    "source_article_title": source_title,
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
        await asyncio.sleep(self.SLEEP_SECONDS)

    async def update_account_read_avg_info(self, gh_id, account_name):
        """update account read avg info"""
        position_list = list(range(1, 9))
        today_dt = date.today().isoformat()
        for position in position_list:
            query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit {self.STAT_DURATION};
            """
            fetch_response = await self.pool.async_fetch(query=query)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                n = len(read_cnt_list)
                read_avg = sum(read_cnt_list) / n
                max_publish_dt = fetch_response[0]["publish_dt"]
                remark = f"从{max_publish_dt}开始计算,往前算{n}天"
                insert_query = """
                    insert ignore into crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values
                        (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = await self.pool.async_save(
                    query=insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        self.USING_STATUS,
                        remark,
                    ),
                )
                if insert_rows:
                    # retire older snapshots once today's row has been written
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    await self.pool.async_save(
                        update_query, (self.NOT_USING_STATUS, gh_id, position, today_dt)
                    )

    async def get_hot_titles_with_strategy(self, strategy):
        """get hot titles with strategy"""
        match strategy:
            case "V1":
                position = 3
                read_times_threshold = 1.21
                timedelta_days = 3
            case "V2":
                position = 2
                read_times_threshold = 1.1
                timedelta_days = 5
            case _:
                raise Exception(f"unknown strategy: {strategy}")
        date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
            "%Y%m%d"
        )
        return await get_hot_titles(
            self.pool,
            date_string=date_string,
            position=position,
            read_times_threshold=read_times_threshold,
        )


class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """write every article in the fetched message list into the meta table"""
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """update the latest timestamp after the crawl"""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        if latest_timestamp is None:
            # nothing stored for this account yet, keep the old watermark
            return
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """crawl a single account"""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
        while True:
            # fetch one page of messages from weixin
            response = await get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break

            # process current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)

            # whether to crawl the next page: once the oldest message on this page
            # is newer than the stored watermark, refresh the watermark and stop
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break

            # update cursor for the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in tqdm(account_list, desc="抓取单个账号"):
            # default=str keeps datetime columns (latest_update_time) serializable
            account_info = json.dumps(account, ensure_ascii=False, default=str)
            print(f"{datetime.now()}: start crawling account: {account_info}")
            try:
                await self.crawler_single_account(method, account)
                await self.update_account_read_avg_info(
                    gh_id=account["gh_id"], account_name=account["account_name"]
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
            print(f"{datetime.now()}: finished crawling account: {account_info}")


class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def crawl_search_articles_detail(
        self, article_list: List[Dict], source_title: str
    ):
        """fetch the detail of each searched article and store it in the meta table"""
        for article in article_list:
            url = article["url"]
            detail_response = await get_article_detail(
                url, is_count=True, is_cache=False
            )
            if not detail_response:
                continue
            article_data = detail_response.get("data")
            if not isinstance(article_data, dict):
                continue
            article_detail = article_data.get("data")
            if not article_detail:
                continue
            await self.crawl_each_article(
                article_raw_data=article_detail,
                mode="search",
                account_method="search",
                account_id="search",
                source_title=source_title,
            )
            await asyncio.sleep(self.SLEEP_SECONDS)

    async def search_each_title(self, title: str, page: str = "1") -> None:
        """search the title in weixin and crawl every result page"""
        current_page = page
        while True:
            # crawl at most MAX_DEPTH pages
            if int(current_page) > self.MAX_DEPTH:
                break
            # call the search api for the current page
            search_response = await weixin_search(keyword=title, page=current_page)
            if not search_response:
                break
            article_list = search_response.get("data", {}).get("data")
            if not article_list:
                break
            # store the search results
            await self.crawl_search_articles_detail(article_list, title)
            # check whether there is another page
            has_more = search_response.get("data", {}).get("has_more")
            if not has_more:
                break
            # move to the next page
            current_page = search_response.get("data", {}).get("next_cursor")
            if not current_page:
                break

    async def get_task_execute_result(self):
        """get task execute result"""
        query = """select count(*) as total_search_articles from crawler_meta_article where trace_id = %s;"""
        return await self.pool.async_fetch(query=query, params=(self.trace_id,))

    async def deal(self, strategy: str = "V1"):
        hot_titles = await self.get_hot_titles_with_strategy(strategy)
        for hot_title in tqdm(hot_titles, desc="在微信内搜索文章"):
            print(f"{datetime.now()}: start searching hot title: {hot_title}")
            try:
                await self.search_each_title(hot_title)
            except Exception as e:
                print(
                    f"crawler_gzh_articles error: {e}\n"
                    f"exception: {traceback.format_exc()}"
                )
            print(f"{datetime.now()}: finished searching hot title: {hot_title}")
        await feishu_robot.bot(
            title="公众号搜索任务执行完成",
            detail={
                "strategy": strategy,
                "execute_detail": await self.get_task_execute_result(),
            },
        )
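

# End-to-end sketch (assumption, not part of the original module): how the search-mode
# crawler might be wired into an entrypoint. Strategy "V1"/"V2" only changes the
# hot-title selection thresholds in get_hot_titles_with_strategy; `build_pool` and
# `build_log_client` remain hypothetical placeholders for project-specific wiring.
#
#     import uuid
#
#     async def run_search_crawl():
#         pool = await build_pool()          # hypothetical factory
#         log_client = build_log_client()    # hypothetical factory
#         task = CrawlerGzhSearchArticles(pool, log_client, trace_id=uuid.uuid4().hex)
#         await task.deal(strategy="V2")
#
#     if __name__ == "__main__":
#         asyncio.run(run_search_crawl())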