crawler_channel_account_videos.py

  1. """
  2. @author: luojunhui
  3. @tool: pycharm && deepseek
  4. """
  5. import re
  6. import os
  7. import traceback
  8. import time
  9. from pymysql.cursors import DictCursor
  10. from tqdm import tqdm
  11. from applications import log
  12. from applications.const import ChannelVideoCrawlerConst
  13. from applications.db import DatabaseConnector
  14. from applications.utils import download_sph_video
  15. from applications.utils import insert_into_single_video_source_table
  16. from applications.utils import Item
  17. from applications.utils import str_to_md5
  18. from applications.utils import upload_to_oss
  19. from config import long_articles_config
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. const = ChannelVideoCrawlerConst()
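
# Overall flow (a summary of the methods below): fetch active accounts from
# sph_account_for_videos, page through each account's feed via
# get_channel_account_videos, skip duplicate/short-titled/over-long videos,
# download and decrypt each one, upload it to OSS, record it in
# publish_single_video_source, and finally advance the account's max_cursor.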


class CrawlerChannelAccountVideos:
    """
    Crawler for channel account videos.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def whether_video_exists(self, title: str) -> bool:
        """
        Check whether a video with the same title already exists.
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        return bool(duplicate_id)
    def get_channel_account_list(self) -> list[dict]:
        """
        Get the channel account list from the database.
        """
        sql = f"""
            select account_id, max_cursor
            from sph_account_for_videos
            where status = {const.CHANNEL_ACCOUNT_GOOD_STATUS}
            order by max_cursor;
        """
        return self.db_client.fetch(query=sql, cursor_type=DictCursor)
    def crawler_each_video(self, video: dict) -> None:
        """
        Download and decrypt a single video, upload it to OSS,
        then record it in the video source table.
        """
        object_desc = video["objectDesc"]
        title = object_desc["description"]
        if self.whether_video_exists(title):
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title exists",
                data={"video_id": video["id"], "title": title},
            )
            return

        # Keep only CJK characters when measuring the effective title length
        cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
        if len(cleaned_title) < const.MIN_TITLE_LENGTH:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title is too short",
                data={"video_id": video["id"], "title": title},
            )
            return

        video_length = object_desc["media"][0]["VideoPlayLen"]
        if video_length and int(video_length) > const.MAX_VIDEO_LENGTH:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video length is too long",
                data={"video_id": video["id"], "title": title, "length": video_length},
            )
            return

        video_item = Item()
        video_id = video["id"]
        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video["username"])
        video_item.add("out_account_name", video["nickname"])
        video_item.add("publish_timestamp", video["createtime"])
        video_item.add("platform", "sph")
        video_item.add("crawler_timestamp", int(time.time()))

        media = object_desc["media"][0]
        url = media["Url"]
        decode_key = media["decodeKey"]
        url_token = media["urlToken"]
        download_url = url + url_token
        try:
            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
            oss_path = upload_to_oss(decrypt_path)
            video_item.add("video_oss_path", oss_path)
            video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
            video_item.check(source="video")
            insert_into_single_video_source_table(self.db_client, video_item.item)
            os.remove(decrypt_path)
        except Exception as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="download video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video["id"],
                },
            )
    def crawler_each_account(self, channel_account: dict, last_buffer: str = "") -> None:
        """
        Crawl a channel account's videos page by page,
        using a loop instead of recursion.
        """
        channel_account_id = channel_account["account_id"]
        max_cursor = channel_account.get("max_cursor") or const.DEFAULT_CURSOR
        current_last_buffer = last_buffer
        has_more = True
        while has_more:
            response = get_channel_account_videos(
                channel_account_id, last_buffer=current_last_buffer
            )
            if response["ret"] != 200:
                log(
                    task="crawler_channel_account_videos",
                    function="crawler_each_account",
                    message="get_channel_account_videos failed",
                    data={
                        "response": response,
                        "channel_account_id": channel_account_id,
                        "max_cursor": max_cursor,
                    },
                )
                break

            response_data = response["data"]
            if isinstance(response_data, dict):
                current_last_buffer = response_data["lastBuffer"]  # update the pagination cursor
                has_more = response_data["continueFlag"]  # whether there is a next page
                video_list = response_data["object"]
            elif isinstance(response_data, list):
                has_more = False
                video_list = response_data
            else:
                return

            if not video_list:
                break

            # Stop once the newest video on this page is older than the stored cursor
            create_timestamp = video_list[0]["createtime"]
            if create_timestamp < max_cursor:
                break

            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
            for video in crawl_video_list_bar:
                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
                self.crawler_each_video(video)

            if has_more:
                time.sleep(const.SLEEP_SECOND)
    def update_account_max_cursor(self, account_id: str) -> None:
        """
        Update the account's max cursor to its latest publish timestamp.
        """
        select_sql = """
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = %s;
        """
        response_mysql = self.db_client.fetch(query=select_sql, params=(account_id,))
        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            update_sql = """
                update sph_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db_client.save(
                query=update_sql, params=(max_publish_timestamp, account_id)
            )
    def deal(self):
        """
        Crawl videos for every channel account, then advance its cursor.
        """
        account_list = self.get_channel_account_list()
        account_crawler_bar = tqdm(account_list, desc="crawler channel account videos")
        for account in account_crawler_bar:
            try:
                account_crawler_bar.set_postfix({"account_id": account["account_id"]})
                self.crawler_each_account(channel_account=account)
                self.update_account_max_cursor(account["account_id"])
            except Exception as e:
                log(
                    task="crawler_channel_account_videos",
                    function="deal",
                    message="crawler channel account videos failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "account_id": account["account_id"],
                    },
                )
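

# Minimal entry-point sketch (an assumption; the original file does not show
# how this class is invoked, but deal() is the top-level method defined above):
if __name__ == "__main__":
    crawler = CrawlerChannelAccountVideos()
    crawler.deal()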