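"""Crawl WeChat official-account (gzh) articles.

Two strategies are implemented below: paging through an account's publish
history (CrawlerGzhAccountArticles) and searching WeChat by recent hot titles
(CrawlerGzhSearchArticles). Both write results into crawler_meta_article.
"""
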
from __future__ import annotations

import asyncio
import time
import json
import traceback
from datetime import datetime, date, timedelta
from typing import List, Dict

from tqdm.asyncio import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import weixin_search
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import CrawlerPipeline
from applications.utils import timestamp_to_str, show_desc_to_sta
from applications.utils import get_hot_titles, generate_gzh_id


class CrawlerGzhConst:
    PLATFORM = "weixin"
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    MAX_DEPTH = 3
    SLEEP_SECONDS = 1
    STAT_DURATION = 30  # days
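    # Fallback publish timestamp; presumably 2025-01-01 00:00 (UTC+8).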
    DEFAULT_TIMESTAMP = 1735660800
    DAILY_SCRAPE_POSTIVE = 1
    DAILY_SCRAPE_NEGATIVE = 0
    USING_STATUS = 1
    NOT_USING_STATUS = 0
    CRAWL_ACCOUNT_FIRST_LEVEL = 500


class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
        """get crawler accounts"""
        match strategy:
            case "V1":
                # V1: every enabled account flagged for daily scraping.
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s and daily_scrape = %s;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSTIVE),
                )
            case "V2":
                # V2: the top accounts ranked by recent_score_ci_lower.
                query = """
                    select gh_id, account_name, latest_update_time
                    from long_articles_accounts
                    where account_category = %s and is_using = %s
                    order by recent_score_ci_lower desc limit %s;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL),
                )
            case _:
                raise Exception("strategy not supported")

    async def get_account_latest_update_timestamp(self, account_id: str) -> int:
        """get latest update time"""
        query = """
            select max(publish_time) as publish_time
            from crawler_meta_article where out_account_id = %s;
        """
        fetch_response = await self.pool.async_fetch(query=query, params=(account_id,))
        return next((item.get("publish_time") for item in fetch_response or []), None)

    async def crawl_each_article(
        self, article_raw_data, mode, account_method, account_id, source_title=None
    ):
        """crawl each article"""
        base_item = {
            "platform": self.PLATFORM,
            "mode": mode,
            "crawler_time": int(time.time()),
            "category": account_method,
        }
        match mode:
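            # "account" items come from an account's publish list payload;
            # "search" items come from the article-detail payload of a search hit.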
            case "account":
                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
                show_view_count = show_stat.get(
                    "show_view_count", self.DEFAULT_VIEW_COUNT
                )
                show_like_count = show_stat.get(
                    "show_like_count", self.DEFAULT_LIKE_COUNT
                )
                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
                new_item = {
                    **base_item,
                    "read_cnt": show_view_count,
                    "like_cnt": show_like_count,
                    "title": article_raw_data["Title"],
                    "out_account_id": account_id,
                    "article_index": article_raw_data["ItemIndex"],
                    "link": article_raw_data["ContentUrl"],
                    "description": article_raw_data["Digest"],
                    "unique_index": unique_idx,
                    "publish_time": article_raw_data["send_time"],
                }
            case "search":
                new_item = {
                    **base_item,
                    "out_account_id": account_id,
                    "article_index": article_raw_data["item_index"],
                    "title": article_raw_data["title"],
                    "link": article_raw_data["content_link"],
                    "like_cnt": article_raw_data.get(
                        "like_count", self.DEFAULT_LIKE_COUNT
                    ),
                    "read_cnt": article_raw_data.get(
                        "view_count", self.DEFAULT_VIEW_COUNT
                    ),
                    "publish_time": int(article_raw_data["publish_timestamp"] / 1000),
                    "unique_index": generate_gzh_id(article_raw_data["content_link"]),
                    "source_article_title": source_title,
                }
            case _:
                raise Exception(f"unknown mode: {mode}")
        await self.save_item_to_database(
            media_type="article", item=new_item, trace_id=self.trace_id
        )
        # await asyncio.sleep(self.SLEEP_SECONDS)

    async def update_account_read_avg_info(self, gh_id, account_name):
        """update account read avg info"""
        position_list = list(range(1, 9))
        today_dt = date.today().isoformat()
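        # For each article position (1-8), average read_cnt over the account's
        # most recent STAT_DURATION articles and record today's snapshot.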
        for position in position_list:
            query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit {self.STAT_DURATION};
            """
            fetch_response = await self.pool.async_fetch(query=query)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                n = len(read_cnt_list)
                read_avg = sum(read_cnt_list) / n
                max_publish_dt = fetch_response[0]["publish_dt"]
                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
                insert_query = """
                    insert ignore into crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values
                        (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = await self.pool.async_save(
                    query=insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        self.USING_STATUS,
                        remark,
                    ),
                )
                if insert_rows:
                    # Mark older snapshots for this account/position as not in use.
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    await self.pool.async_save(
                        update_query, (self.NOT_USING_STATUS, gh_id, position, today_dt)
                    )

    async def get_hot_titles_with_strategy(self, strategy):
        """get hot titles with strategy"""
        match strategy:
            case "V1":
                position = 3
                read_times_threshold = 1.21
                timedelta_days = 3
            case "V2":
                position = 2
                read_times_threshold = 1.1
                timedelta_days = 5
            case _:
                raise Exception(f"unknown strategy: {strategy}")
        date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
            "%Y%m%d"
        )
        return await get_hot_titles(
            self.pool,
            date_string=date_string,
            position=position,
            read_times_threshold=read_times_threshold,
        )


class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
        """
        Write the crawled message list into the meta table.
        :return:
        """
        for msg in msg_list:
            article_list = msg["AppMsg"]["DetailInfo"]
            for obj in article_list:
                await self.crawl_each_article(
                    article_raw_data=obj,
                    mode="account",
                    account_method=account_method,
                    account_id=gh_id,
                )

    async def update_account_latest_timestamp(self, gh_id):
        """update the latest timestamp after crawler"""
        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
        dt_str = timestamp_to_str(latest_timestamp)
        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
        await self.pool.async_save(query=query, params=(dt_str, gh_id))

    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
        """crawler single account"""
        current_cursor = None
        gh_id = account["gh_id"]
        latest_timestamp = account["latest_update_time"].timestamp()
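        # Page through the account's publish history: next_cursor drives
        # pagination and the last item's UpdateTime decides when to stop.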
        while True:
            # fetch response from weixin
            response = await get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break
            # process current page
            await self.insert_article_into_meta(gh_id, account_method, msg_list)
            # whether to crawl the next page
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_timestamp:
                await self.update_account_latest_timestamp(gh_id)
                break
            # update cursor for next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    async def deal(self, method: str, strategy: str = "V1"):
        account_list = await self.get_crawler_accounts(method, strategy)
        for account in tqdm(account_list, desc="抓取单个账号"):
            print(f"{datetime.now()}: start crawling account: {account}")
            try:
                await self.crawler_single_account(method, account)
                await self.update_account_read_avg_info(
                    gh_id=account["gh_id"], account_name=account["account_name"]
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_gzh_articles",
                        "trace_id": self.trace_id,
                        "data": {
                            "account_id": account["gh_id"],
                            "account_method": method,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
            print(f"{datetime.now()}: finished crawling account: {account}")


class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def crawl_search_articles_detail(
        self, article_list: List[Dict], source_title: str
    ):
        """
        @description: for each searched article, fetch its detail and store it in the meta table
        """
        for article in tqdm(article_list, desc="获取搜索结果详情"):
            print(f"{datetime.now()}: start crawling article: {article['title']}")
            url = article["url"]
            detail_response = await get_article_detail(
                url, is_count=True, is_cache=False
            )
            if not detail_response:
                continue
            article_data = detail_response.get("data")
            if not article_data:
                continue
            if not isinstance(article_data, dict):
                continue
            article_detail = article_data.get("data")
            if not article_detail:
                continue
            await self.crawl_each_article(
                article_raw_data=article_detail,
                mode="search",
                account_method="search",
                account_id="search",
                source_title=source_title,
            )
            await asyncio.sleep(self.SLEEP_SECONDS)

    async def search_each_title(self, title: str, page: str = "1") -> None:
        """search in weixin"""
        current_page = page
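        # Page through search results via next_cursor, at most MAX_DEPTH pages.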
        while True:
            # stop after MAX_DEPTH pages
            if int(current_page) > self.MAX_DEPTH:
                break
            # call the weixin search api
            search_response = await weixin_search(keyword=title, page=current_page)
            if not search_response:
                break
            article_list = search_response.get("data", {}).get("data")
            if not article_list:
                break
            # store the search results
            await self.crawl_search_articles_detail(article_list, title)
            # check whether there is a next page
            has_more = search_response.get("data", {}).get("has_more")
            if not has_more:
                break
            # move to the next page
            current_page = search_response.get("data", {}).get("next_cursor")
            if not current_page:
                break

    async def get_task_execute_result(self):
        """get task execute result"""
        query = """select count(*) as total_search_articles from crawler_meta_article where trace_id = %s;"""
        return await self.pool.async_fetch(query=query, params=(self.trace_id,))

    async def deal(self, strategy: str = "V1"):
        hot_titles = await self.get_hot_titles_with_strategy(strategy)
        for hot_title in tqdm(hot_titles, desc="在微信内搜索文章"):
            print(f"{datetime.now()}: start searching hot title: {hot_title}")
            try:
                await self.search_each_title(hot_title)
            except Exception as e:
                print(
                    f"crawler_gzh_articles error: {e}\nexception: {traceback.format_exc()}"
                )
            print(f"{datetime.now()}: finished searching hot title: {hot_title}")
        await feishu_robot.bot(
            title="公众号搜索任务执行完成",
            detail={
                "strategy": strategy,
                "execute_detail": await self.get_task_execute_result(),
            },
        )