crawler_channel_account_videos.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. """
  2. @author: luojunhui
  3. @tool: pycharm && deepseek
  4. """
  5. import os
  6. import traceback
  7. from tqdm import tqdm
  8. from applications import log
  9. from applications.db import DatabaseConnector
  10. from applications.utils import download_sph_video
  11. from applications.utils import insert_into_single_video_source_table
  12. from applications.utils import Item
  13. from applications.utils import str_to_md5
  14. from applications.utils import upload_to_oss
  15. from config import long_articles_config
  16. from coldStartTasks.crawler.channels import get_channel_account_videos
# Value written to the "source_account" field of every video item built in
# CrawlerChannelAccountVideos.crawler_each_video.
NO_SOURCE_ACCOUNT = 0
  18. class CrawlerChannelAccountVideos:
  19. """
  20. crawler channel account videos
  21. """
  22. def __init__(self):
  23. self.db_client = DatabaseConnector(db_config=long_articles_config)
  24. self.db_client.connect()
  25. self.success_crawler_video_count = 0
  26. def whether_video_exists(self, title: str) -> bool:
  27. """
  28. whether video exists, use video_id && title
  29. """
  30. # check title
  31. sql = f"""
  32. select id from publish_single_video_source
  33. where article_title = %s;
  34. """
  35. duplicate_id = self.db_client.fetch(query=sql, params=(title,))
  36. if duplicate_id:
  37. return True
  38. return False
  39. def get_channel_account_list(self):
  40. """
  41. get channel account list from database
  42. """
  43. return
  44. def crawler_each_video(self, video: dict):
  45. """
  46. download each video
  47. save video and decrypt video
  48. upload video to oss
  49. """
  50. object_desc = video["objectDesc"]
  51. title = object_desc["description"]
  52. if self.whether_video_exists(title):
  53. return
  54. video_item = Item()
  55. video_id = video["id"]
  56. video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
  57. video_item.add("url_unique_md5", video_id)
  58. video_item.add("article_title", title)
  59. video_item.add("out_account_id", video["username"])
  60. video_item.add("out_account_name", video["nickname"])
  61. video_item.add("publish_timestamp", video["createtime"])
  62. media = object_desc["media"][0]
  63. url = media["Url"]
  64. decode_key = media["decodeKey"]
  65. url_token = media["urlToken"]
  66. download_url = url + url_token
  67. try:
  68. decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
  69. oss_path = upload_to_oss(decrypt_path)
  70. video_item.add("video_oss_path", oss_path)
  71. video_item.add("source_account", NO_SOURCE_ACCOUNT)
  72. video_item.check(source="video")
  73. insert_into_single_video_source_table(self.db_client, video_item.item)
  74. os.remove(decrypt_path)
  75. except Exception as e:
  76. log(
  77. task="crawler_channel_account_videos",
  78. function="crawler_each_video",
  79. message="download video failed",
  80. data={
  81. "error": str(e),
  82. "traceback": traceback.format_exc(),
  83. "video_id": video["id"],
  84. },
  85. )
    def crawler_each_account(self, channel_account_id: str, channel_account_name: str):
        """
        crawl the videos returned for one channel account

        Calls ``get_channel_account_videos`` once with the account id. When the
        response's "ret" equals 200, every entry of ``response["data"]["object"]``
        is handed to ``crawler_each_video``; otherwise a failure message is
        printed.

        :param channel_account_id: account id passed to ``get_channel_account_videos``
        :param channel_account_name: account name, used only in the failure message
        """
        response = get_channel_account_videos(channel_account_id)
        if response["ret"] == 200:
            response_data = response["data"]
            # NOTE(review): lastBuffer / continueFlag are read but not used in
            # the code visible here — presumably a paging cursor and a
            # "more pages" flag; confirm whether follow-up pages should be fetched.
            last_buffer = response_data["lastBuffer"]
            continue_flag = response_data["continueFlag"]
            video_list = response_data["object"]
            # progress bar over this batch; the postfix shows the current video id
            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
            for video in crawl_video_list_bar:
                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
                self.crawler_each_video(video)
        else:
            # failure is reported with print, not the log() helper used in crawler_each_video
            print(f"crawler channel account {channel_account_name} videos failed")
            return