# crawler_gzh.py

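"""
Crawl WeChat official-account ("gzh") articles and store them in the crawler
meta tables through CrawlerPipeline.

Two strategies are built on CrawlerGzhBaseStrategy:

* CrawlerGzhAccountArticles -- page through the article list of each configured
  account, write every article to crawler_meta_article, and refresh the
  per-position read averages.
* CrawlerGzhSearchArticles  -- search WeChat for recent hot titles, crawl the
  matching article details, and report the run result through the Feishu robot.
"""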
from __future__ import annotations

import asyncio
import time
import traceback
from datetime import datetime, date, timedelta
from typing import List, Dict

from applications.api import feishu_robot
from applications.crawler.wechat import weixin_search
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta
from applications.utils import get_hot_titles, generate_gzh_id


class CrawlerGzhConst:
    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    MAX_DEPTH = 3  # max search pages crawled per title
    # pacing and statistics
    SLEEP_SECONDS = 1
    STAT_DURATION = 30  # days
    DEFAULT_TIMESTAMP = 1735660800  # 2025-01-01 00:00:00 (UTC+8)


class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """get crawler accounts"""
        match strategy:
            case "V1":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
            case "V2":
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc limit %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 500))
            case _:
                raise Exception("strategy not supported")

    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """get the latest crawled publish time for an account"""
        query = """select max(publish_time) as publish_time from crawler_meta_article where out_account_id = %s;"""
        latest_timestamp_obj = await self.pool.async_fetch(
            query=query, params=(account_id,)
        )
        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None

    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id, source_title=None
    ):
        """crawl each article"""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
            "category": account_method,
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case "search":
                new_item = {
                    **base_item,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["item_index"],
                    "title": article_raw_data["title"],
                    "link": article_raw_data["content_link"],
                    "like_cnt": article_raw_data.get(
                        "like_count", self.DEFAULT_LIKE_COUNT
                    ),
                    "read_cnt": article_raw_data.get(
                        "view_count", self.DEFAULT_VIEW_COUNT
                    ),
                    "publish_time": int(article_raw_data["publish_timestamp"] / 1000),
                    "unique_index": generate_gzh_id(article_raw_data["content_link"]),
                    "source_article_title": source_title,
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
        await asyncio.sleep(self.SLEEP_SECONDS)  # throttle between saves

    async def update_account_read_avg_info(self, gh_id, account_name):
        """update account read avg info"""
        position_list = list(range(1, 9))
        today_dt = date.today().isoformat()
        for position in position_list:
            query = f"""
                select read_cnt, from_unixtime(publish_time, '%Y-%m-%d') as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit {self.STAT_DURATION};
            """
            fetch_response = await self.pool.async_fetch(query=query)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                read_avg = sum(read_cnt_list) / len(read_cnt_list)
                max_publish_dt = fetch_response[0]["publish_dt"]
                # remark: "counted from <max_publish_dt>, looking back <n> days"
                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
                insert_query = """
                    insert ignore into crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values
                        (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = await self.pool.async_save(
                    query=insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        1,
                        remark,
                    ),
                )
                if insert_rows:
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    await self.pool.async_save(
                        update_query, (0, gh_id, position, today_dt)
                    )

    async def get_hot_titles_with_strategy(self, strategy):
        """get hot titles with strategy"""
        match strategy:
            case "V1":
                position = 3
                read_times_threshold = 1.21
                timedelta_days = 3
            case "V2":
                position = 2
                read_times_threshold = 1.1
                timedelta_days = 5
            case _:
                raise Exception(f"unknown strategy: {strategy}")
        date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
            "%Y%m%d"
        )
        return await get_hot_titles(
            self.pool,
            date_string=date_string,
            position=position,
            read_times_threshold=read_times_threshold,
        )


class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """write crawled messages into the meta table"""
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """update the latest timestamp after the crawl"""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """crawl a single account"""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
        while True:
            # fetch one page of the article list from weixin
            response = await get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break
            # process the current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)
            # decide whether to crawl the next page
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break
            # update the cursor for the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in account_list:
            print(account)
            try:
                await self.crawler_single_account(method, account)
                await self.update_account_read_avg_info(
                    gh_id=account["gh_id"], account_name=account["account_name"]
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )


class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def crawl_search_articles_detail(
        self, article_list: List[Dict], source_title: str
    ):
        for article in article_list:
            url = article["url"]
            detail_response = await get_article_detail(
                url, is_count=True, is_cache=False
            )
            if not detail_response:
                continue
            article_data = detail_response.get("data")
            if not article_data:
                continue
            if not isinstance(article_data, dict):
                continue
            article_detail = article_data.get("data")
            if not article_detail:
                continue
            await self.crawl_each_article(
                article_raw_data=article_detail,
                mode="search",
                account_method="search",
                account_id="search",
                source_title=source_title,
            )
            await asyncio.sleep(self.SLEEP_SECONDS)

    async def search_each_title(self, title: str, page: str = "1") -> None:
        """search the title in weixin"""
        current_page = page
        while True:
            # crawl at most MAX_DEPTH pages
            if int(current_page) > self.MAX_DEPTH:
                break
            # call the search api
            search_response = await weixin_search(keyword=title, page=current_page)
            if not search_response:
                break
            article_list = search_response.get("data", {}).get("data")
            if not article_list:
                break
            # store the search results
            await self.crawl_search_articles_detail(article_list, title)
            # check whether there is a next page
            has_more = search_response.get("data", {}).get("has_more")
            if not has_more:
                break
            # advance to the next page
            current_page = search_response.get("data", {}).get("next_cursor")

    async def get_task_execute_result(self):
        """get task execute result"""
        query = """select count(*) as total_search_articles from crawler_meta_article where trace_id = %s;"""
        return await self.pool.async_fetch(query=query, params=(self.trace_id,))

    async def deal(self, strategy: str = "V1"):
        hot_titles = await self.get_hot_titles_with_strategy(strategy)
        for hot_title in hot_titles:
            print("hot title:", hot_title)
            try:
                await self.search_each_title(hot_title)
            except Exception as e:
                print(f"crawler_gzh_articles error: {e}")
        await feishu_robot.bot(
            title="公众号搜索任务执行完成",  # "gzh search task finished"
            detail={
                "strategy": strategy,
                "execute_detail": await self.get_task_execute_result(),
            },
        )
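
# Illustrative entry point (a sketch, not part of the original module): one way
# the two crawlers could be driven together. `create_pool` and `LogClient` are
# hypothetical placeholders, as is the "some_category" value passed as `method`;
# substitute the project's real connection pool, log client and account category.
#
# async def main():
#     pool = await create_pool()    # hypothetical: project DB pool factory
#     log_client = LogClient()      # hypothetical: project log client
#     trace_id = "manual-run"       # any unique task id
#
#     account_crawler = CrawlerGzhAccountArticles(pool, log_client, trace_id)
#     await account_crawler.deal(method="some_category", strategy="V1")
#
#     search_crawler = CrawlerGzhSearchArticles(pool, log_client, trace_id)
#     await search_crawler.deal(strategy="V1")
#
# if __name__ == "__main__":
#     asyncio.run(main())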