123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
- import os
- import sys
- import json
- import random
- import time
- import uuid
- import yaml
- import requests
- import cv2
- from datetime import datetime
- from typing import Dict, Any, List, Optional, Union
- from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
- from utils.extractors import safe_extract, extract_multiple
- # 添加公共模块路径
- sys.path.append(os.getcwd())
- print(os.getcwd())
- from application.items import VideoItem
- from application.pipeline import PiaoQuanPipeline
- from application.common.messageQueue import MQ
- from application.common.log import AliyunLogger
- # from application.common.mysql import MysqlHelper
- from configs.messages import MESSAGES
- from configs import codes
- from utils.config_loader import ConfigLoader
- from application.common.log import Local
- from configs.config import base_url
def before_send_log(retry_state: "RetryCallState") -> None:
    """Log a warning about a failed call that tenacity is about to retry.

    Written as a tenacity hook. It is safe to register either as ``before``
    (where no attempt has finished yet, so nothing is logged) or as
    ``before_sleep`` (where the failed attempt's outcome is available).

    The logger is taken from a ``logger`` keyword argument of the wrapped
    call, or, failing that, from a ``logger`` attribute on the first
    positional argument (the instance, when a method is wrapped). The URL is
    taken from a ``url`` keyword argument or the first positional string.

    :param retry_state: tenacity state object for the call being retried
    """
    outcome = retry_state.outcome
    # No finished attempt yet (e.g. hook registered as ``before``), or the
    # attempt succeeded: there is no failure to report.
    if outcome is None or not outcome.failed:
        return

    call_args = tuple(retry_state.args or ())
    call_kwargs = retry_state.kwargs or {}

    logger = call_kwargs.get("logger")
    if logger is None and call_args:
        # For a decorated method the first positional argument is ``self``.
        logger = getattr(call_args[0], "logger", None)
    if not logger:
        return

    url = call_kwargs.get("url")
    if url is None:
        # Skip over ``self`` (or any other non-string) to find the URL.
        url = next((arg for arg in call_args if isinstance(arg, str)), "unknown")

    attempt = retry_state.attempt_number
    exc = outcome.exception()
    logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")
