crawler_toutiao.py

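"""
Toutiao (今日头条) crawler.

CrawlerToutiao exposes two entry points:
  * crawler_task(media_type)           -- back-fill articles/videos for every active
                                          account, paging by behot_time until the stored
                                          max_cursor is reached, then advancing that cursor.
  * crawl_toutiao_recommend_task(...)  -- pull the recommend feed per category, using
                                          request params stored in toutiao_request_params.
"""
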
from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime
from typing import List

import aiohttp
from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.toutiao import get_toutiao_account_info_list
from applications.pipeline import CrawlerPipeline
from applications.utils import async_proxy


class CrawlerToutiaoConst:
    # platform
    PLATFORM = "toutiao"

    # account status
    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
    TOUTIAO_ACCOUNT_BAD_STATUS = 0

    # earliest cursor, 2021-01-01 00:00:00
    DEFAULT_CURSOR = 1609430400

    # no source account
    NO_SOURCE_ACCOUNT_STATUS = 0

    # min title length
    MIN_TITLE_LENGTH = 10

    # max video length (seconds)
    MAX_VIDEO_LENGTH = 600

    # seconds to sleep between pages
    SLEEP_SECOND = 3

    # how many times to pull the recommend feed per category
    RECOMMEND_TIMES = 10


class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_request_params(self, category):
        """
        Fetch the latest non-expired request params (method, url, headers, post data)
        for the given recommend-feed category.
        """
        query = """
            select request_method, request_url, request_headers, post_data
            from toutiao_request_params
            where category = %s and expire_flag = %s
            order by id desc limit 1;
        """
        response = await self.pool.async_fetch(query=query, params=(category, 0))
        if not response:
            now = datetime.now()
            # only alert during the day (11:00-20:00)
            if 10 < now.hour < 21:
                await feishu_robot.bot(
                    title="今日头条推荐流,cookie 过期",
                    detail={"info": "cookie expired"},
                )
            return None
        else:
            return response[0]

    async def get_account_list(self, media_type: str) -> List[dict]:
        """get toutiao account list"""
        match media_type:
            case "video":
                table = "video_meta_accounts"
            case "article":
                table = "article_meta_accounts"
            case _:
                return []

        # fetch query
        query = f"""
            select account_id, max_cursor
            from {table}
            where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        response = await self.pool.async_fetch(query)
        await self.log_client.log(
            contents={
                "trace_id": self.trace_id,
                "task": "crawler_toutiao",
                "function": "get_account_list",
                "message": f"get toutiao account list, media_type: {media_type}",
                "status": "success",
                "data": response,
            }
        )
        if not response:
            await feishu_robot.bot(
                title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
                detail={"platform": self.PLATFORM, "error": "获取账号异常"},
            )
            return []
        else:
            return response

    async def crawler_each_account_info_list(
        self,
        account_id: str,
        media_type: str,
        max_cursor: int | None,
        max_behot_time: int = 0,
    ):
        """
        account_id: toutiao account id
        max_cursor: latest crawled cursor for this account; crawling stops once older items are reached
        max_behot_time: max behot time returned by toutiao, used to page to the next batch
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or self.DEFAULT_CURSOR
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        while has_more:
            response = await get_toutiao_account_info_list(
                account_id=account_id,
                cookie=cookie,
                media_type=media_type,
                max_behot_time=current_cursor,
            )
            if not response:
                break

            if response["message"] != "success":
                break

            info_list = response["data"]
            has_more = response["has_more"]
            current_cursor = response["next"]["max_behot_time"]

            if not info_list:
                break

            # stop paging once the newest publish_time in this batch falls behind the stored cursor
            max_timestamp_in_this_group = info_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # do crawler
            match media_type:
                case "video":
                    bar_description = "crawler videos"
                case "article":
                    bar_description = "crawler articles"
                case _:
                    raise Exception(f"unknown media type: {media_type}")

            crawler_info_list_bar = tqdm(info_list, desc=bar_description)
            for info in crawler_info_list_bar:
                try:
                    crawler_info_list_bar.set_postfix({"id": info["id"]})
                    match media_type:
                        case "video":
                            await self.crawler_each_video(info)
                        case "article":
                            await self.crawler_each_article(
                                method="account", article_raw_data=info
                            )
                        case _:
                            raise Exception(f"unknown media type: {media_type}")
                except Exception as e:
                    raise Exception(f"crawler each info failed: {e}") from e

            if has_more:
                # yield control instead of blocking the event loop between pages
                await asyncio.sleep(self.SLEEP_SECOND)
            else:
                break

    async def crawler_each_article(self, method, article_raw_data, category=None):
        """
        crawler each article
        """
        # fields shared by both crawl methods
        base_item = {
            "platform": self.PLATFORM,
            "mode": method,
            "out_account_id": article_raw_data["user_info"]["user_id"],
            "title": article_raw_data["title"],
            "read_cnt": article_raw_data["read_count"],
            "like_cnt": article_raw_data["like_count"],
            "publish_time": article_raw_data["publish_time"],
            "crawler_time": int(time.time()),
        }
        match method:
            case "account":
                new_article_item = {
                    **base_item,
                    "category": "toutiao_account_association",
                    "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
                    "description": article_raw_data["abstract"],
                    "unique_index": article_raw_data["group_id"],
                }
            case "recommend":
                new_article_item = {
                    **base_item,
                    "category": category,
                    "title": article_raw_data["title"],
                    "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
                    "description": article_raw_data["Abstract"],
                    "unique_index": article_raw_data["item_id"],
                }
            case _:
                raise Exception(f"unknown method: {method}")
        await self.log_client.log(
            contents={
                "task": "crawler_toutiao",
                "function": "crawler_each_article",
                "trace_id": self.trace_id,
                "message": "抓取文章成功",
                "status": "success",
                "data": new_article_item,
            }
        )
        await self.save_item_to_database(media_type="article", item=new_article_item)

    async def crawler_each_video(self, video_raw_data):
        # video crawling is not implemented yet
        pass

    async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
        """
        update account max cursor
        """
        match media_type:
            case "video":
                query = """
                    select max(publish_timestamp) as max_cursor
                    from publish_single_video_source
                    where out_account_id = %s and platform = %s;
                """
                table = "video_meta_accounts"
            case "article":
                query = """
                    select max(publish_time) as max_cursor
                    from crawler_meta_article
                    where out_account_id = %s and platform = %s;
                """
                table = "article_meta_accounts"
            case _:
                raise Exception(f"unknown media type: {media_type}")

        response = await self.pool.async_fetch(
            query, params=(account_id, self.PLATFORM)
        )
        max_publish_timestamp = response[0]["max_cursor"] if response else None
        if max_publish_timestamp:
            query = f"""
                update {table}
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            await self.pool.async_save(
                query, (max_publish_timestamp, account_id, self.PLATFORM)
            )

    async def crawler_task(self, media_type: str) -> None:
        """
        task entrance: crawl every active account for the given media type
        """
        account_list = await self.get_account_list(media_type=media_type)
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                account_list_bar.set_postfix({"account_id": account_id})
                await self.crawler_each_account_info_list(
                    account_id=account_id, media_type=media_type, max_cursor=max_cursor
                )
                await self.update_account_max_cursor(
                    media_type=media_type, account_id=account_id
                )
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": f"crawler account: {account_id} successfully, media type: {media_type}",
                        "status": "success",
                    }
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": f"crawler account: {account_id} failed",
                        "status": "fail",
                        "data": {
                            "media_type": media_type,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

    async def crawler_recommend_articles(self, category: str) -> None:
        cookie = await self.get_request_params(category=category)
        if not cookie:
            return

        for crawler_time in range(self.RECOMMEND_TIMES):
            try:
                proxy = async_proxy()
                proxy_url = proxy["url"]
                proxy_auth = aiohttp.BasicAuth(proxy["username"], proxy["password"])
                async with aiohttp.ClientSession() as session:
                    async with session.request(
                        method=cookie["request_method"],
                        url=cookie["request_url"],
                        headers=json.loads(cookie["request_headers"]),
                        proxy=proxy_url,
                        proxy_auth=proxy_auth,
                    ) as response:
                        response.raise_for_status()
                        response_json = await response.json()
                        await self.log_client.log(
                            contents={
                                "task": "crawler_toutiao",
                                "function": "crawler_recommend_articles",
                                "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                                "trace_id": self.trace_id,
                                "status": "success",
                                "data": response_json,
                            }
                        )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "crawler_recommend_articles",
                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                        "status": "fail",
                        "trace_id": self.trace_id,
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
                continue

            article_list = response_json["data"]
            for article in article_list:
                if article.get("article_url"):
                    video_flag = article.get("has_video")
                    if not video_flag:
                        try:
                            await self.crawler_each_article(
                                method="recommend",
                                article_raw_data=article,
                                category=category,
                            )
                        except Exception as e:
                            print(f"crawler_recommend_articles error: {e}")
                    else:
                        print("this is a video rather than an article")
                        continue
                else:
                    continue

    async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
        if not category_list:
            category_list = ["finance", "tech", "history", "entertainment"]

        for category in category_list:
            await self.crawler_recommend_articles(category=category)
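

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module). It shows how
# the two task entrances might be driven from an async entrypoint.
# `create_pool` and `LogClient` are hypothetical stand-ins for whatever DB
# pool and log client the surrounding project passes to CrawlerPipeline.
#
#   async def main():
#       pool = await create_pool()        # hypothetical async DB pool factory
#       log_client = LogClient()          # hypothetical structured log client
#       crawler = CrawlerToutiao(pool, log_client, trace_id="local-debug")
#       await crawler.crawler_task(media_type="article")
#       await crawler.crawl_toutiao_recommend_task(category_list=[])
#
#   if __name__ == "__main__":
#       asyncio.run(main())
# ---------------------------------------------------------------------------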