crawler_channel_account_videos.py

  1. """
  2. @author: luojunhui
  3. @tool: pycharm && deepseek
  4. """
  5. import re
  6. import os
  7. import traceback
  8. import time
  9. from pymysql.cursors import DictCursor
  10. from tqdm import tqdm
  11. from applications import log
  12. from applications.const import ChannelVideoCrawlerConst
  13. from applications.db import DatabaseConnector
  14. from applications.utils import download_sph_video
  15. from applications.utils import insert_into_single_video_source_table
  16. from applications.utils import Item
  17. from applications.utils import str_to_md5
  18. from applications.utils import upload_to_oss
  19. from config import long_articles_config
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. const = ChannelVideoCrawlerConst()


class CrawlerChannelAccountVideos:
    """
    Crawl videos published by WeChat Channels (sph) accounts.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def whether_video_exists(self, title: str) -> bool:
        """
        Check whether a video with the same title has already been crawled.
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        if duplicate_id:
            return True
        return False

    def get_channel_account_list(self):
        """
        Fetch the good-status channel accounts from the database.
        """
        sql = f"""
            select account_id, max_cursor
            from sph_account_for_videos
            where status = {const.CHANNEL_ACCOUNT_GOOD_STATUS};
        """
        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
        return account_list
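
    # Each row returned above is a DictCursor mapping, roughly
    # {"account_id": "...", "max_cursor": 1700000000} (illustrative values,
    # inferred from how the rows are used in crawler_each_account below).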

    def crawler_each_video(self, video: dict):
        """
        Download a single video, decrypt it, upload it to OSS,
        and record it in publish_single_video_source.
        """
        object_desc = video["objectDesc"]
        title = object_desc["description"]

        # skip videos whose title has already been crawled
        if self.whether_video_exists(title):
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title exists",
                data={"video_id": video["id"], "title": title},
            )
            return

        # skip videos whose title contains too few Chinese characters
        cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
        if len(cleaned_title) < const.MIN_TITLE_LENGTH:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title is too short",
                data={"video_id": video["id"], "title": title},
            )
            return

        # skip videos that are too long
        media = object_desc["media"][0]
        video_length = media["VideoPlayLen"]
        if video_length and int(video_length) > const.MAX_VIDEO_LENGTH:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video is too long",
                data={"video_id": video["id"], "title": title},
            )
            return

        video_item = Item()
        video_id = video["id"]
        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video["username"])
        video_item.add("out_account_name", video["nickname"])
        video_item.add("publish_timestamp", video["createtime"])
        video_item.add("platform", "sph")

        url = media["Url"]
        decode_key = media["decodeKey"]
        url_token = media["urlToken"]
        download_url = url + url_token
        try:
            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
            oss_path = upload_to_oss(decrypt_path)
            video_item.add("video_oss_path", oss_path)
            video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
            video_item.check(source="video")
            insert_into_single_video_source_table(self.db_client, video_item.item)
            os.remove(decrypt_path)
        except Exception as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="download video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video["id"],
                },
            )
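
    # For reference, the `video` payload consumed by crawler_each_video is
    # assumed to look roughly like this (shape inferred from the field
    # accesses above; values are illustrative, not real data):
    #
    #   {
    #       "id": 123456,
    #       "username": "sph_account_id",
    #       "nickname": "account display name",
    #       "createtime": 1700000000,
    #       "objectDesc": {
    #           "description": "video title",
    #           "media": [
    #               {
    #                   "VideoPlayLen": "58",
    #                   "Url": "https://example.invalid/video",
    #                   "urlToken": "&token=...",
    #                   "decodeKey": "decryption key",
    #               }
    #           ],
    #       },
    #   }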

    def crawler_each_account(self, channel_account: dict, last_buffer: str = ""):
        """
        Crawl one account's video feed, paginating via last_buffer until
        continueFlag is false or the feed reaches videos older than max_cursor.
        """
        channel_account_id = channel_account["account_id"]
        max_cursor = channel_account["max_cursor"]
        if not max_cursor:
            max_cursor = const.DEFAULT_CURSOR
        response = get_channel_account_videos(
            channel_account_id, last_buffer=last_buffer
        )
        if response["ret"] == 200:
            response_data = response["data"]
            last_buffer = response_data["lastBuffer"]
            continue_flag = response_data["continueFlag"]
            video_list = response_data["object"]
            if not video_list:
                return
            # the feed is newest-first; stop once this page starts with a
            # video older than the account's crawl cursor
            create_timestamp = video_list[0]["createtime"]
            if create_timestamp < int(max_cursor):
                return
            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
            for video in crawl_video_list_bar:
                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
                self.crawler_each_video(video)
            if continue_flag:
                time.sleep(const.SLEEP_SECOND)
                # recurse with the new page token; pass the account dict,
                # not just its id
                return self.crawler_each_account(channel_account, last_buffer)
            return
        else:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_account",
                message="get_channel_account_videos failed",
                data={
                    "response": response,
                    "channel_account_id": channel_account_id,
                    "max_cursor": max_cursor,
                },
            )
            return

    def update_account_max_cursor(self, account_id):
        """
        Advance the account's max_cursor to the latest publish_timestamp
        among its crawled videos.
        """
        select_sql = """
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = %s;
        """
        response_mysql = self.db_client.fetch(query=select_sql, params=(account_id,))
        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            update_sql = """
                update sph_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db_client.save(
                query=update_sql, params=(max_publish_timestamp, account_id)
            )

    def deal(self):
        """
        Entry point: crawl videos for every account, then advance its cursor.
        """
        account_list = self.get_channel_account_list()
        account_crawler_bar = tqdm(account_list, desc="crawler channel account videos")
        for account in account_crawler_bar:
            try:
                account_crawler_bar.set_postfix({"account_id": account["account_id"]})
                self.crawler_each_account(channel_account=account)
                self.update_account_max_cursor(account["account_id"])
            except Exception as e:
                log(
                    task="crawler_channel_account_videos",
                    function="deal",
                    message="crawler channel account videos failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "account_id": account["account_id"],
                    },
                )
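

# Minimal usage sketch, assuming the module is executed directly with the
# project's DB config (long_articles_config) and OSS credentials in place.
if __name__ == "__main__":
    crawler = CrawlerChannelAccountVideos()
    crawler.deal()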