crawler_toutiao.py
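
"""
Toutiao (今日头条) crawler.

Fetches article / video metadata from blogger accounts and from the category
recommend feed, then saves each item through CrawlerPipeline. A per-account
``max_cursor`` stored in the *_meta_accounts tables keeps repeated runs from
re-crawling content older than the last successful crawl.
"""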

from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime
from typing import List

import requests
from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.toutiao import get_toutiao_account_info_list
from applications.pipeline import CrawlerPipeline
from applications.utils import proxy


class CrawlerToutiaoConst:
    # platform
    PLATFORM = "toutiao"

    # account status
    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
    TOUTIAO_ACCOUNT_BAD_STATUS = 0

    # earliest cursor, 2021-01-01 00:00:00
    DEFAULT_CURSOR = 1609430400

    # no source account
    NO_SOURCE_ACCOUNT_STATUS = 0

    # minimum title length
    MIN_TITLE_LENGTH = 10

    # max video length (seconds)
    MAX_VIDEO_LENGTH = 600

    # sleep seconds between pages
    SLEEP_SECOND = 3


class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_request_params(self, category):
        """
        get request params
        """
        query = """
            select request_method, request_url, request_headers, post_data
            from toutiao_request_params
            where category = %s and expire_flag = %s
            order by id desc limit 1;
        """
        response = await self.pool.async_fetch(query=query, params=(category, 0))
        if not response:
            now = datetime.now()
            if 10 < now.hour < 21:
                await feishu_robot.bot(
                    title="今日头条推荐流,cookie 过期",
                    detail={"info": "cookie expired"},
                )
            return None
        else:
            return response[0]

    async def get_account_list(self, media_type: str) -> List[dict]:
        """get toutiao account list"""
        match media_type:
            case "video":
                table = "video_meta_accounts"
            case "article":
                table = "article_meta_accounts"
            case _:
                return []

        # fetch query
        query = f"""
            select account_id, max_cursor
            from {table}
            where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        response = await self.pool.async_fetch(query)
        if not response:
            await feishu_robot.bot(
                title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
                detail={"platform": self.PLATFORM, "error": "获取账号异常"},
            )
            return []
        else:
            return response

    async def crawler_each_account_info_list(
        self,
        account_id: str,
        media_type: str,
        max_cursor: int | None,
        max_behot_time: int = 0,
    ):
        """
        account_id: toutiao account id
        max_cursor: latest crawled cursor for each account
        max_behot_time: max behot time from toutiao, used to switch to the next page
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or self.DEFAULT_CURSOR
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        while has_more:
            response = await get_toutiao_account_info_list(
                account_id=account_id,
                cookie=cookie,
                media_type=media_type,
                max_behot_time=current_cursor,
            )
            if not response:
                break

            if response["message"] != "success":
                break

            info_list = response["data"]
            has_more = response["has_more"]
            current_cursor = response["next"]["max_behot_time"]

            if not info_list:
                break

            # stop when the newest item on this page is older than the stored cursor
            max_timestamp_in_this_group = info_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # do crawler
            match media_type:
                case "video":
                    bar_description = "crawler videos"
                case "article":
                    bar_description = "crawler articles"
                case _:
                    raise Exception(f"unknown media type: {media_type}")

            crawler_info_list_bar = tqdm(info_list, desc=bar_description)
            # debug: dump the raw page
            print(json.dumps(info_list, ensure_ascii=False, indent=4))
            for info in crawler_info_list_bar:
                try:
                    crawler_info_list_bar.set_postfix({"id": info["id"]})
                    match media_type:
                        case "video":
                            await self.crawler_each_video(info)
                        case "article":
                            await self.crawler_each_article(
                                method="account", article_raw_data=info
                            )
                        case _:
                            raise Exception(f"unknown media type: {media_type}")
                except Exception as e:
                    raise Exception(f"crawler each info failed: {e}")

            if has_more:
                # non-blocking pause between pages so the event loop stays free
                await asyncio.sleep(self.SLEEP_SECOND)
            else:
                break

    async def crawler_each_article(self, method, article_raw_data, category=None):
        """
        crawler each article
        """
        # extract common fields
        base_item = {
            "platform": self.PLATFORM,
            "mode": method,
            "out_account_id": article_raw_data["user_info"]["user_id"],
            "title": article_raw_data["title"],
            "read_cnt": article_raw_data["read_count"],
            "like_cnt": article_raw_data["like_count"],
            "publish_time": article_raw_data["publish_time"],
            "crawler_time": int(time.time()),
        }
        match method:
            case "account":
                new_article_item = {
                    **base_item,
                    "category": "toutiao_account_association",
                    "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
                    "description": article_raw_data["abstract"],
                    "unique_index": article_raw_data["group_id"],
                }
            case "recommend":
                new_article_item = {
                    **base_item,
                    "category": category,
                    "title": article_raw_data["title"],
                    "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
                    "description": article_raw_data["Abstract"],
                    "unique_index": article_raw_data["item_id"],
                }
            case _:
                raise Exception(f"unknown method: {method}")
        await self.save_item_to_database(media_type="article", item=new_article_item)

    async def crawler_each_video(self, video_raw_data):
        pass

    async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
        """
        update account max cursor
        """
        match media_type:
            case "video":
                query = """
                    select max(publish_timestamp) as max_cursor
                    from publish_single_video_source
                    where out_account_id = %s and platform = %s;
                """
                table = "video_meta_accounts"
            case "article":
                query = """
                    select max(publish_time) as max_cursor
                    from crawler_meta_article
                    where out_account_id = %s and platform = %s;
                """
                table = "article_meta_accounts"
            case _:
                raise Exception(f"unknown media type: {media_type}")

        response = await self.pool.async_fetch(
            query, params=(account_id, self.PLATFORM)
        )
        max_publish_timestamp = response[0]["max_cursor"]
        if max_publish_timestamp:
            query = f"""
                update {table}
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            await self.pool.async_save(
                query, (max_publish_timestamp, account_id, self.PLATFORM)
            )

    async def crawler_task(self, media_type: str) -> None:
        """
        class entrance
        """
        account_list = await self.get_account_list(media_type=media_type)
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                account_list_bar.set_postfix({"account_id": account_id})
                await self.crawler_each_account_info_list(
                    account_id=account_id, media_type=media_type, max_cursor=max_cursor
                )
                await self.update_account_max_cursor(
                    media_type=media_type, account_id=account_id
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": account_id,
                        "data": {
                            "media_type": media_type,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

    async def crawler_recommend_articles(self, category: str) -> None:
        cookie = await self.get_request_params(category=category)
        if not cookie:
            return

        # pull the recommend feed several times per category
        for _ in range(10):
            response = requests.request(
                method=cookie["request_method"],
                url=cookie["request_url"],
                headers=json.loads(cookie["request_headers"]),
                proxies=proxy(),
            )
            if not response.text:
                continue
            article_list = response.json()["data"]
            for article in article_list:
                if not article.get("article_url"):
                    continue
                if article.get("has_video"):
                    print("this is a video rather than an article")
                    continue
                try:
                    await self.crawler_each_article(
                        method="recommend",
                        article_raw_data=article,
                        category=category,
                    )
                except Exception as e:
                    print(f"crawler_recommend_articles error: {e}")

    async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
        if not category_list:
            category_list = ["finance", "tech", "history", "entertainment"]
        for category in category_list:
            await self.crawler_recommend_articles(category=category)
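

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module): one plausible way
# to drive the crawler locally. CrawlerPipeline needs an async DB pool and a
# log client from the surrounding application; `build_pool` and
# `build_log_client` below are hypothetical placeholders, not real project
# helpers, so the snippet is left commented out.
#
#     async def main():
#         pool = build_pool()              # hypothetical: application DB pool
#         log_client = build_log_client()  # hypothetical: application logger
#         crawler = CrawlerToutiao(pool, log_client, trace_id="local-debug")
#         await crawler.crawler_task(media_type="article")
#         await crawler.crawl_toutiao_recommend_task(category_list=[])
#
#     asyncio.run(main())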