# crawler_channel_account_videos.py

"""
@author: luojunhui
@tool: pycharm && deepseek
"""
import os
import traceback

from tqdm import tqdm

from applications import log
from applications.db import DatabaseConnector
from applications.utils import download_sph_video
from applications.utils import insert_into_single_video_source_table
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import upload_to_oss
from config import long_articles_config
from coldStartTasks.crawler.channels import get_channel_account_videos
  17. NO_SOURCE_ACCOUNT = 0
  18. class CrawlerChannelAccountVideos:
  19. """
  20. crawler channel account videos
  21. """
  22. def __init__(self):
  23. self.db_client = DatabaseConnector(db_config=long_articles_config)
  24. self.db_client.connect()
  25. self.success_crawler_video_count = 0
  26. def whether_video_exists(self, title: str) -> bool:
  27. """
  28. whether video exists, use video_id && title
  29. """
  30. # check title
  31. sql = f"""
  32. select id from publish_single_video_source
  33. where article_title = %s;
  34. """
  35. duplicate_id = self.db_client.fetch(query=sql, params=(title,))
  36. if duplicate_id:
  37. print(title + " video exists")
  38. return True
  39. return False
  40. def get_channel_account_list(self):
  41. """
  42. get channel account list from database
  43. """
  44. return
  45. def crawler_each_video(self, video: dict):
  46. """
  47. download each video
  48. save video and decrypt video
  49. upload video to oss
  50. """
  51. object_desc = video["objectDesc"]
  52. title = object_desc["description"]
  53. if self.whether_video_exists(title):
  54. return
  55. video_item = Item()
  56. video_id = video["id"]
  57. video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
  58. video_item.add("url_unique_md5", video_id)
  59. video_item.add("article_title", title)
  60. video_item.add("out_account_id", video["username"])
  61. video_item.add("out_account_name", video["nickname"])
  62. video_item.add("publish_timestamp", video["createtime"])
  63. media = object_desc["media"][0]
  64. url = media["Url"]
  65. decode_key = media["decodeKey"]
  66. url_token = media["urlToken"]
  67. download_url = url + url_token
  68. try:
  69. decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
  70. oss_path = upload_to_oss(decrypt_path)
  71. video_item.add("video_oss_path", oss_path)
  72. video_item.add("source_account", NO_SOURCE_ACCOUNT)
  73. video_item.check(source="video")
  74. insert_into_single_video_source_table(self.db_client, video_item.item)
  75. os.remove(decrypt_path)
  76. except Exception as e:
  77. log(
  78. task="crawler_channel_account_videos",
  79. function="crawler_each_video",
  80. message="download video failed",
  81. data={
  82. "error": str(e),
  83. "traceback": traceback.format_exc(),
  84. "video_id": video["id"],
  85. },
  86. )
  87. def crawler_each_account(self, channel_account_id: str, channel_account_name: str):
  88. """
  89. get channel account videos
  90. """
  91. response = get_channel_account_videos(channel_account_id)
  92. if response["ret"] == 200:
  93. response_data = response["data"]
  94. last_buffer = response_data["lastBuffer"]
  95. continue_flag = response_data["continueFlag"]
  96. video_list = response_data["object"]
  97. crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
  98. for video in crawl_video_list_bar:
  99. crawl_video_list_bar.set_postfix({"video_id": video["id"]})
  100. self.crawler_each_video(video)
  101. else:
  102. print(f"crawler channel account {channel_account_name} videos failed")
  103. return