crawler_channel_account_videos.py

  1. """
  2. @author: luojunhui
  3. @tool: pycharm && deepseek
  4. """
  5. import json
  6. import time
  7. import traceback
  8. from applications import log
  9. from applications.db import DatabaseConnector
  10. from applications.utils import download_sph_video
  11. from applications.utils import str_to_md5
  12. from applications.utils import upload_to_oss
  13. from config import long_articles_config
  14. from coldStartTasks.crawler.channels import get_channel_account_videos
  15. NO_SOURCE_ACCOUNT = 0


class CrawlerChannelAccountVideos:
    """
    Crawl the videos published by channel accounts (platform: sph).
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()
        self.success_crawler_video_count = 0

    def whether_video_exists(self, title: str) -> bool:
        """
        Check whether a video has already been crawled, deduplicating by title.
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        if duplicate_id:
            print(f"{title} video exists")
            return True
        return False

    def get_channel_account_list(self):
        """
        Get the channel account list from the database (not implemented yet).
        """
        return

    def crawler_each_account(self, channel_account_id: str, channel_account_name: str):
        """
        Crawl the videos of a single channel account.
        """
        response = get_channel_account_videos(channel_account_id)
        if response['ret'] == 200:
            response_data = response['data']
            # pagination cursor and flag for fetching the next page (not used yet)
            last_buffer = response_data['lastBuffer']
            continue_flag = response_data['continueFlag']
            video_list = response_data['object']
            for video in video_list:
                video_id = str(video['id'])
                account_name = video['nickname']
                object_desc = video['objectDesc']
                publish_timestamp = video['createtime']
                title = object_desc['description']
                if self.whether_video_exists(title):
                    continue
                media = object_desc['media'][0]
                url = media['Url']
                decode_key = media['decodeKey']
                url_token = media['urlToken']
                download_url = url + url_token
                try:
                    # download the encrypted video, decrypt it locally, then upload to OSS
                    decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
                    oss_path = upload_to_oss(decrypt_path)
                    insert_sql = """
                        insert into publish_single_video_source
                        (content_trace_id, article_title, out_account_id, out_account_name, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, platform, source_account)
                        values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    try:
                        self.db_client.save(
                            query=insert_sql,
                            params=(
                                "video{}".format(str_to_md5(video_id)),
                                title,
                                channel_account_id,
                                account_name,
                                oss_path,
                                publish_timestamp,
                                int(time.time()),
                                video_id,
                                "sph",
                                NO_SOURCE_ACCOUNT,
                            ),
                        )
                        self.success_crawler_video_count += 1
                    except Exception as e:
                        log(
                            task="crawler_channel_account_videos",
                            function="save_each_video",
                            message="save video failed",
                            data={
                                "error": str(e),
                                "traceback": traceback.format_exc(),
                                "video_id": video_id,
                                "oss_path": oss_path,
                            },
                        )
                except Exception as e:
                    print("download video error:", e)
        else:
            print(f"crawler channel account {channel_account_name} videos failed")
            return
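

# Minimal usage sketch: assumes get_channel_account_list() will be implemented
# to return (account_id, account_name) pairs; with the current stub returning
# None, the `or []` keeps the loop a no-op.
if __name__ == "__main__":
    task = CrawlerChannelAccountVideos()
    for account_id, account_name in task.get_channel_account_list() or []:
        task.crawler_each_account(
            channel_account_id=account_id,
            channel_account_name=account_name,
        )
    print(f"crawled {task.success_crawler_video_count} videos in total")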