crawler_channel_account_videos.py

  1. """
  2. @author: luojunhui
  3. @tool: pycharm && deepseek
  4. """
  5. import re
  6. import os
  7. import traceback
  8. import time
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.db import DatabaseConnector
  12. from applications.utils import download_sph_video
  13. from applications.utils import insert_into_single_video_source_table
  14. from applications.utils import Item
  15. from applications.utils import str_to_md5
  16. from applications.utils import upload_to_oss
  17. from config import long_articles_config
  18. from coldStartTasks.crawler.channels import get_channel_account_videos
  19. NO_SOURCE_ACCOUNT = 0


class CrawlerChannelAccountVideos:
    """
    crawler channel account videos
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()
        self.success_crawler_video_count = 0

    def whether_video_exists(self, title: str) -> bool:
        """
        check whether a video with the same title already exists
        (deduplication is by title only; video_id is not checked)
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        return bool(duplicate_id)

    def get_channel_account_list(self):
        """
        get channel account list from database
        """
        # not implemented yet; currently returns None
        return
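        # A minimal sketch of what this stub might eventually query
        # (assumption: the table and column names below are hypothetical,
        # not taken from the repo):
        #   sql = "select account_id, account_name from channel_accounts where status = 1;"
        #   return self.db_client.fetch(query=sql)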

    def crawler_each_video(self, video: dict):
        """
        download each video, decrypt it, and upload it to oss
        """
        object_desc = video["objectDesc"]
        title = object_desc["description"]
        if self.whether_video_exists(title):
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title exists",
                data={"video_id": video["id"], "title": title},
            )
            return

        # keep only CJK characters; skip titles with fewer than 10 of them
        cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
        if len(cleaned_title) < 10:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title is too short",
                data={"video_id": video["id"], "title": title},
            )
            return

        # skip long videos (VideoPlayLen > 300, presumably seconds)
        video_length = object_desc["media"][0]["VideoPlayLen"]
        if video_length and int(video_length) > 300:
            return

        video_item = Item()
        video_id = video["id"]
        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video["username"])
        video_item.add("out_account_name", video["nickname"])
        video_item.add("publish_timestamp", video["createtime"])
        video_item.add("platform", "sph")

        # the playable url is the base url plus its access token
        media = object_desc["media"][0]
        url = media["Url"]
        decode_key = media["decodeKey"]
        url_token = media["urlToken"]
        download_url = url + url_token
        try:
            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
            oss_path = upload_to_oss(decrypt_path)
            video_item.add("video_oss_path", oss_path)
            video_item.add("source_account", NO_SOURCE_ACCOUNT)
            video_item.check(source="video")
            insert_into_single_video_source_table(self.db_client, video_item.item)
            os.remove(decrypt_path)
        except Exception as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="download video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video["id"],
                },
            )
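
    # Shape of the `video` dict consumed above, inferred from the field
    # accesses in this file (values elided):
    # {
    #     "id": ..., "username": ..., "nickname": ..., "createtime": ...,
    #     "objectDesc": {
    #         "description": ...,
    #         "media": [{"Url": ..., "decodeKey": ..., "urlToken": ..., "VideoPlayLen": ...}],
    #     },
    # }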

    def crawler_each_account(self, channel_account_id: str, channel_account_name: str, last_buffer: str = ""):
        """
        page through one channel account's videos via last_buffer cursors
        """
        response = get_channel_account_videos(channel_account_id, last_buffer=last_buffer)
        if response["ret"] == 200:
            response_data = response["data"]
            last_buffer = response_data["lastBuffer"]
            continue_flag = response_data["continueFlag"]
            print("last_buffer: ", last_buffer)
            video_list = response_data["object"]
            # 1704038400 == 2024-01-01 00:00:00 (UTC+8); stop paging once the
            # first (presumably newest) video on this page predates 2024
            create_timestamp = video_list[0]["createtime"]
            if create_timestamp < 1704038400:
                return
            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
            for video in crawl_video_list_bar:
                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
                self.crawler_each_video(video)
            if continue_flag:
                # recurse into the next page after a short pause
                time.sleep(1)
                return self.crawler_each_account(channel_account_id, channel_account_name, last_buffer)
            else:
                return
        else:
            print(f"crawler channel account {channel_account_name} videos failed")
            return
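

# Usage sketch (assumption: this __main__ block is not in the original file,
# and the account id/name below are hypothetical placeholders):
if __name__ == "__main__":
    crawler = CrawlerChannelAccountVideos()
    crawler.crawler_each_account(
        channel_account_id="finder_account_id",
        channel_account_name="finder_account_name",
    )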