video_crawler.py

  1. """
  2. @author: luojunhui
  3. @description: video crawler
  4. """
  5. import os
  6. import json
  7. import time
  8. import traceback
  9. from typing import List, Dict
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications import Functions
  13. from applications import bot, log
  14. from applications.const import BaiduVideoCrawlerConst
  15. from applications.db import DatabaseConnector
  16. from applications.exception import SpiderError
  17. from config import long_articles_config
  18. from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
  19. from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
  20. const = BaiduVideoCrawlerConst()
functions = Functions()


class BaiduVideoCrawler(object):
    """
    baidu video crawler
    """

    def __init__(self):
        self.db = None
        self.success_crawler_video_count = 0
        self.connect_db()

    def connect_db(self) -> None:
        """
        connect db
        """
        self.db = DatabaseConnector(db_config=long_articles_config)
        self.db.connect()

    def get_account_list(self) -> List[Dict]:
        """
        get account list
        """
        sql = f"""
            select account_id, account_name, max_cursor
            from baidu_account_for_videos
            where status = {const.BAIDU_ACCOUNT_GOOD_STATUS};
        """
        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
        return account_list
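
    # Illustrative shape of one row returned by get_account_list. The fields
    # mirror the SELECT above; the values are made-up placeholders, not real
    # data:
    #   {"account_id": "...", "account_name": "...", "max_cursor": None}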

    def whether_video_exists(self, title: str) -> bool:
        """
        check whether a video has been crawled already, deduplicating by title
        (only the title is checked here, not the video_id)
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db.fetch(query=sql, params=(title,))
        if duplicate_id:
            print(title + " video exists")
            return True
        return False

    def save_each_video(self, video: Dict, account_id: str, account_name: str) -> None:
        """
        download and save each video
        """
        # print(json.dumps(video, ensure_ascii=False, indent=4))
        video_id = video["id"]
        title = video["title"]

        # skip videos that have already been crawled
        if self.whether_video_exists(title):
            return

        read_cnt = video.get("playcnt", 0)
        like_cnt = video.get("like_num", 0)
        publish_timestamp = video["publish_time"]
        # duration = video['duration']
        cover_url = video["poster"]
        video_url = video["playurl"]
        # sensitive_flag = video.get('sensitive_flag')
        video_more_info = video.get("contentcms_intervene_data")
        if video_more_info:
            video_category_list = video_more_info.get("category_v2")
            video_category = video_category_list[0] if video_category_list else None
        else:
            video_category = None
        manual_tags = video.get("manual_tags")

        video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))
        download_path = functions.download_baidu_videos(video_url, video_path)
        if download_path:
            oss_path = functions.upload_to_oss(local_video_path=download_path)
            insert_sql = """
                INSERT INTO publish_single_video_source
                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
                values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            try:
                self.db.save(
                    query=insert_sql,
                    params=(
                        "video{}".format(functions.str_to_md5(video_id)),
                        title,
                        account_id,
                        account_name,
                        read_cnt,
                        like_cnt,
                        video_url,
                        cover_url,
                        oss_path,
                        publish_timestamp,
                        int(time.time()),
                        video_id,
                        video_category,
                        (
                            json.dumps(manual_tags, ensure_ascii=False)
                            if manual_tags
                            else None
                        ),
                        "hksp",  # platform code, presumably 好看视频 (Haokan Video)
                        const.NO_SOURCE_ACCOUNT_STATUS,
                    ),
                )
                self.success_crawler_video_count += 1
            except Exception as e:
                log(
                    task="baidu_video_crawler",
                    function="save_each_video",
                    message="save video failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video_id": video_id,
                        "oss_path": oss_path,
                    },
                )
        else:
            print(f"download video failed, video_id: {video_id}")

    def save_video_list(
        self, account_id: str, account_name: str, video_list: List[Dict]
    ) -> None:
        """
        save video list
        """
        progress_bar = tqdm(video_list, desc="crawler account: {}".format(account_name))
        for video_obj in progress_bar:
            # only items of type "video" carry a vid; skip everything else
            if video_obj["type"] != "video":
                continue
            video_id = video_obj["content"]["vid"]
            try:
                video_detail = baidu_single_video_crawler(video_id)
                self.save_each_video(
                    video=video_detail,
                    account_id=account_id,
                    account_name=account_name,
                )
                progress_bar.set_postfix({"videoId": video_id})
            except SpiderError as e:
                print("save single video fail", e)
                continue

    def crawler_each_account(self, account: Dict, cursor=None) -> None:
        """
        crawl a single account, recursing through result pages via the cursor
        """
        account_id = account["account_id"]
        max_cursor = account["max_cursor"]
        if not max_cursor:
            max_cursor = const.DEFAULT_CURSOR
        account_name = account["account_name"]
        try:
            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
            video_list = response_json.get("results", [])
            if video_list:
                self.save_video_list(
                    account_id=account_id,
                    account_name=account_name,
                    video_list=video_list,
                )
            # check next page
            has_next_page = response_json.get("has_more", False)
            if has_next_page:
                next_cursor = response_json.get("ctime", const.DEFAULT_CURSOR)
                if next_cursor < max_cursor:
                    # the page cursor is older than the stored max_cursor,
                    # so everything beyond this point was crawled already
                    print("No more videos after 2024-01-01")
                    return
                else:
                    return self.crawler_each_account(account, next_cursor)
        except SpiderError as e:
            print(e)
            return

    def update_cursor(self, account_id: str) -> None:
        """
        update cursor for each account
        """
        # parameterized to avoid interpolating account_id into the SQL string
        select_sql = """
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = %s;
        """
        response_mysql = self.db.fetch(query=select_sql, params=(account_id,))
        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
            update_sql = """
                update baidu_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db.save(query=update_sql, params=(max_cursor, account_id))
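
    # Note on cursor units (inferred from the constant name, not confirmed by
    # this file): the paging cursor appears to be the publish timestamp scaled
    # by const.TIMESTAMP_TO_CURSOR. For instance, if TIMESTAMP_TO_CURSOR were
    # 10000 and max(publish_timestamp) returned 1704067200 (2024-01-01 UTC),
    # the stored max_cursor would be 17040672000000, comparable with the
    # "ctime" cursor used in crawler_each_account.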

    def deal(self) -> None:
        """
        entry point: crawl every active account and report statistics
        """
        account_list = self.get_account_list()
        success_cnt = 0
        fail_cnt = 0
        account_list_process_bar = tqdm(account_list, desc="process account list")
        for account in account_list_process_bar:
            try:
                account_list_process_bar.set_postfix(
                    {"account_name": account["account_name"]}
                )
                self.crawler_each_account(account)
                self.update_cursor(account["account_id"])
                success_cnt += 1
            except Exception as e:
                fail_cnt += 1
                log(
                    task="baidu_video_crawler",
                    function="deal",
                    message="crawler each account failed",
                    data={
                        "account_id": account["account_id"],
                        "account_name": account["account_name"],
                        "error": str(e),
                        "trace_back": traceback.format_exc(),
                    },
                )
        total_cnt = success_cnt + fail_cnt
        bot(
            title="baidu video crawler task finished",
            detail={
                "success_crawl_account_num": success_cnt,
                "fail_crawl_account_num": fail_cnt,
                "success_crawl_video_num": self.success_crawler_video_count,
                # guard against ZeroDivisionError when the account list is empty
                "success_crawl_account_rate": success_cnt / total_cnt if total_cnt else 0,
            },
            mention=False,
        )