video_crawler.py

  1. """
  2. @author: luojunhui
  3. @description: video crawler
  4. """
  5. import json
  6. import time
  7. from pymysql.cursors import DictCursor
  8. from tqdm import tqdm
  9. from applications import Functions
  10. from applications.db import DatabaseConnector
  11. from applications.exception import SpiderError
  12. from config import long_articles_config
  13. from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
  14. from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
  15. empty_list = []
  16. functions = Functions()
  17. DEFAULT_CURSOR = 17040384000000 # 最早时间为2024-01-01 00:00:00


class BaiduVideoCrawler(object):
    """
    baidu video crawler
    """

    def __init__(self):
        self.db = None

    def connect_db(self):
        """
        connect db
        """
        self.db = DatabaseConnector(db_config=long_articles_config)
        self.db.connect()

    def get_account_list(self):
        """
        get account list
        status = 1 marks accounts enabled for crawling
        """
        sql = f"""
            select account_id, account_name, latest_crawler_timestamp as max_cursor
            from baidu_account_for_videos
            where status = 1;
        """
        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
        return account_list

    def whether_video_exists(self, video_id, title):
        """
        whether video exists, use video_id && title
        """
        # first check video_id
        sql_1 = f"""
            select id from publish_single_video_source
            where url_unique_md5 = '{video_id}';
        """
        count_1 = self.db.fetch(query=sql_1)
        if count_1:
            print(video_id + " video exists")
            return True

        # check title
        sql_2 = f"""
            select id from publish_single_video_source
            where article_title = '{title}';
        """
        count_2 = self.db.fetch(query=sql_2)
        if count_2:
            print(title + " video exists")
            return True

        return False

    def save_each_video(self, video, account_id, account_name):
        """
        download and save each video
        """
        # print(json.dumps(video, ensure_ascii=False, indent=4))
        video_id = video['id']
        title = video['title']

        # judge whether video exists
        if self.whether_video_exists(video_id, title):
            return

        read_cnt = video.get('playcnt', 0)
        like_cnt = video.get('like_num', 0)
        publish_timestamp = video['publish_time']
        # duration = video['duration']
        cover_url = video['poster']
        video_url = video['playurl']
        # sensitive_flag = video.get('sensitive_flag')
        video_more_info = video.get('contentcms_intervene_data')
        if video_more_info:
            video_category_list = video_more_info.get('category_v2')
            if video_category_list:
                video_category = video_category_list[0]
            else:
                video_category = None
        else:
            video_category = None
        manual_tags = video.get('manual_tags')

        video_path = 'static/{}.mp4'.format(video_id)
        download_path = functions.download_baidu_videos(video_url, video_path)
        if download_path:
            oss_path = functions.upload_to_oss(local_video_path=download_path)
            insert_sql = f"""
                INSERT INTO publish_single_video_source
                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
                values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            try:
                self.db.save(
                    query=insert_sql,
                    params=(
                        "video{}".format(functions.str_to_md5(video_id)),
                        title,
                        account_id,
                        account_name,
                        read_cnt,
                        like_cnt,
                        video_url,
                        cover_url,
                        oss_path,
                        publish_timestamp,
                        int(time.time()),
                        video_id,
                        video_category,
                        json.dumps(manual_tags, ensure_ascii=False) if manual_tags else None,
                        "baidu",
                        0
                    )
                )
            except Exception as e:
                print(e)
        else:
            print(f"download video failed, video_id: {video_id}")

    def save_video_list(self, account_id, account_name, video_list):
        """
        save video list
        """
        # print(json.dumps(video_list, ensure_ascii=False, indent=4))
        for video_obj in tqdm(video_list, desc="save video list"):
            if video_obj['type'] == 'video':
                video_id = video_obj['content']['vid']
                try:
                    video_detail = baidu_single_video_crawler(video_id)
                    self.save_each_video(video_detail, account_id=account_id, account_name=account_name)
                except SpiderError as e:
                    print(e)
                    continue
            else:
                continue

    def crawler_each_account(self, account, cursor=None):
        """
        crawler each account
        response_strategy
        """
        account_id = account['account_id']
        max_cursor = account['max_cursor']
        if not max_cursor:
            max_cursor = DEFAULT_CURSOR
        account_name = account['account_name']
        try:
            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
            video_list = response_json.get("results", empty_list)
            if video_list:
                self.save_video_list(
                    account_id=account_id,
                    account_name=account_name,
                    video_list=video_list
                )
            # check next page
            has_next_page = response_json.get("has_more", False)
            if has_next_page:
                next_cursor = response_json.get("ctime", DEFAULT_CURSOR)
                if next_cursor < max_cursor:
                    print("No more videos after 2024-01-01")
                    return
                else:
                    return self.crawler_each_account(account, next_cursor)
        except SpiderError as e:
            print(e)
            return

    def deal(self):
        """
        deal
        """
        account_list = self.get_account_list()
        # NOTE: the first account in the list is skipped
        for account in account_list[1:]:
            self.crawler_each_account(account)
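

# A minimal usage sketch (an assumption, not part of the original file):
# connect to the database first, then crawl every enabled account.
if __name__ == '__main__':
    baidu_video_crawler = BaiduVideoCrawler()
    baidu_video_crawler.connect_db()
    baidu_video_crawler.deal()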