# crawler_toutiao.py
  1. from __future__ import annotations
  2. import asyncio
  3. import time, json
  4. import aiohttp
  5. import traceback
  6. from datetime import datetime
  7. from typing import List
  8. from tqdm import tqdm
  9. from app.infra.crawler.toutiao import get_toutiao_account_info_list
  10. from app.infra.crawler.toutiao import search_in_toutiao
  11. from app.infra.crawler.toutiao import get_toutiao_detail
  12. from app.infra.external import feishu_robot
  13. from app.infra.internal import get_top_article_title_list
  14. from app.infra.shared.tools import async_proxy
  15. from app.core.pipeline import CrawlerPipeline
  16. from app.core.config import GlobalConfigSettings
  17. from app.core.database import DatabaseManager
  18. from app.core.observability import LogService
  19. class CrawlerToutiaoConst:
  20. # platform
  21. PLATFORM = "toutiao"
  22. # account status
  23. TOUTIAO_ACCOUNT_GOOD_STATUS = 1
  24. TOUTIAO_ACCOUNT_BAD_STATUS = 0
  25. # earliest cursor, 2021-01-01 00:00:00
  26. DEFAULT_CURSOR = 1609430400
  27. # no source account
  28. NO_SOURCE_ACCOUNT_STATUS = 0
  29. # title length min
  30. MIN_TITLE_LENGTH = 10
  31. # max video length(second)
  32. MAX_VIDEO_LENGTH = 600
  33. # sleep second
  34. SLEEP_SECOND = 3
  35. RECOMMEND_TIMES = 10
  36. # 文章模态
  37. ARTICLE_TYPE = 1
  38. class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
  39. def __init__(
  40. self,
  41. pool: DatabaseManager,
  42. log_client: LogService,
  43. trace_id: str,
  44. config: GlobalConfigSettings,
  45. ):
  46. super().__init__(pool, log_client, config.apollo)
  47. self.trace_id = trace_id
  48. async def get_request_params(self, category):
  49. """
  50. get request params
  51. """
  52. query = f"""
  53. select request_method, request_url, request_headers, post_data
  54. from toutiao_request_params
  55. where category = %s and expire_flag = %s
  56. order by id desc limit 1;
  57. """
  58. response = await self.pool.async_fetch(query=query, params=(category, 0))
  59. if not response:
  60. now = datetime.now()
  61. if 10 < now.hour < 21:
  62. await feishu_robot.bot(
  63. title="今日头条推荐流,cookie 过期",
  64. detail={"info": "cookie expired"},
  65. )
  66. return None
  67. else:
  68. return response[0]
  69. async def get_account_list(self, media_type: str) -> List[dict]:
  70. """get toutiao account list"""
  71. match media_type:
  72. case "video":
  73. table = "video_meta_accounts"
  74. case "article":
  75. table = "article_meta_accounts"
  76. case _:
  77. return []
  78. # fetch query
  79. query = f"""
  80. select account_id, max_cursor
  81. from {table}
  82. where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
  83. """
  84. response = await self.pool.async_fetch(query)
  85. await self.log_client.log(
  86. contents={
  87. "trace_id": self.trace_id,
  88. "task": "crawler_toutiao",
  89. "function": "get_account_list",
  90. "message": f"get toutiao account list, media_type: {media_type}",
  91. "status": "success",
  92. "data": response,
  93. }
  94. )
  95. if not response:
  96. await feishu_robot.bot(
  97. title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
  98. detail={"platform": self.PLATFORM, "error": "获取账号异常"},
  99. )
  100. return []
  101. else:
  102. return response
  103. async def crawler_each_account_info_list(
  104. self,
  105. account_id: str,
  106. media_type: str,
  107. max_cursor: int | None,
  108. max_behot_time: int = 0,
  109. ):
  110. """
  111. account_id: toutiao account id
  112. max_cursor: crawler latest cursor for each account
  113. max_behot_time: max behot time from toutiao, use to switch to next page
  114. """
  115. has_more = True
  116. current_cursor = max_behot_time
  117. max_cursor = max_cursor or self.DEFAULT_CURSOR
  118. cookie = await self.apollo_client.get_config_value(
  119. key="toutiao_blogger_cookie", output_type="string"
  120. )
  121. while has_more:
  122. response = await get_toutiao_account_info_list(
  123. account_id=account_id,
  124. cookie=cookie,
  125. media_type=media_type,
  126. max_behot_time=current_cursor,
  127. )
  128. if not response:
  129. break
  130. if response["message"] != "success":
  131. break
  132. info_list = response["data"]
  133. has_more = response["has_more"]
  134. current_cursor = response["next"]["max_behot_time"]
  135. if not info_list:
  136. break
  137. max_timestamp_in_this_group = info_list[0]["publish_time"]
  138. if max_timestamp_in_this_group < max_cursor:
  139. break
  140. # do crawler
  141. match media_type:
  142. case "video":
  143. bar_description = "crawler videos"
  144. case "article":
  145. bar_description = "crawler articles"
  146. case _:
  147. raise Exception(f"unknown media type: {media_type}")
  148. crawler_info_list_bar = tqdm(info_list, desc=bar_description)
  149. for info in crawler_info_list_bar:
  150. try:
  151. crawler_info_list_bar.set_postfix({"id": info["id"]})
  152. match media_type:
  153. case "video":
  154. await self.crawler_each_video(info)
  155. case "article":
  156. await self.crawler_each_article(
  157. method="account", article_raw_data=info
  158. )
  159. case _:
  160. raise Exception(f"unknown media type: {media_type}")
  161. except Exception as e:
  162. raise Exception(f"crawler each info failed: {e}")
  163. if has_more:
  164. await asyncio.sleep(self.SLEEP_SECOND)
  165. else:
  166. break
  167. async def crawler_each_account(self, account_name, account_id, media_type, cookie):
  168. """
  169. get toutiao account info
  170. """
  171. new_account_item = {
  172. "account_name": account_name,
  173. "account_id": account_id,
  174. "platform": self.PLATFORM,
  175. "crawler_date": datetime.now().strftime("%Y-%m-%d"),
  176. "media_type": media_type,
  177. }
  178. # get title_list
  179. response = await get_toutiao_account_info_list(
  180. account_id=account_id, cookie=cookie, media_type="article"
  181. )
  182. if not response:
  183. return
  184. article_raw_data = response["data"]
  185. title_list = [i["title"] for i in article_raw_data]
  186. new_account_item["title_list"] = json.dumps(title_list, ensure_ascii=False)
  187. await self.save_item_to_database(
  188. media_type="account", item=new_account_item, trace_id=self.trace_id
  189. )
  190. async def crawler_each_article(self, method, article_raw_data, category=None):
  191. """
  192. crawler each article
  193. """
  194. # 公共字段提取
  195. base_item = {
  196. "platform": self.PLATFORM,
  197. "mode": method,
  198. "out_account_id": article_raw_data["user_info"]["user_id"],
  199. "title": article_raw_data["title"],
  200. "read_cnt": article_raw_data["read_count"],
  201. "like_cnt": article_raw_data["like_count"],
  202. "publish_time": article_raw_data["publish_time"],
  203. "crawler_time": int(time.time()),
  204. }
  205. match method:
  206. case "account":
  207. new_article_item = {
  208. **base_item,
  209. "category": "toutiao_account_association",
  210. "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
  211. "description": article_raw_data["abstract"],
  212. "unique_index": article_raw_data["group_id"],
  213. }
  214. case "recommend":
  215. new_article_item = {
  216. **base_item,
  217. "category": category,
  218. "title": article_raw_data["title"],
  219. "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
  220. "description": article_raw_data["Abstract"],
  221. "unique_index": article_raw_data["item_id"],
  222. }
  223. case _:
  224. raise Exception(f"unknown method: {method}")
  225. await self.save_item_to_database(
  226. media_type="article", item=new_article_item, trace_id=self.trace_id
  227. )
    async def crawler_each_video(self, video_raw_data):
        # TODO: video crawling is not implemented yet; intentionally a no-op.
        pass
  230. async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
  231. """
  232. update account max cursor
  233. """
  234. match media_type:
  235. case "video":
  236. query = f"""
  237. select max(publish_timestamp) as max_cursor
  238. from publish_single_video_source
  239. where out_account_id = %s and platform = %s;
  240. """
  241. table = "video_meta_accounts"
  242. case "article":
  243. query = f"""
  244. select max(publish_time) as max_cursor
  245. from crawler_meta_article
  246. where out_account_id = %s and platform = %s;
  247. """
  248. table = "article_meta_accounts"
  249. case _:
  250. raise Exception(f"unknown media type: {media_type}")
  251. response = await self.pool.async_fetch(
  252. query, params=(account_id, self.PLATFORM)
  253. )
  254. max_publish_timestamp = response[0]["max_cursor"]
  255. if max_publish_timestamp:
  256. query = f"""
  257. update {table}
  258. set max_cursor = %s
  259. where account_id = %s and platform = %s;
  260. """
  261. await self.pool.async_save(
  262. query, (max_publish_timestamp, account_id, self.PLATFORM)
  263. )
  264. # 获取个人主页文章/视频
  265. async def crawler_task(self, media_type: str) -> None:
  266. """
  267. class entrance
  268. """
  269. account_list = await self.get_account_list(media_type=media_type)
  270. account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
  271. for account in account_list_bar:
  272. account_id = account["account_id"]
  273. max_cursor = account["max_cursor"]
  274. try:
  275. account_list_bar.set_postfix({"account_id": account_id})
  276. await self.crawler_each_account_info_list(
  277. account_id=account_id, media_type=media_type, max_cursor=max_cursor
  278. )
  279. await self.update_account_max_cursor(
  280. media_type=media_type, account_id=account_id
  281. )
  282. await self.log_client.log(
  283. contents={
  284. "trace_id": self.trace_id,
  285. "task": "crawler_toutiao_account_info",
  286. "function": "crawler_task",
  287. "message": f"crawler account: {account_id} successfully, media type: {media_type}",
  288. "status": "success",
  289. }
  290. )
  291. except Exception as e:
  292. await self.log_client.log(
  293. contents={
  294. "trace_id": self.trace_id,
  295. "task": "crawler_toutiao_account_info",
  296. "function": "crawler_task",
  297. "message": f"crawler_account: {account_id} fail",
  298. "status": "fail",
  299. "data": {
  300. "media_type": media_type,
  301. "error": str(e),
  302. "traceback": traceback.format_exc(),
  303. },
  304. }
  305. )
  306. async def crawler_recommend_articles(self, category: str) -> None:
  307. cookie = await self.get_request_params(category=category)
  308. if not cookie:
  309. return
  310. for crawler_time in range(self.RECOMMEND_TIMES):
  311. try:
  312. proxy_url = async_proxy()["url"]
  313. proxy_auth = aiohttp.BasicAuth(
  314. async_proxy()["username"], async_proxy()["password"]
  315. )
  316. async with aiohttp.ClientSession() as session:
  317. async with session.request(
  318. method=cookie["request_method"],
  319. url=cookie["request_url"],
  320. headers=json.loads(cookie["request_headers"]),
  321. proxy=proxy_url,
  322. proxy_auth=proxy_auth,
  323. ) as response:
  324. response.raise_for_status()
  325. response_json = await response.json()
  326. await self.log_client.log(
  327. contents={
  328. "task": "crawler_toutiao",
  329. "function": "crawler_recommend_articles",
  330. "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
  331. "trace_id": self.trace_id,
  332. "status": "success",
  333. "data": response_json,
  334. }
  335. )
  336. except Exception as e:
  337. await self.log_client.log(
  338. contents={
  339. "task": "crawler_toutiao",
  340. "function": "crawler_recommend_articles",
  341. "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
  342. "status": "fail",
  343. "trace_id": self.trace_id,
  344. "data": {
  345. "error": str(e),
  346. "traceback": traceback.format_exc(),
  347. },
  348. }
  349. )
  350. continue
  351. if not response_json:
  352. continue
  353. article_list = response_json["data"]
  354. for article in article_list:
  355. if article.get("article_url"):
  356. video_flag = article.get("has_video")
  357. if not video_flag:
  358. try:
  359. await self.crawler_each_article(
  360. method="recommend",
  361. article_raw_data=article,
  362. category=category,
  363. )
  364. except Exception as e:
  365. print(f"crawler_recommend_articles error: {e}")
  366. else:
  367. print("this is an video rather than article")
  368. continue
  369. else:
  370. continue
  371. # 抓推荐流
  372. async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
  373. if not category_list:
  374. category_list = ["finance", "tech", "history", "entertainment"]
  375. for category in category_list:
  376. await self.crawler_recommend_articles(category=category)
  377. # 搜索抓账号
  378. async def search_candidate_accounts(self):
  379. top_title_list = await get_top_article_title_list(pool=self.pool)
  380. cookie = await self.apollo_client.get_config_value(
  381. key="toutiao_blogger_cookie", output_type="string"
  382. )
  383. for article in top_title_list:
  384. title = article["title"]
  385. try:
  386. search_response = await search_in_toutiao(keyword=title)
  387. if not search_response:
  388. continue
  389. article_list = search_response["data"]["data"]
  390. for search_article in article_list:
  391. try:
  392. article_url = search_article["article_url"]
  393. account_name = search_article["source"]
  394. if not (
  395. article_url
  396. and account_name
  397. and "toutiao.com" in article_url
  398. ):
  399. continue
  400. article_detail = await get_toutiao_detail(article_url)
  401. if not article_detail:
  402. continue
  403. account_id = (
  404. article_detail.get("data", {})
  405. .get("data", {})
  406. .get("channel_account_id")
  407. )
  408. if account_id:
  409. await self.crawler_each_account(
  410. account_name, account_id, self.ARTICLE_TYPE, cookie
  411. )
  412. await asyncio.sleep(1)
  413. except Exception as e:
  414. await self.log_client.log(
  415. contents={
  416. "task": "crawler_toutiao",
  417. "function": "search_candidate_accounts",
  418. "trace_id": self.trace_id,
  419. "message": "crawler_account fail",
  420. "status": "fail",
  421. "date": {
  422. "error": str(e),
  423. "traceback": traceback.format_exc(),
  424. "article_info": article,
  425. },
  426. }
  427. )
  428. await asyncio.sleep(5)
  429. except Exception as e:
  430. await self.log_client.log(
  431. contents={
  432. "task": "crawler_toutiao",
  433. "function": "search_candidate_accounts",
  434. "trace_id": self.trace_id,
  435. "message": "search_in_toutiao failed",
  436. "status": "fail",
  437. "data": {
  438. "error": str(e),
  439. "traceback": traceback.format_exc(),
  440. },
  441. }
  442. )