universal_crawler.py

import os
import sys
import json
import random
import time
import uuid
import yaml
import requests
import cv2
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
from utils.extractors import safe_extract, extract_multiple

# Add the shared module path so project-level imports resolve
sys.path.append(os.getcwd())
print(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
# from application.common.mysql import MysqlHelper
from configs.messages import MESSAGES
from configs import codes
from utils.config_loader import ConfigLoader
from application.common.log import Local
from configs.config import base_url
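
# This crawler is driven by a per-platform YAML config. A minimal sketch of
# what such a file might look like (illustrative only; the keys mirror the
# platform_config dict in the __main__ block at the bottom of this file):
#
#     platform: benshanzhufu
#     mode: recommend
#     method: POST
#     url: https://api.example.com/video/list
#     headers:
#       Content-Type: application/json
#     request_body:
#       page: 1
#       size: 20
#     response_parse:
#       data_path: "$.data.items"
#       fields:
#         title: "$.title"
#         video_url: "$.videoUrl"
#     download_limit:
#       min: 200
#     loop_times: 3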

def before_send_log(retry_state: RetryCallState) -> None:
    """Log before retrying a failed request."""
    attempt = retry_state.attempt_number
    last_result = retry_state.outcome
    if last_result and last_result.failed:
        exc = last_result.exception()
        # For a bound method the first positional arg is the instance and the
        # URL is the second; the instance carries the logger.
        instance = retry_state.args[0] if retry_state.args else None
        url = retry_state.args[1] if len(retry_state.args) > 1 else "unknown"
        logger = getattr(instance, "logger", None)
        if logger:
            logger.warning(f"Request failed, retrying ({attempt}/3): {url}, error: {exc}")

class UniversalCrawler:
    """Generic crawler driven by per-platform YAML configuration."""

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        """
        Initialize the crawler.
        :param platform_config: platform configuration dict
        :param rule_dict: filtering-rule dict
        :param user_list: list of users to publish under
        :param trace_id: trace ID for log correlation
        :param env: runtime environment
        """
        self.platform = platform_config["platform"]
        self.mode = platform_config["mode"]
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.trace_id = trace_id
        self.env = env
        self.config = platform_config
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
        self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.download_cnt = 0  # was never initialized; _process_video and run() rely on it
        self.has_enough_videos = False
        self.base_api = base_url
        self.loop_times = platform_config.get('loop_times', 1)

        # Promote frequently used config values to attributes for readability
        self.request_method = self.config["method"].upper()
        self.request_url = self.config["url"]
        self.request_headers = self.config.get("headers", {})
        self.request_body = self.config.get("request_body", {})
        self.response_data_path = self.config["response_parse"]["data_path"]
        self.video_fields_map = self.config["response_parse"]["fields"]

        # Download-limit configuration
        self.download_min_limit = self.config.get("download_limit", {}).get("min", 200)
    @retry(
        stop=stop_after_attempt(3),    # retry at most 3 times
        wait=wait_fixed(2),            # wait 2 seconds between attempts
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        before_sleep=before_send_log,  # log between attempts (outcome is set here, unlike with `before`)
    )
    def _send_request(self, url: str, method: Optional[str] = None, headers: Optional[Dict] = None,
                      payload: Optional[Dict] = None, timeout: int = 30) -> Optional[Dict]:
        """
        Send an API request, retrying automatically up to 3 times on failure.
        :param url: request URL
        :param method: HTTP method; defaults to the configured method
        :param headers: request headers; default to the configured headers
        :param payload: request body; defaults to the configured body
        :param timeout: timeout in seconds
        :return: response JSON dict, or None
        """
        # Fall back to the configured defaults when arguments are omitted
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body
        try:
            self.logger.info(f"{self.trace_id}--sending request: {url}")
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                json=payload,
                timeout=timeout
            )
            response.raise_for_status()
            resp = response.json()
            if resp.get("code") == 0:
                return resp
            self.logger.warning(f"{self.trace_id}--API returned a non-zero code: {resp}")
            raise ValueError(f"API response error: {resp}")
        except requests.exceptions.Timeout:
            self.logger.error(f"{self.trace_id}--request timed out: {url}")
            raise
        except requests.exceptions.RequestException as e:
            self.logger.error(f"{self.trace_id}--request exception: {e}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(f"{self.trace_id}--failed to decode JSON response: {e}")
            raise
        except Exception as e:
            # Log the details on the final failure
            self.aliyun_log.logging(
                code="3000",
                message=f"Request failed: {url}",
                data={"error": str(e)},
                trace_id=self.trace_id
            )
            self.logger.error(f"{self.trace_id}--unexpected error: {e}")
            raise
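
    # Expected success envelope, inferred from the `code == 0` check above and
    # the "$.data.items" data_path (a sketch; the real API contract may differ):
    #     {"code": 0, "data": {"items": [{...video...}, ...]}}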
    def get_video_duration(self, video_url: str, timeout: int = 20) -> float:
        """
        Get the duration (in seconds) of a remote video, with network error
        handling and timeout control.
        :param video_url: video URL
        :param timeout: timeout in seconds
        :return: duration in seconds, or 0 on failure
        """
        # Check that the URL is reachable at all
        try:
            response = requests.head(video_url, timeout=timeout)
            response.raise_for_status()  # check the HTTP status code
        except requests.exceptions.RequestException as e:
            self.logger.error(f"{self.trace_id}--network error: cannot reach video URL - {e}")
            return 0
        cap = None
        try:
            # Create the VideoCapture object
            cap = cv2.VideoCapture(video_url)
            # Shrink the buffer to reduce latency
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
            # Try to open the stream (up to 3 attempts)
            max_attempts = 3
            for attempt in range(max_attempts):
                if cap.isOpened():
                    break
                self.logger.info(f"{self.trace_id}--trying to open video stream ({attempt + 1}/{max_attempts})...")
                time.sleep(1)
            if not cap.isOpened():
                self.logger.error(f"{self.trace_id}--error: cannot open video stream {video_url}")
                return 0
            # Read the video properties
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            if fps <= 0 or frame_count <= 0:
                # Some network streams do not report a frame count; probe a few
                # frames and fall back to an assumed duration.
                self.logger.info(f"{self.trace_id}--frame count unavailable, estimating...")
                frame_count = 0
                start_time = time.time()
                # Read up to 10 frames to confirm the stream is readable
                for _ in range(10):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame_count += 1
                elapsed_time = time.time() - start_time
                if elapsed_time > 0 and frame_count > 0:
                    # Assume a 30-second duration (adjust to fit real data)
                    estimated_duration = 30.0
                    self.logger.info(f"{self.trace_id}--estimated duration: {estimated_duration}s (based on {frame_count} frames)")
                    return estimated_duration
                self.logger.error(f"{self.trace_id}--error: cannot estimate video duration")
                return 0
            duration = frame_count / fps
            self.logger.info(f"{self.trace_id}--video duration: {duration}s")
            return duration
        except Exception as e:
            self.logger.error(f"{self.trace_id}--exception while reading video duration: {e}")
            return 0
        finally:
            if cap:
                cap.release()  # always release the capture
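
    # Note: cv2.VideoCapture generally needs a media backend such as FFmpeg to
    # read HTTP(S) streams; without one, isOpened() stays False and this
    # method returns 0.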
    def _extract_video_list(self, response: Dict) -> List[Dict]:
        """Extract the video list from the API response."""
        return safe_extract(response, self.response_data_path) or []
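
    # safe_extract takes JSONPath-style selectors such as "$.data.items"; a
    # missing path presumably yields a falsy value, which the `or []` above
    # normalizes to an empty list.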
    def _is_video_valid(self, video_data: Dict) -> bool:
        """
        Decide whether a video satisfies the configured rules.
        :param video_data: video data
        :return: True if valid, False otherwise
        """
        if not self.rule_dict:
            return True
        # Check the duration rule
        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            extract_video_url_path = self.video_fields_map.get("video_url")
            if not extract_video_url_path:
                self.logger.warning(f"{self.trace_id}--missing video_url field mapping")
                return False
            video_url = safe_extract(video_data, extract_video_url_path)
            if not video_url:
                self.logger.warning(f"{self.trace_id}--could not extract the video URL")
                return False
            video_duration = self.get_video_duration(video_url)
            min_duration = rule_duration.get("min", 0)
            max_duration = rule_duration.get("max", float('inf'))
            if not (min_duration <= video_duration <= max_duration):
                self.logger.info(f"{self.trace_id}--duration {video_duration}s outside range [{min_duration}, {max_duration}]")
                return False
        # Check the video-count rule
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            # This should query the database for the actual video count.
            # Placeholder logic; the real implementation depends on the business rules.
            video_count = self._get_video_count_from_db()
            min_count = rule_videos_cnt.get("min", 0)
            if video_count >= min_count:
                self.logger.info(f"{self.trace_id}--video count {video_count} already meets the minimum {min_count}")
                return False
        return True
    def _get_video_count_from_db(self) -> int:
        """Get the video count from the database (placeholder; implement per business needs)."""
        # The real implementation should query the database
        return 0  # placeholder
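
    # A possible implementation sketch. MysqlHelper (imported commented-out at
    # the top of this file) is project-internal; its signature and the table
    # name below are assumptions, not confirmed:
    #
    #     mysql = MysqlHelper(env=self.env, mode=self.mode, platform=self.platform)  # hypothetical signature
    #     sql = "SELECT COUNT(*) FROM crawler_video WHERE platform = %s AND DATE(create_time) = CURDATE()"  # hypothetical table
    #     rows = mysql.select(sql, (self.platform,))
    #     return rows[0][0] if rows else 0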
    def _process_video(self, video_data: Dict) -> bool:
        """
        Process a single video record.
        :param video_data: video data
        :return: True on success, False on failure
        """
        # First check whether the video satisfies the rules
        if not self._is_video_valid(video_data):
            self.logger.info(f"{self.trace_id}--video skipped: failed validation")
            return False
        # Build the video item
        item = VideoItem()
        # Fill fields according to the configured mapping
        for field_name, path in self.video_fields_map.items():
            if isinstance(path, str) and path.startswith("$."):
                match = safe_extract(video_data, path)
                item.add_video_info(field_name, match)
            else:
                # Constant values (int, str, ...) are used as-is
                item.add_video_info(field_name, path)
        # Fixed fields
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.config["mode"])
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")
        # Pick a random user
        our_user = random.choice(self.user_list)
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        video_title = item.get("title", "unknown title")
        self.logger.info(f"{self.trace_id}--processing video: {video_title}")
        # Run the pipeline
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.config["mode"],
            rule_dict=self.rule_dict,
            env=self.env,
            item=item.produce_item(),
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item.produce_item())
            self.aliyun_log.logging(
                code="1002",
                message="sent to ETL successfully",
                data=item.produce_item(),
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--video processed and sent to MQ, total processed: {self.download_cnt}")
            # Check the download limit
            if self.download_cnt >= self.download_min_limit:
                self.has_enough_videos = True
                self.aliyun_log.logging(
                    code="2000",
                    message=f"download limit reached: {self.download_min_limit}",
                    trace_id=self.trace_id
                )
                self.logger.info(f"{self.trace_id}--download limit reached, stopping further processing")
            return True
        self.logger.warning(f"{self.trace_id}--pipeline failed to process the video")
        return False
    def _fetch_video_list(self) -> Optional[List[Dict]]:
        """
        Fetch and parse the video list.
        :return: list of videos, or None
        """
        self.logger.info(f"{self.trace_id}--fetching video list from {self.request_url}")
        response = self._send_request(
            self.request_url,
            self.request_method,
            self.request_headers,
            self.request_body
        )
        if not response:
            self.logger.error(f"{self.trace_id}--failed to fetch the video list")
            return None
        video_list = self._extract_video_list(response)
        self.logger.info(f"{self.trace_id}--got {len(video_list)} videos")
        return video_list
    def _execute_post_actions(self):
        """Run extra actions after crawling (e.g. exposure reporting)."""
        for action in self.config.get("post_actions", []):
            if action.get("trigger") == "after_video_processed":
                endpoint = action.get("endpoint")
                payload = action.get("payload", {})
                if endpoint:
                    self.logger.info(f"{self.trace_id}--running post action: {endpoint}")
                    self._send_request(endpoint, payload=payload)
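
    # Example post_actions config entry (illustrative; the keys mirror the
    # lookups above, the endpoint is made up):
    #     post_actions:
    #       - trigger: after_video_processed
    #         endpoint: "https://api.example.com/report/exposure"
    #         payload: {"source": "crawler"}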
    def run(self):
        """Run the crawl job."""
        self.aliyun_log.logging(
            code="1003",
            message="crawler started",
            data=self.platform,
            trace_id=self.trace_id
        )
        self.logger.info(f"{self.trace_id}--starting crawler for {self.platform}")
        for loop in range(self.loop_times):
            if self.has_enough_videos:
                self.aliyun_log.logging(
                    code="2000",
                    message=f"[{self.platform}] daily crawl limit reached",
                    data=self.platform,
                    trace_id=self.trace_id
                )
                self.logger.info(f"{self.trace_id}--daily crawl limit reached, stopping crawler")
                break
            self.logger.info(f"{self.trace_id}--starting loop {loop + 1}/{self.loop_times}")
            video_list = self._fetch_video_list()
            if not video_list:
                self.logger.warning(f"{self.trace_id}--empty video list, skipping this loop")
                continue
            for video_data in video_list:
                if self.has_enough_videos:
                    self.logger.info(f"{self.trace_id}--daily crawl limit reached, stopping processing")
                    break
                self._process_video(video_data)
            # Run extra actions (e.g. exposure reporting)
            self._execute_post_actions()
            # Sleep between loops
            loop_interval = self.config.get("loop_interval", 0)
            if loop_interval > 0:
                self.logger.info(f"{self.trace_id}--waiting {loop_interval}s before the next loop")
                time.sleep(loop_interval)
        self.aliyun_log.logging(
            code="0000",
            message="crawler finished",
            data=self.platform,
            trace_id=self.trace_id
        )
        self.logger.info(f"{self.trace_id}--crawler for {self.platform} finished, {self.download_cnt} videos processed")

if __name__ == '__main__':
    cr = UniversalCrawler(
        platform_config={
            "platform": "benshanzhufu",
            "mode": "recommend",
            "method": "POST",
            "url": "https://api.example.com/video/list",
            "headers": {"Content-Type": "application/json"},
            "request_body": {"page": 1, "size": 20},
            "response_parse": {
                "data_path": "$.data.items",
                "fields": {
                    "title": "$.title",
                    "video_url": "$.videoUrl",
                    "author": "$.author.name",
                    "duration": "$.duration"
                }
            },
            "download_limit": {"min": 200},
            "loop_times": 3
        },
        rule_dict={
            'videos_cnt': {'min': 500, 'max': 0},
            'duration': {'min': 30, 'max': 1200}
        },
        user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}],
        trace_id=str(uuid.uuid4())
    )
    cr.run()