# crawler_toutiao.py
from __future__ import annotations

import asyncio
import json
import time
import aiohttp
import traceback

from datetime import datetime
from typing import List, Dict

from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.toutiao import get_toutiao_account_info_list
from applications.crawler.toutiao import search_in_toutiao
from applications.crawler.toutiao import get_toutiao_detail
from applications.pipeline import CrawlerPipeline
from applications.utils import async_proxy, get_top_article_title_list


class CrawlerToutiaoConst:
    # platform
    PLATFORM = "toutiao"

    # account status
    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
    TOUTIAO_ACCOUNT_BAD_STATUS = 0

    # earliest cursor, 2021-01-01 00:00:00
    DEFAULT_CURSOR = 1609430400

    # no source account
    NO_SOURCE_ACCOUNT_STATUS = 0

    # minimum title length
    MIN_TITLE_LENGTH = 10

    # max video length (seconds)
    MAX_VIDEO_LENGTH = 600

    # seconds to sleep between paging requests
    SLEEP_SECOND = 3

    # how many times to pull the recommend feed per category
    RECOMMEND_TIMES = 10

    # article media type
    ARTICLE_TYPE = 1


class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_request_params(self, category):
        """
        get request params
        """
        query = """
            select request_method, request_url, request_headers, post_data
            from toutiao_request_params
            where category = %s and expire_flag = %s
            order by id desc limit 1;
        """
        response = await self.pool.async_fetch(query=query, params=(category, 0))
        if not response:
            now = datetime.now()
            # only alert during daytime hours
            if 10 < now.hour < 21:
                await feishu_robot.bot(
                    title="今日头条推荐流,cookie 过期",
                    detail={"info": "cookie expired"},
                )
            return None
        else:
            return response[0]

    async def get_account_list(self, media_type: str) -> List[dict]:
        """get toutiao account list"""
        match media_type:
            case "video":
                table = "video_meta_accounts"
            case "article":
                table = "article_meta_accounts"
            case _:
                return []

        # fetch query
        query = f"""
            select account_id, max_cursor
            from {table}
            where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        response = await self.pool.async_fetch(query)
        await self.log_client.log(
            contents={
                "trace_id": self.trace_id,
                "task": "crawler_toutiao",
                "function": "get_account_list",
                "message": f"get toutiao account list, media_type: {media_type}",
                "status": "success",
                "data": response,
            }
        )
        if not response:
            await feishu_robot.bot(
                title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
                detail={"platform": self.PLATFORM, "error": "获取账号异常"},
            )
            return []
        else:
            return response

    async def crawler_each_account_info_list(
        self,
        account_id: str,
        media_type: str,
        max_cursor: int | None,
        max_behot_time: int = 0,
    ):
        """
        account_id: toutiao account id
        max_cursor: latest crawled cursor for this account
        max_behot_time: max behot time from toutiao, used to page to the next batch
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or self.DEFAULT_CURSOR
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        while has_more:
            response = await get_toutiao_account_info_list(
                account_id=account_id,
                cookie=cookie,
                media_type=media_type,
                max_behot_time=current_cursor,
            )
            if not response:
                break

            if response["message"] != "success":
                break

            info_list = response["data"]
            has_more = response["has_more"]
            current_cursor = response["next"]["max_behot_time"]

            if not info_list:
                break

            max_timestamp_in_this_group = info_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # do crawler
            match media_type:
                case "video":
                    bar_description = "crawler videos"
                case "article":
                    bar_description = "crawler articles"
                case _:
                    raise Exception(f"unknown media type: {media_type}")

            crawler_info_list_bar = tqdm(info_list, desc=bar_description)
            for info in crawler_info_list_bar:
                try:
                    crawler_info_list_bar.set_postfix({"id": info["id"]})
                    match media_type:
                        case "video":
                            await self.crawler_each_video(info)
                        case "article":
                            await self.crawler_each_article(
                                method="account", article_raw_data=info
                            )
                        case _:
                            raise Exception(f"unknown media type: {media_type}")
                except Exception as e:
                    raise Exception(f"crawler each info failed: {e}")

            if has_more:
                # sleep between pages without blocking the event loop
                await asyncio.sleep(self.SLEEP_SECOND)
            else:
                break

    async def crawler_each_account(self, account_name, account_id, media_type, cookie):
        """
        get toutiao account info
        """
        new_account_item = {
            "account_name": account_name,
            "account_id": account_id,
            "platform": self.PLATFORM,
            "crawler_date": datetime.now().strftime("%Y-%m-%d"),
            "media_type": media_type,
        }
        # get title_list
        response = await get_toutiao_account_info_list(
            account_id=account_id, cookie=cookie, media_type="article"
        )
        if not response:
            return

        article_raw_data = response["data"]
        title_list = [i["title"] for i in article_raw_data]
        new_account_item["title_list"] = json.dumps(title_list, ensure_ascii=False)
        await self.save_item_to_database(
            media_type="account", item=new_account_item, trace_id=self.trace_id
        )

    async def crawler_each_article(self, method, article_raw_data, category=None):
        """
        crawler each article
        """
        # common fields shared by both crawl modes
        base_item = {
            "platform": self.PLATFORM,
            "mode": method,
            "out_account_id": article_raw_data["user_info"]["user_id"],
            "title": article_raw_data["title"],
            "read_cnt": article_raw_data["read_count"],
            "like_cnt": article_raw_data["like_count"],
            "publish_time": article_raw_data["publish_time"],
            "crawler_time": int(time.time()),
        }
        match method:
            case "account":
                new_article_item = {
                    **base_item,
                    "category": "toutiao_account_association",
                    "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
                    "description": article_raw_data["abstract"],
                    "unique_index": article_raw_data["group_id"],
                }
            case "recommend":
                new_article_item = {
                    **base_item,
                    "category": category,
                    "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
                    "description": article_raw_data["Abstract"],
                    "unique_index": article_raw_data["item_id"],
                }
            case _:
                raise Exception(f"unknown method: {method}")

        await self.save_item_to_database(
            media_type="article", item=new_article_item, trace_id=self.trace_id
        )

    async def crawler_each_video(self, video_raw_data):
        # video crawling is not implemented yet
        pass

    async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
        """
        update account max cursor
        """
        match media_type:
            case "video":
                query = """
                    select max(publish_timestamp) as max_cursor
                    from publish_single_video_source
                    where out_account_id = %s and platform = %s;
                """
                table = "video_meta_accounts"
            case "article":
                query = """
                    select max(publish_time) as max_cursor
                    from crawler_meta_article
                    where out_account_id = %s and platform = %s;
                """
                table = "article_meta_accounts"
            case _:
                raise Exception(f"unknown media type: {media_type}")

        response = await self.pool.async_fetch(
            query, params=(account_id, self.PLATFORM)
        )
        max_publish_timestamp = response[0]["max_cursor"] if response else None
        if max_publish_timestamp:
            query = f"""
                update {table}
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            await self.pool.async_save(
                query, (max_publish_timestamp, account_id, self.PLATFORM)
            )

    # fetch each account's profile-page articles/videos
    async def crawler_task(self, media_type: str) -> None:
        """
        class entrance
        """
        account_list = await self.get_account_list(media_type=media_type)
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                account_list_bar.set_postfix({"account_id": account_id})
                await self.crawler_each_account_info_list(
                    account_id=account_id, media_type=media_type, max_cursor=max_cursor
                )
                await self.update_account_max_cursor(
                    media_type=media_type, account_id=account_id
                )
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": f"crawler account: {account_id} successfully, media type: {media_type}",
                        "status": "success",
                    }
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": f"crawler_account: {account_id} fail",
                        "status": "fail",
                        "data": {
                            "media_type": media_type,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

    async def crawler_recommend_articles(self, category: str) -> None:
        cookie = await self.get_request_params(category=category)
        if not cookie:
            return

        for crawler_time in range(self.RECOMMEND_TIMES):
            try:
                # fetch the proxy once so the url and credentials stay consistent
                proxy = async_proxy()
                proxy_url = proxy["url"]
                proxy_auth = aiohttp.BasicAuth(proxy["username"], proxy["password"])
                async with aiohttp.ClientSession() as session:
                    async with session.request(
                        method=cookie["request_method"],
                        url=cookie["request_url"],
                        headers=json.loads(cookie["request_headers"]),
                        proxy=proxy_url,
                        proxy_auth=proxy_auth,
                    ) as response:
                        response.raise_for_status()
                        response_json = await response.json()
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "crawler_recommend_articles",
                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                        "trace_id": self.trace_id,
                        "status": "success",
                        "data": response_json,
                    }
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "crawler_recommend_articles",
                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                        "status": "fail",
                        "trace_id": self.trace_id,
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
                continue

            if not response_json:
                continue

            article_list = response_json["data"]
            for article in article_list:
                if article.get("article_url"):
                    video_flag = article.get("has_video")
                    if not video_flag:
                        try:
                            await self.crawler_each_article(
                                method="recommend",
                                article_raw_data=article,
                                category=category,
                            )
                        except Exception as e:
                            print(f"crawler_recommend_articles error: {e}")
                    else:
                        print("this is a video rather than an article")
                        continue
                else:
                    continue

    # crawl the recommend feed
    async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
        if not category_list:
            category_list = ["finance", "tech", "history", "entertainment"]

        for category in category_list:
            await self.crawler_recommend_articles(category=category)

    # discover candidate accounts through toutiao search
    async def search_candidate_accounts(self):
        top_title_list = await get_top_article_title_list(pool=self.pool)
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        for article in top_title_list:
            title = article["title"]
            try:
                search_response = await search_in_toutiao(keyword=title)
                if not search_response:
                    continue

                article_list = search_response["data"]["data"]
                for search_article in article_list:
                    try:
                        article_url = search_article["article_url"]
                        account_name = search_article["source"]
                        if not (
                            article_url
                            and account_name
                            and "toutiao.com" in article_url
                        ):
                            continue

                        article_detail = await get_toutiao_detail(article_url)
                        if not article_detail:
                            continue

                        account_id = (
                            article_detail.get("data", {})
                            .get("data", {})
                            .get("channel_account_id")
                        )
                        if account_id:
                            await self.crawler_each_account(
                                account_name, account_id, self.ARTICLE_TYPE, cookie
                            )

                        await asyncio.sleep(1)
                    except Exception as e:
                        await self.log_client.log(
                            contents={
                                "task": "crawler_toutiao",
                                "function": "search_candidate_accounts",
                                "trace_id": self.trace_id,
                                "message": "crawler_account fail",
                                "status": "fail",
                                "data": {
                                    "error": str(e),
                                    "traceback": traceback.format_exc(),
                                    "article_info": article,
                                },
                            }
                        )

                await asyncio.sleep(5)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "search_candidate_accounts",
                        "trace_id": self.trace_id,
                        "message": "search_in_toutiao failed",
                        "status": "fail",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
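

# Usage sketch (assumptions, not part of this module): CrawlerToutiao expects an
# async DB pool and a log client compatible with CrawlerPipeline, plus a trace id.
# The names below are illustrative placeholders; wire in the application's real
# pool/log_client factories before running.
#
# async def main():
#     pool = ...        # hypothetical async DB pool used by CrawlerPipeline
#     log_client = ...  # hypothetical async log client exposing `await log(contents=...)`
#     crawler = CrawlerToutiao(pool, log_client, trace_id="manual-run")
#     await crawler.crawler_task(media_type="article")   # profile-page crawl
#     await crawler.crawl_toutiao_recommend_task([])     # recommend-feed crawl
#
# if __name__ == "__main__":
#     asyncio.run(main())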