video_crawler.py

  1. """
  2. @author: luojunhui
  3. @description: video crawler
  4. """
  5. import json
  6. import time
  7. import traceback
  8. from typing import List, Dict
  9. from pymysql.cursors import DictCursor
  10. from tqdm import tqdm
  11. from applications import Functions
  12. from applications import bot, log
  13. from applications.const import BaiduVideoCrawlerConst
  14. from applications.db import DatabaseConnector
  15. from applications.exception import SpiderError
  16. from config import long_articles_config
  17. from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
  18. from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
  19. const = BaiduVideoCrawlerConst()
  20. empty_list = []
  21. functions = Functions()

class BaiduVideoCrawler(object):
    """
    baidu video crawler
    """

    def __init__(self):
        self.db = None
        self.success_crawler_video_count = 0

    def connect_db(self) -> None:
        """
        connect to the database
        """
        self.db = DatabaseConnector(db_config=long_articles_config)
        self.db.connect()

    def get_account_list(self) -> List[Dict]:
        """
        get the list of accounts to crawl
        """
        sql = f"""
            select account_id, account_name, max_cursor
            from baidu_account_for_videos
            where status = {const.BAIDU_ACCOUNT_GOOD_STATUS};
        """
        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
        return account_list
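
    # Illustrative only -- with a DictCursor each row comes back as a dict
    # keyed by the selected columns, e.g. (made-up values):
    #
    #     {"account_id": "123456", "account_name": "demo_account",
    #      "max_cursor": 17040672000000}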

    def whether_video_exists(self, title: str) -> bool:
        """
        check whether the video already exists, matched by title
        """
        sql = f"""
            select id from publish_single_video_source
            where article_title = '{title}';
        """
        duplicate_id = self.db.fetch(query=sql)
        if duplicate_id:
            print(title + " video exists")
            return True
        return False
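
    # The title above is interpolated directly into the SQL string. If
    # DatabaseConnector.fetch forwards params the way save() does (an
    # assumption -- check applications.db before relying on this), a safer
    # parameterized variant would look like:
    #
    #     sql = "select id from publish_single_video_source where article_title = %s;"
    #     duplicate_id = self.db.fetch(query=sql, params=(title,))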

    def save_each_video(self, video: Dict, account_id: str, account_name: str) -> None:
        """
        download and save each video
        """
        video_id = video['id']
        title = video['title']

        # skip videos that are already stored
        if self.whether_video_exists(title):
            return

        read_cnt = video.get('playcnt', 0)
        like_cnt = video.get('like_num', 0)
        publish_timestamp = video['publish_time']
        cover_url = video['poster']
        video_url = video['playurl']

        # category_v2 is nested in contentcms_intervene_data; take its first entry if present
        video_category = None
        video_more_info = video.get('contentcms_intervene_data')
        if video_more_info:
            video_category_list = video_more_info.get('category_v2')
            if video_category_list:
                video_category = video_category_list[0]
        manual_tags = video.get('manual_tags')

        video_path = 'static/{}.mp4'.format(video_id)
        download_path = functions.download_baidu_videos(video_url, video_path)
        if download_path:
            oss_path = functions.upload_to_oss(local_video_path=download_path)
            insert_sql = f"""
                INSERT INTO publish_single_video_source
                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
                values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            try:
                self.db.save(
                    query=insert_sql,
                    params=(
                        "video{}".format(functions.str_to_md5(video_id)),
                        title,
                        account_id,
                        account_name,
                        read_cnt,
                        like_cnt,
                        video_url,
                        cover_url,
                        oss_path,
                        publish_timestamp,
                        int(time.time()),
                        video_id,
                        video_category,
                        json.dumps(manual_tags, ensure_ascii=False) if manual_tags else None,
                        "baidu",
                        const.NO_SOURCE_ACCOUNT_STATUS
                    )
                )
                self.success_crawler_video_count += 1
            except Exception as e:
                print(e)
        else:
            print(f"download video failed, video_id: {video_id}")

    def save_video_list(self, account_id: str, account_name: str, video_list: List[Dict]) -> None:
        """
        save video list
        """
        for video_obj in tqdm(video_list, desc="save video list"):
            if video_obj['type'] == 'video':
                video_id = video_obj['content']['vid']
                try:
                    video_detail = baidu_single_video_crawler(video_id)
                    self.save_each_video(video_detail, account_id=account_id, account_name=account_name)
                except SpiderError as e:
                    print("save single video fail", e)
                    continue
            else:
                continue

    def crawler_each_account(self, account: Dict, cursor=None) -> None:
        """
        crawl a single account, paginating until the stored cursor is reached
        """
        account_id = account['account_id']
        max_cursor = account['max_cursor']
        if not max_cursor:
            max_cursor = const.DEFAULT_CURSOR
        account_name = account['account_name']
        try:
            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
            video_list = response_json.get("results", empty_list)
            if video_list:
                self.save_video_list(
                    account_id=account_id,
                    account_name=account_name,
                    video_list=video_list
                )
            # check next page
            has_next_page = response_json.get("has_more", False)
            if has_next_page:
                next_cursor = response_json.get("ctime", const.DEFAULT_CURSOR)
                if next_cursor < max_cursor:
                    print("No more videos newer than the stored cursor")
                    return
                else:
                    return self.crawler_each_account(account, next_cursor)
        except SpiderError as e:
            print(e)
            return

    def update_cursor(self, account_id: str) -> None:
        """
        update cursor for each account
        """
        select_sql = f"""
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = '{account_id}';
        """
        response_mysql = self.db.fetch(query=select_sql)
        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            # crawler_each_account compares Baidu's ctime cursor against this
            # value, and ctime is publish_timestamp (seconds) * 10000, so
            # convert to the same scale before storing
            max_cursor = max_publish_timestamp * 10000
            update_sql = f"""
                update baidu_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db.save(
                query=update_sql,
                params=(max_cursor, account_id)
            )
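
    # Worked example of the cursor arithmetic (illustrative values): a video
    # published at 1704067200 (2024-01-01 00:00:00 UTC) stores
    # max_cursor = 1704067200 * 10000 = 17040672000000, directly comparable
    # with the `ctime` values returned by the Baidu API.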

    def deal(self) -> None:
        """
        crawl every account, then report the run via bot
        """
        account_list = self.get_account_list()
        success_cnt = 0
        fail_cnt = 0
        for account in account_list:
            try:
                self.crawler_each_account(account)
                self.update_cursor(account['account_id'])
                success_cnt += 1
            except Exception as e:
                fail_cnt += 1
                log(
                    task="baidu_video_crawler",
                    function="deal",
                    message="crawler each account failed",
                    data={
                        "account_id": account['account_id'],
                        "account_name": account['account_name'],
                        "error": str(e),
                        "trace_back": traceback.format_exc()
                    }
                )
        total_cnt = success_cnt + fail_cnt
        bot(
            title="baidu video crawler task finished",
            detail={
                "success_crawl_account_num": success_cnt,
                "fail_crawl_account_num": fail_cnt,
                "success_crawl_video_num": self.success_crawler_video_count,
                # guard against an empty account list
                "success_crawl_account_rate": success_cnt / total_cnt if total_cnt else 0
            },
            mention=False
        )
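
# Minimal usage sketch (an assumption -- the real entry point may live in a
# scheduler elsewhere in the project). connect_db() must run before deal(),
# since self.db starts as None.
if __name__ == "__main__":
    baidu_video_crawler = BaiduVideoCrawler()
    baidu_video_crawler.connect_db()
    baidu_video_crawler.deal()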