# crawler_gzh.py

from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime, date
from typing import List, Dict

from applications.api import feishu_robot
from applications.crawler.wechat import weixin_search
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta
from applications.utils import get_hot_titles, generate_gzh_id

class CrawlerGzhConst:
    """Constants shared by the GZH crawler strategies."""

    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    STAT_DURATION = 30  # days
    DEFAULT_TIMESTAMP = 1735660800  # 2025-01-01 00:00:00 (UTC+8)

class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """Get the accounts to crawl for the given category and strategy."""
        match strategy:
            case "V1":
                # V1: in-use accounts of this category flagged for daily scraping
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
            case "V2":
                # V2: top 500 in-use accounts of this category by recent_score_ci_lower
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc
                    limit %s;
                """
                return await self.pool.async_fetch(query=query, params=(method, 1, 500))
            case _:
                raise Exception(f"strategy not supported: {strategy}")
    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
        """Get the latest publish_time already stored for this account, if any."""
        query = """select max(publish_time) as publish_time from crawler_meta_article where out_account_id = %s;"""
        latest_timestamp_obj = await self.pool.async_fetch(
            query=query, params=(account_id,)
        )
        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None
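    # Fields consumed from the raw WeChat payload in "account" mode by
    # crawl_each_article below: ShowDesc, Title, ContentUrl, ItemIndex,
    # Digest and send_time.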
    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id
    ):
        """crawl each article"""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
        }
        match mode:
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "category": account_method,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
    async def update_account_read_avg_info(self, gh_id, account_name):
        """Update the per-position average read count for an account."""
        position_list = list(range(1, 9))
        today_dt = date.today().isoformat()
        for position in position_list:
            query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit {self.STAT_DURATION};
            """
            fetch_response = await self.pool.async_fetch(query=query)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                n = len(read_cnt_list)
                read_avg = sum(read_cnt_list) / n
                max_publish_dt = fetch_response[0]["publish_dt"]
                remark = (
                    f"Calculated from {max_publish_dt}, "
                    f"looking back {len(fetch_response)} days"
                )
                insert_query = """
                    insert ignore into crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values
                        (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = await self.pool.async_save(
                    query=insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        1,
                        remark,
                    ),
                )
                if insert_rows:
                    # mark earlier rows for this account/position as stale
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    await self.pool.async_save(
                        query=update_query, params=(0, gh_id, position, today_dt)
                    )

class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """Save every article in the fetched message list to the meta table."""
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )
    async def update_account_latest_timestamp(self, gh_id):
        """Update the account's latest_update_time after a crawl."""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """Crawl the articles of a single account, page by page."""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
        while True:
            # fetch one page of articles from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break
            # process the current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)
            # decide whether to crawl the next page
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break
            # advance the cursor to the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break
    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in account_list:
            print(account)
            try:
                await self.crawler_single_account(method, account)
                await self.update_account_read_avg_info(
                    gh_id=account["gh_id"], account_name=account["account_name"]
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def search_each_title(self, title: str, page: str = "1") -> None:
        """Search weixin for a single title."""
        search_response = await weixin_search(keyword=title, page=page)
        # NOTE: the search response is fetched but not processed further yet

    async def deal(self, date_string: str, strategy: str = "V1"):
        hot_titles = await get_hot_titles(self.pool, date_string=date_string)
        for hot_title in hot_titles:
            await self.search_each_title(hot_title)

# if __name__ == "__main__":
#     import asyncio
#     response = asyncio.run(weixin_search(keyword="南京照相馆"))
#     print(json.dumps(response, ensure_ascii=False, indent=4))
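
# A minimal usage sketch for the account crawler (not part of the original
# module), kept commented out in the same style as the block above. It assumes
# a hypothetical async pool factory `create_mysql_pool()` and a hypothetical
# `LogClient`; the `method` value is a placeholder for an account_category
# stored in long_articles_accounts.
#
# if __name__ == "__main__":
#     from applications.database import create_mysql_pool  # hypothetical import
#     from applications.api import LogClient  # hypothetical import
#
#     async def _run_account_crawler() -> None:
#         pool = await create_mysql_pool()
#         crawler = CrawlerGzhAccountArticles(
#             pool, LogClient(), trace_id=generate_gzh_id("manual-run")
#         )
#         # "V1" crawls daily-scrape accounts; "V2" takes the top-ranked accounts
#         await crawler.deal(method="<account_category>", strategy="V1")
#
#     asyncio.run(_run_account_crawler())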