crawler_toutiao.py

from __future__ import annotations

import asyncio
import traceback
from typing import List

from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.toutiao import get_toutiao_account_info_list
from applications.pipeline import CrawlerPipeline


class CrawlerToutiaoConst:
    # platform
    PLATFORM = "toutiao"

    # account status
    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
    TOUTIAO_ACCOUNT_BAD_STATUS = 0

    # earliest cursor, 2021-01-01 00:00:00
    DEFAULT_CURSOR = 1609430400

    # no source account
    NO_SOURCE_ACCOUNT_STATUS = 0

    # minimum title length
    MIN_TITLE_LENGTH = 10

    # max video length (seconds)
    MAX_VIDEO_LENGTH = 600

    # seconds to sleep between pages
    SLEEP_SECOND = 3


class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
    def __init__(self, pool, log_client):
        super().__init__(pool, log_client)

    async def get_account_list(self, media_type: str) -> List[dict]:
        """Get the list of Toutiao accounts to crawl for the given media type."""
        match media_type:
            case "video":
                table = "video_meta_accounts"
            case "article":
                table = "article_meta_accounts"
            case _:
                return []

        # fetch accounts that are still in good standing
        query = f"""
            select account_id, max_cursor
            from {table}
            where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        response = await self.pool.async_fetch(query)
        if not response:
            await feishu_robot.bot(
                title=f"Toutiao account crawler task failed (media_type: {media_type})",
                detail={"platform": self.PLATFORM, "error": "failed to fetch account list"},
            )
            return []
        return response
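
    # Note (assumption): async_fetch is expected to return rows as dicts, so
    # get_account_list yields e.g. [{"account_id": "...", "max_cursor": 1609430400}, ...],
    # matching how crawler_task reads each account below.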

    async def crawler_each_account_info_list(
        self,
        account_id: str,
        media_type: str,
        max_cursor: int | None,
        max_behot_time: int = 0,
    ):
        """
        account_id: Toutiao account id
        max_cursor: latest crawled cursor for this account
        max_behot_time: max behot time returned by Toutiao, used to page to the next batch
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or self.DEFAULT_CURSOR
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        while has_more:
            response = await get_toutiao_account_info_list(
                account_id=account_id,
                cookie=cookie,
                media_type=media_type,
                max_behot_time=current_cursor,
            )
            if not response:
                break
            if response["message"] != "success":
                break

            info_list = response["data"]
            has_more = response["has_more"]
            current_cursor = response["next"]["max_behot_time"]
            if not info_list:
                break

            # items come newest-first: once the newest item on this page is older
            # than the stored cursor, everything beyond it was crawled already
            max_timestamp_in_this_group = info_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # do crawler
            match media_type:
                case "video":
                    bar_description = "crawler videos"
                case "article":
                    bar_description = "crawler articles"
                case _:
                    raise Exception(f"unknown media type: {media_type}")

            crawler_info_list_bar = tqdm(info_list, desc=bar_description)
            for info in crawler_info_list_bar:
                try:
                    crawler_info_list_bar.set_postfix({"id": info["id"]})
                    match media_type:
                        case "video":
                            await self.crawler_each_video(info)
                        case "article":
                            await self.crawler_each_article(info)
                        case _:
                            raise Exception(f"unknown media type: {media_type}")
                except Exception as e:
                    raise Exception(f"crawler each info failed: {e}") from e

            if has_more:
                # yield control instead of blocking the event loop between pages
                await asyncio.sleep(self.SLEEP_SECOND)
            else:
                break
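
    # For reference, crawler_each_account_info_list assumes a response shaped
    # roughly like the following (reconstructed from the field accesses above;
    # the actual payload from get_toutiao_account_info_list may carry more fields):
    #
    #     {
    #         "message": "success",
    #         "data": [{"id": ..., "publish_time": ..., ...}],
    #         "has_more": True,
    #         "next": {"max_behot_time": 1700000000},
    #     }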

    async def crawler_each_article(self, article_raw_data):
        """
        Normalize a raw Toutiao article payload and persist it.
        """
        new_article_item = {
            "platform": self.PLATFORM,
            "mode": "account",
            "category": "toutiao_account_association",
            "out_account_id": article_raw_data["user_info"]["user_id"],
            "title": article_raw_data["title"],
            "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
            "read_cnt": article_raw_data["read_count"],
            "like_cnt": article_raw_data["like_count"],
            "description": article_raw_data["abstract"],
            "publish_time": article_raw_data["publish_time"],
            "unique_index": article_raw_data["group_id"],
        }
        await self.save_item_to_database(media_type="article", item=new_article_item)

    async def crawler_each_video(self, video_raw_data):
        # video crawling is not implemented yet
        pass

    async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
        """
        Advance the account's max_cursor to the newest publish timestamp crawled so far.
        """
        match media_type:
            case "video":
                query = """
                    select max(publish_timestamp) as max_cursor
                    from publish_single_video_source
                    where out_account_id = %s and platform = %s;
                """
                table = "video_meta_accounts"
            case "article":
                query = """
                    select max(publish_time) as max_cursor
                    from crawler_meta_article
                    where out_account_id = %s and platform = %s;
                """
                table = "article_meta_accounts"
            case _:
                raise Exception(f"unknown media type: {media_type}")

        response = await self.pool.async_fetch(query, params=(account_id, self.PLATFORM))
        # guard against an empty result set before indexing into it
        max_publish_timestamp = response[0]["max_cursor"] if response else None
        if max_publish_timestamp:
            query = f"""
                update {table}
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            await self.pool.async_save(
                query, (max_publish_timestamp, account_id, self.PLATFORM)
            )

    async def crawler_task(self, media_type: str) -> None:
        """
        Task entry point: crawl every account, then advance its cursor.
        """
        account_list = await self.get_account_list(media_type=media_type)
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                account_list_bar.set_postfix({"account_id": account_id})
                await self.crawler_each_account_info_list(
                    account_id=account_id, media_type=media_type, max_cursor=max_cursor
                )
                await self.update_account_max_cursor(
                    media_type=media_type, account_id=account_id
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": account_id,
                        "data": {
                            "media_type": media_type,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
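
# Usage sketch (assumption): this file does not show how the pool or log client
# are constructed, so `create_pool` and `LogClient` below are hypothetical
# stand-ins for whatever the applications package actually provides.
#
#     import asyncio
#
#     async def main():
#         pool = await create_pool()      # hypothetical pool factory
#         log_client = LogClient()        # hypothetical log client
#         crawler = CrawlerToutiao(pool, log_client)
#         await crawler.crawler_task(media_type="article")
#
#     asyncio.run(main())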