# crawler_channel_account_videos.py
"""
@author: luojunhui
@tool: pycharm && deepseek
"""
import os
import time
import traceback

from tqdm import tqdm

from applications import log
from applications.db import DatabaseConnector
from applications.utils import download_sph_video
from applications.utils import insert_into_single_video_source_table
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import upload_to_oss
from config import long_articles_config
from coldStartTasks.crawler.channels import get_channel_account_videos

NO_SOURCE_ACCOUNT = 0
  19. class CrawlerChannelAccountVideos:
  20. """
  21. crawler channel account videos
  22. """
  23. def __init__(self):
  24. self.db_client = DatabaseConnector(db_config=long_articles_config)
  25. self.db_client.connect()
  26. self.success_crawler_video_count = 0
  27. def whether_video_exists(self, title: str) -> bool:
  28. """
  29. whether video exists, use video_id && title
  30. """
  31. # check title
  32. sql = f"""
  33. select id from publish_single_video_source
  34. where article_title = %s;
  35. """
  36. duplicate_id = self.db_client.fetch(query=sql, params=(title,))
  37. if duplicate_id:
  38. return True
  39. return False
  40. def get_channel_account_list(self):
  41. """
  42. get channel account list from database
  43. """
  44. return
  45. def crawler_each_video(self, video: dict):
  46. """
  47. download each video
  48. save video and decrypt video
  49. upload video to oss
  50. """
  51. object_desc = video["objectDesc"]
  52. title = object_desc["description"]
  53. if self.whether_video_exists(title):
  54. return
  55. video_item = Item()
  56. video_id = video["id"]
  57. video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
  58. video_item.add("url_unique_md5", video_id)
  59. video_item.add("article_title", title)
  60. video_item.add("out_account_id", video["username"])
  61. video_item.add("out_account_name", video["nickname"])
  62. video_item.add("publish_timestamp", video["createtime"])
  63. media = object_desc["media"][0]
  64. url = media["Url"]
  65. decode_key = media["decodeKey"]
  66. url_token = media["urlToken"]
  67. download_url = url + url_token
  68. try:
  69. decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
  70. oss_path = upload_to_oss(decrypt_path)
  71. video_item.add("video_oss_path", oss_path)
  72. video_item.add("source_account", NO_SOURCE_ACCOUNT)
  73. video_item.check(source="video")
  74. insert_into_single_video_source_table(self.db_client, video_item.item)
  75. os.remove(decrypt_path)
  76. except Exception as e:
  77. log(
  78. task="crawler_channel_account_videos",
  79. function="crawler_each_video",
  80. message="download video failed",
  81. data={
  82. "error": str(e),
  83. "traceback": traceback.format_exc(),
  84. "video_id": video["id"],
  85. },
  86. )
  87. def crawler_each_account(self, channel_account_id: str, channel_account_name: str, last_buffer: str = ""):
  88. """
  89. get channel account videos
  90. """
  91. response = get_channel_account_videos(channel_account_id, last_buffer=last_buffer)
  92. if response["ret"] == 200:
  93. response_data = response["data"]
  94. last_buffer = response_data["lastBuffer"]
  95. continue_flag = response_data["continueFlag"]
  96. print("last_buffer: ", last_buffer)
  97. video_list = response_data["object"]
  98. create_timestamp = video_list[0]['createtime']
  99. if create_timestamp < 1704038400:
  100. return
  101. crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
  102. for video in crawl_video_list_bar:
  103. crawl_video_list_bar.set_postfix({"video_id": video["id"]})
  104. self.crawler_each_video(video)
  105. if continue_flag:
  106. time.sleep(1)
  107. return self.crawler_each_account(channel_account_id, channel_account_name, last_buffer)
  108. else:
  109. return
  110. else:
  111. print(f"crawler channel account {channel_account_name} videos failed")
  112. return