# crawler_toutiao_account_videos.py
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import time
  6. import traceback
  7. from pymysql.cursors import DictCursor
  8. from tqdm import tqdm
  9. from applications import log
  10. from applications.const import ToutiaoVideoCrawlerConst
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import scrape_video_entities_process
  13. from applications.utils import Item
  14. from applications.utils import str_to_md5
  15. from applications.utils import insert_into_single_video_source_table
  16. from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
  17. from config import apolloConfig, long_articles_config
# Task-wide constants: account statuses, default cursor, sleep interval, platform name.
const = ToutiaoVideoCrawlerConst()
# Apollo configuration client used to pull crawler credentials.
config = apolloConfig()
# Cookie string authenticating requests to the toutiao blogger API.
cookie = config.getConfigValue("toutiao_blogger_cookie")
  21. class CrawlerToutiaoAccountVideos:
  22. """
  23. toutiao blogger crawler
  24. """
  25. def __init__(self):
  26. self.db_client = DatabaseConnector(db_config=long_articles_config)
  27. self.db_client.connect()
  28. def get_account_list(self):
  29. """
  30. get account list
  31. """
  32. sql = f"""
  33. select account_id, max_cursor
  34. from video_meta_accounts
  35. where platform = 'toutiao' and status = {const.TOUTIAO_ACCOUNT_GOOD_STATUS};
  36. """
  37. account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
  38. return account_list
  39. def crawler_each_account_video_list(
  40. self, account_id: str, max_cursor: int | None, max_behot_time: int = 0
  41. ):
  42. """
  43. account_id: toutiao account id
  44. max_cursor: crawler latest cursor for each account
  45. max_behot_time: max behot time from toutiao, use to switch to next page
  46. """
  47. has_more = True
  48. current_cursor = max_behot_time
  49. max_cursor = max_cursor or const.DEFAULT_CURSOR
  50. while has_more:
  51. response = get_toutiao_account_video_list(
  52. account_id=account_id, cookie=cookie, max_behot_time=current_cursor
  53. )
  54. if not response:
  55. break
  56. if response["message"] != "success":
  57. log(
  58. task="crawler_toutiao_account_videos",
  59. function="crawler_toutiao_account_videos",
  60. message="get response from toutiao failed",
  61. data={"account_id": account_id, "response": response},
  62. )
  63. break
  64. video_list = response["data"]
  65. has_more = response["has_more"]
  66. current_cursor = response["next"]["max_behot_time"]
  67. if not video_list:
  68. break
  69. max_timestamp_in_this_group = video_list[0]["publish_time"]
  70. if max_timestamp_in_this_group < max_cursor:
  71. break
  72. # do crawler each video
  73. crawler_video_list_bar = tqdm(video_list, desc="crawler videos")
  74. for video in crawler_video_list_bar:
  75. try:
  76. crawler_video_list_bar.set_postfix({"video_id": video["id"]})
  77. self.crawler_each_video(video)
  78. except Exception as e:
  79. log(
  80. task="crawler_toutiao_account_videos",
  81. function="crawler_each_account_video_list",
  82. message="crawler each video failed",
  83. data={
  84. "account_id": account_id,
  85. "video_info": video,
  86. "error": str(e),
  87. "traceback": traceback.format_exc(),
  88. },
  89. )
  90. if has_more:
  91. time.sleep(const.SLEEP_SECOND)
  92. else:
  93. break
  94. def crawler_each_video(self, video_data):
  95. """
  96. crawler each video data
  97. """
  98. video_item = Item()
  99. video_id = video_data["group_id"]
  100. title = video_data["title"]
  101. media = video_data["video"]
  102. url = media["download_addr"]["url_list"][0]
  103. # add info into item
  104. video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
  105. video_item.add("url_unique_md5", video_id)
  106. video_item.add("article_title", title)
  107. video_item.add("out_account_id", video_data["user"]["user_id"])
  108. video_item.add("out_account_name", video_data["source"])
  109. video_item.add("publish_timestamp", video_data["publish_time"])
  110. video_item.add("platform", const.PLATFORM)
  111. video_item.add("read_cnt", video_data.get("read_count", 0))
  112. video_item.add("article_url", url)
  113. video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
  114. video_item.add("crawler_timestamp", int(time.time()))
  115. # check item before insert
  116. video_item.check(source="video")
  117. try:
  118. item_with_oss_path = scrape_video_entities_process(
  119. video_item=video_item.item, db_client=self.db_client
  120. )
  121. if item_with_oss_path:
  122. insert_into_single_video_source_table(
  123. self.db_client, item_with_oss_path
  124. )
  125. except Exception as e:
  126. log(
  127. task="crawler_toutiao_account_videos",
  128. function="crawler_toutiao_account_videos",
  129. message="etl failed",
  130. data={
  131. "video_item": video_item.item,
  132. "error": str(e),
  133. "traceback": traceback.format_exc(),
  134. }
  135. )
  136. def update_account_max_cursor(self, account_id: str) -> None:
  137. """
  138. update account max cursor
  139. """
  140. select_sql = f"""
  141. select max(publish_timestamp) as max_cursor
  142. from publish_single_video_source
  143. where out_account_id = '{account_id}' and platform = '{const.PLATFORM}';
  144. """
  145. response_mysql = self.db_client.fetch(query=select_sql)
  146. max_publish_timestamp = response_mysql[0][0]
  147. if max_publish_timestamp:
  148. update_sql = f"""
  149. update video_meta_accounts
  150. set max_cursor = %s
  151. where account_id = %s and platform = %s;
  152. """
  153. self.db_client.save(
  154. query=update_sql,
  155. params=(max_publish_timestamp, account_id, const.PLATFORM),
  156. )
  157. def deal(self) -> None:
  158. """
  159. class entrance
  160. """
  161. account_list = self.get_account_list()
  162. account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
  163. for account in account_list_bar:
  164. account_id = account["account_id"]
  165. max_cursor = account["max_cursor"]
  166. try:
  167. # crawl each account
  168. account_list_bar.set_postfix({"account_id": account_id})
  169. self.crawler_each_account_video_list(
  170. account_id=account_id, max_cursor=max_cursor
  171. )
  172. self.update_account_max_cursor(account_id)
  173. except Exception as e:
  174. # add log and bot
  175. log(
  176. task="crawler_toutiao_account_videos",
  177. function="deal",
  178. message=account_id,
  179. data={
  180. "error": str(e),
  181. "traceback": traceback.format_exc(),
  182. },
  183. )