class UniversalCrawler:
    """Generic crawler whose per-platform behaviour is driven by a config dict.

    The config supplies the request (method, URL, headers, body), the path of
    the video list inside the response, and a field map used to populate each
    ``VideoItem``. Accepted videos are run through ``PiaoQuanPipeline`` and
    published to the ETL message queue.
    """

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        """
        Initialise the crawler.

        :param platform_config: platform configuration dict; must contain
            ``platform``, ``mode``, ``method``, ``url`` and ``response_parse``
            (with ``data_path`` and ``fields``)
        :param rule_dict: filtering rules (``duration`` / ``videos_cnt``)
        :param user_list: candidate users; one is picked at random per video
        :param trace_id: trace id prefixed to every log line
        :param env: runtime environment, used in the MQ topic name
        """
        self.platform = platform_config["platform"]
        self.mode = platform_config["mode"]
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.trace_id = trace_id
        self.env = env
        self.config = platform_config
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
        self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.has_enough_videos = False
        # Number of videos accepted by the pipeline so far. Must exist before
        # _process_video() increments it and run() reports it.
        self.download_cnt = 0
        self.base_api = base_url
        self.loop_times = platform_config.get('loop_times', 1)

        # Frequently used config values, lifted to attributes for readability.
        self.request_method = self.config["method"].upper()
        self.request_url = self.config["url"]
        self.request_headers = self.config.get("headers", {})
        self.request_body = self.config.get("request_body", {})
        self.response_data_path = self.config["response_parse"]["data_path"]
        self.video_fields_map = self.config["response_parse"]["fields"]

        # Crawling stops once this many videos have been accepted.
        self.download_min_limit = self.config.get("download_limit", {}).get("min", 200)

    @retry(
        stop=stop_after_attempt(3),  # at most 3 attempts in total
        wait=wait_fixed(2),  # 2 seconds between attempts
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        # ``before_sleep`` runs after a failed attempt, so the hook can see
        # the failure; ``before`` runs with no outcome available.
        before_sleep=before_send_log,
        # Surface the original exception instead of tenacity's RetryError so
        # callers can catch the same types that are retried here.
        reraise=True,
    )
    def _send_request(self, url: str, method: Optional[str] = None, headers: Optional[Dict] = None,
                      payload: Optional[Dict] = None, timeout: int = 30) -> Optional[Dict]:
        """
        Send an API request, retrying up to 3 attempts on failure.

        :param url: request URL
        :param method: HTTP method; falls back to the configured method
        :param headers: request headers; fall back to the configured headers
        :param payload: JSON body; falls back to the configured body
        :param timeout: timeout in seconds
        :return: the decoded JSON response when its ``code`` field is 0
        :raises requests.RequestException: on transport/HTTP errors after the
            final attempt
        :raises ValueError: when the body is not valid JSON or ``code`` is
            non-zero after the final attempt
        """
        # Falsy arguments (None or empty) fall back to the configured values.
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body

        try:
            self.logger.info(f"{self.trace_id}--正在发送请求: {url}")
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                json=payload,
                timeout=timeout
            )
            response.raise_for_status()
            resp = response.json()
            if resp.get("code") == 0:
                return resp
            self.logger.warning(f"{self.trace_id}--API响应非零状态码: {resp}")
            raise ValueError(f"API响应错误: {resp}")
        except requests.exceptions.Timeout:
            self.logger.error(f"{self.trace_id}--请求超时: {url}")
            raise
        except requests.exceptions.RequestException as e:
            self.logger.error(f"{self.trace_id}--请求异常: {e}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(f"{self.trace_id}--解析JSON响应失败: {e}")
            raise
        except Exception as e:
            # Reached for anything not handled above, including the
            # ValueError raised for a non-zero ``code``; runs on every
            # failing attempt, not only the last one.
            self.aliyun_log.logging(
                code="3000",
                message=f"请求失败: {url}",
                data={"error": str(e)},
                trace_id=self.trace_id
            )
            self.logger.error(f"{self.trace_id}--意外错误: {e}")
            raise

    def get_video_duration(self, video_url: str, timeout: int = 20, fallback_duration: float = 30.0) -> float:
        """
        Return the duration of a remote video in seconds.

        :param video_url: video URL
        :param timeout: timeout in seconds for the reachability check
        :param fallback_duration: value returned when the stream is readable
            but reports no usable FPS / frame count
        :return: duration in seconds, or 0 when it cannot be determined
        """
        # Cheap reachability check before handing the URL to OpenCV.
        try:
            response = requests.head(video_url, timeout=timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.logger.error(f"{self.trace_id}--网络错误: 无法访问视频URL - {e}")
            return 0

        cap = None
        try:
            cap = cv2.VideoCapture(video_url)

            # Give the stream up to 3 more chances, re-opening it each time;
            # merely polling isOpened() cannot change the result.
            max_attempts = 3
            for attempt in range(max_attempts):
                if cap.isOpened():
                    break
                self.logger.info(f"{self.trace_id}--尝试打开视频流 ({attempt + 1}/{max_attempts})...")
                time.sleep(1)
                cap.open(video_url)

            if not cap.isOpened():
                self.logger.error(f"{self.trace_id}--错误: 无法打开视频流 {video_url}")
                return 0

            # Small buffer to reduce latency.
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)

            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)

            if fps <= 0 or frame_count <= 0:
                # Metadata is unusable. Probe a few frames to confirm the
                # stream is at least readable before using the fallback.
                self.logger.info(f"{self.trace_id}--无法获取总帧数,尝试估算...")
                readable_frames = 0
                for _ in range(10):
                    ok, _frame = cap.read()
                    if not ok:
                        break
                    readable_frames += 1

                if readable_frames == 0:
                    self.logger.error(f"{self.trace_id}--错误: 无法估算视频时长")
                    return 0

                # The real length is unknown here; the fallback is an
                # assumed value, not a measurement.
                self.logger.info(f"{self.trace_id}--估算视频时长: {fallback_duration}秒 (基于{readable_frames}帧)")
                return fallback_duration

            duration = frame_count / fps
            self.logger.info(f"{self.trace_id}--视频时长: {duration}秒")
            return duration
        except Exception as e:
            self.logger.error(f"{self.trace_id}--获取视频时长时发生异常: {e}")
            return 0
        finally:
            if cap:
                cap.release()  # always release the capture

    def _extract_video_list(self, response: Dict) -> List[Dict]:
        """Extract the video list from an API response; empty list if absent."""
        return safe_extract(response, self.response_data_path) or []

    def _is_video_valid(self, video_data: Dict) -> bool:
        """
        Check a video against the configured rules.

        :param video_data: raw video record from the API
        :return: True when the video passes every configured rule (or no
            rules are configured), False otherwise
        """
        if not self.rule_dict:
            return True

        # Duration rule: the video URL is probed to measure its length.
        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            extract_video_url_path = self.video_fields_map.get("video_url")
            if not extract_video_url_path:
                self.logger.warning(f"{self.trace_id}--缺少视频URL字段映射")
                return False

            video_url = safe_extract(video_data, extract_video_url_path)
            if not video_url:
                self.logger.warning(f"{self.trace_id}--无法提取视频URL")
                return False

            video_duration = self.get_video_duration(video_url)
            min_duration = rule_duration.get("min", 0)
            # NOTE(review): a configured max of 0 makes the range empty and
            # rejects every video - confirm whether 0 should mean "no limit".
            max_duration = rule_duration.get("max", float('inf'))

            if not (min_duration <= video_duration <= max_duration):
                self.logger.info(f"{self.trace_id}--视频时长{video_duration}秒超出范围[{min_duration}, {max_duration}]")
                return False

        # Video-count rule: reject once the stored count reaches the minimum.
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            # The count currently comes from a placeholder, see
            # _get_video_count_from_db().
            video_count = self._get_video_count_from_db()
            min_count = rule_videos_cnt.get("min", 0)
            if video_count >= min_count:
                self.logger.info(f"{self.trace_id}--视频数量{video_count}达到最小要求{min_count}")
                return False

        return True

    def _get_video_count_from_db(self) -> int:
        """Return the stored video count (placeholder: always 0, no DB query yet)."""
        return 0

    def _process_video(self, video_data: Dict) -> bool:
        """
        Validate one video, build its item, run the pipeline and publish it.

        :param video_data: raw video record from the API
        :return: True when the video was accepted and sent to the queue,
            False otherwise
        """
        if not self.user_list:
            # random.choice() would raise IndexError on an empty list.
            self.logger.warning(f"{self.trace_id}--用户列表为空,无法处理视频")
            return False

        if not self._is_video_valid(video_data):
            self.logger.info(f"{self.trace_id}--视频因验证不通过被跳过")
            return False

        item = VideoItem()

        # Fill the item from the configured field map: values starting with
        # "$." are extraction paths, anything else is used as a literal.
        for field_name, path in self.video_fields_map.items():
            if isinstance(path, str) and path.startswith("$."):
                match = safe_extract(video_data, path)
                item.add_video_info(field_name, match)
            else:
                item.add_video_info(field_name, path)

        # Fixed fields.
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.config["mode"])
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        # Attribute the video to a randomly chosen user.
        our_user = random.choice(self.user_list)
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])

        video_title = item.get("title", "未知标题")
        self.logger.info(f"{self.trace_id}--正在处理视频: {video_title}")

        # The pipeline gets its own per-video trace id.
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.config["mode"],
            rule_dict=self.rule_dict,
            env=self.env,
            item=item.produce_item(),
            trace_id=trace_id,
        )

        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item.produce_item())
            self.aliyun_log.logging(
                code="1002",
                message="成功发送至ETL",
                data=item.produce_item(),
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--视频处理完成并发送至消息队列,已处理总数: {self.download_cnt}")

            # Flag the crawler to stop once the download limit is reached.
            if self.download_cnt >= self.download_min_limit:
                self.has_enough_videos = True
                self.aliyun_log.logging(
                    code="2000",
                    message=f"达到下载限制: {self.download_min_limit}",
                    trace_id=self.trace_id
                )
                self.logger.info(f"{self.trace_id}--达到下载限制,停止进一步处理")
            return True

        self.logger.warning(f"{self.trace_id}--通过管道处理视频失败")
        return False

    def _fetch_video_list(self) -> Optional[List[Dict]]:
        """
        Request the configured URL and extract the video list.

        :return: list of video records, or None when the request failed
        """
        self.logger.info(f"{self.trace_id}--从{self.request_url}获取视频列表")
        try:
            response = self._send_request(
                self.request_url,
                self.request_method,
                self.request_headers,
                self.request_body
            )
        except (requests.RequestException, ValueError):
            # Retries are exhausted; details were already logged by
            # _send_request. Treat it as "no data" so run() can continue.
            response = None

        if not response:
            self.logger.error(f"{self.trace_id}--获取视频列表失败")
            return None

        video_list = self._extract_video_list(response)
        self.logger.info(f"{self.trace_id}--获取到{len(video_list)}个视频")
        return video_list

    def _execute_post_actions(self):
        """Run configured ``after_video_processed`` post actions (e.g. exposure reporting)."""
        for action in self.config.get("post_actions", []):
            if action.get("trigger") != "after_video_processed":
                continue
            endpoint = action.get("endpoint")
            payload = action.get("payload", {})
            if not endpoint:
                continue
            self.logger.info(f"{self.trace_id}--执行后置操作: {endpoint}")
            try:
                self._send_request(endpoint, payload=payload)
            except (requests.RequestException, ValueError) as e:
                # Post actions are best-effort: a failure must not abort
                # the crawl.
                self.logger.warning(f"{self.trace_id}--后置操作失败: {endpoint}, 错误: {e}")

    def run(self):
        """Run the crawl: up to ``loop_times`` fetch/process rounds, stopping early at the download limit."""
        # NOTE(review): the log codes in this method are ints while other
        # methods pass strings - confirm which AliyunLogger expects.
        self.aliyun_log.logging(
            code=1003,
            message="开始执行爬虫",
            data=self.platform,
            trace_id=self.trace_id
        )
        self.logger.info(f"{self.trace_id}--开始{self.platform}执行爬虫")

        for loop in range(self.loop_times):
            if self.has_enough_videos:
                self.aliyun_log.logging(
                    code=2000,
                    message=f"[{self.platform}] 达到每日最大爬取量",
                    data=self.platform,
                    trace_id=self.trace_id
                )
                self.logger.info(f"{self.trace_id}--达到每日最大爬取量,停止爬虫")
                break

            self.logger.info(f"{self.trace_id}--开始第{loop + 1}/{self.loop_times}轮循环")
            video_list = self._fetch_video_list()
            if not video_list:
                self.logger.warning(f"{self.trace_id}--视频列表为空,跳过本轮循环")
                continue

            for video_data in video_list:
                if self.has_enough_videos:
                    self.logger.info(f"{self.trace_id}--达到每日最大爬取量,停止处理")
                    break
                self._process_video(video_data)

            # Extra actions after the round (e.g. exposure reporting).
            self._execute_post_actions()

            # Optional pause between rounds.
            loop_interval = self.config.get("loop_interval", 0)
            if loop_interval > 0:
                self.logger.info(f"{self.trace_id}--在下一轮循环前等待{loop_interval}秒")
                time.sleep(loop_interval)

        self.aliyun_log.logging(
            code=0,  # the original literal ``0000`` is the int 0
            message="爬虫执行完成",
            data=self.platform,
            trace_id=self.trace_id
        )
        self.logger.info(f"{self.trace_id}--平台{self.platform}的爬虫完成,已处理{self.download_cnt}个视频")
if __name__ == '__main__':
    # Manual smoke run against a sample platform configuration.
    demo_response_parse = {
        "data_path": "$.data.items",
        "fields": {
            "title": "$.title",
            "video_url": "$.videoUrl",
            "author": "$.author.name",
            "duration": "$.duration",
        },
    }
    demo_platform_config = {
        "platform": "benshanzhufu",
        "mode": "recommend",
        "method": "POST",
        "url": "https://api.example.com/video/list",
        "headers": {"Content-Type": "application/json"},
        "request_body": {"page": 1, "size": 20},
        "response_parse": demo_response_parse,
        "download_limit": {"min": 200},
        "loop_times": 3,
    }
    demo_rules = {
        'videos_cnt': {'min': 500, 'max': 0},
        'duration': {'min': 30, 'max': 1200},
    }
    demo_users = [{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}]

    # A fresh random trace id per run; env keeps its default.
    cr = UniversalCrawler(
        platform_config=demo_platform_config,
        rule_dict=demo_rules,
        user_list=demo_users,
        trace_id=str(uuid.uuid4()),
    )
    cr.run()
|