import random
import time
import uuid
from typing import Dict, List, Optional

import requests
from application.config.common import MQ
from application.functions import MysqlService
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from config.config import base_url
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState

from core.utils import safe_extract
from spiders.base_spider import BaseSpider  # abstract base class import


def before_send_log(retry_state: RetryCallState) -> None:
    """Log a warning describing the attempt that just failed.

    Intended to be registered as a tenacity ``before_sleep`` hook: that is
    the hook in which ``retry_state.outcome`` holds the failed attempt. The
    function is also safe in hooks where the outcome is not yet available
    (e.g. ``before``); it simply does nothing there instead of raising.

    Args:
        retry_state: Tenacity state for the call being retried.
    """
    outcome = retry_state.outcome
    # No attempt has finished yet, or the last attempt succeeded.
    if outcome is None or not outcome.failed:
        return

    args = retry_state.args
    kwargs = retry_state.kwargs

    # Prefer an explicitly passed logger; otherwise use the logger of the
    # instance the retried method is bound to (args[0] is ``self``).
    logger = kwargs.get('logger')
    if not logger and args:
        logger = getattr(args[0], 'logger', None)
    if not logger:
        return

    # The URL is either passed by keyword or is the first string among the
    # leading positional arguments (args[0] is ``self`` for method calls).
    url = kwargs.get('url')
    if url is None:
        url = next((arg for arg in args[:2] if isinstance(arg, str)), "unknown")

    attempt = retry_state.attempt_number
    exc = outcome.exception()
    logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")


class UniversalCrawler(BaseSpider):
    """Config-driven crawler that fetches videos from an API and pushes them to ETL.

    The request (method, URL, headers, body) and the response parsing rules
    (data path and field mapping) are all read from ``platform_config``.
    """

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        """Initialise the crawler from its platform configuration.

        Args:
            platform_config: Must contain ``method``, ``url`` and
                ``response_parse`` (with ``data_path`` and ``fields``);
                ``headers``, ``request_body`` and ``loop_times`` are optional.
            rule_dict: Filtering rules, forwarded to the base class.
            user_list: Candidate users, forwarded to the base class.
            trace_id: Trace identifier, forwarded to the base class.
            env: Environment name; also used as the MQ topic suffix.

        Raises:
            KeyError: If a required key is missing from ``platform_config``.
        """
        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.base_api = base_url
        self.has_enough_videos = False
        self.download_cnt = 0
        self.loop_times = self.platform_config.get('loop_times', 1)

        # Request description.
        self.request_method = self.platform_config["method"].upper()
        self.request_url = self.platform_config["url"]
        self.request_headers = self.platform_config.get("headers", {})
        self.request_body = self.platform_config.get("request_body", {})

        # Response parsing rules.
        response_parse = self.platform_config["response_parse"]
        self.response_data_path = response_parse["data_path"]
        self.video_fields_map = response_parse["fields"]

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_fixed(2),
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        # ``before_sleep`` runs after a failed attempt, when the outcome is
        # populated; with ``before`` the outcome is always None.
        before_sleep=before_send_log,
    )
    def _send_request(self, url: str, method: Optional[str] = None, headers: Optional[Dict] = None,
                      payload: Optional[Dict] = None, timeout: int = 30) -> Optional[Dict]:
        """Send the HTTP request and return the decoded JSON body on success.

        Up to 3 attempts are made, 2 seconds apart, on
        ``requests.RequestException`` or ``ValueError``.

        Args:
            url: Target URL.
            method: HTTP method; defaults to the configured method.
            headers: Request headers; defaults to the configured headers.
            payload: JSON body; defaults to the configured request body.
            timeout: Timeout in seconds passed to ``requests``.

        Returns:
            The decoded JSON object when its ``code`` field equals 0.

        Raises:
            requests.RequestException: On transport errors or HTTP error status.
            ValueError: When the body is not a JSON object with ``code == 0``.
        """
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body

        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        resp = response.json()
        # A non-dict body (e.g. a JSON list) is treated as a bad API response
        # so it is retried instead of failing on ``.get``.
        if isinstance(resp, dict) and resp.get("code") == 0:
            return resp
        raise ValueError(f"API响应错误: {resp}")

    def fetch_video_data(self) -> Optional[List[Dict]]:
        """Request the configured URL and extract the video list.

        Returns:
            The value found at ``response_data_path`` in the response, or an
            empty list when nothing is found or the request fails.
        """
        self.logger.info(f"{self.trace_id}--请求视频数据: {self.request_url}")
        try:
            response = self._send_request(self.request_url)
            return safe_extract(response, self.response_data_path) or []
        except Exception as e:
            # Best effort: a failed fetch is logged and reported as "no data".
            self.logger.error(f"{self.trace_id}--请求失败: {e}")
            return []

    def is_video_qualified(self, video: Dict) -> bool:
        """Check a video against the configured rules.

        Args:
            video: Raw video record from the API response.

        Returns:
            True when no rules are configured or all configured rules pass.
        """
        if not self.rule_dict:
            return True

        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            # get_video_duration is not defined in this class; presumably
            # inherited from BaseSpider -- TODO confirm it never returns None.
            duration = self.get_video_duration(video_url)
            if not (rule_duration['min'] <= duration <= rule_duration['max']):
                return False

        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            # NOTE(review): with no "min" key the threshold defaults to 0, so
            # any count rejects the video -- confirm this is intended.
            if video_count >= rule_videos_cnt.get("min", 0):
                return False

        return True

    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
        """Map a raw video record onto a ``VideoItem`` and produce the ETL dict.

        Mapping values that are strings starting with ``$`` are treated as
        paths and resolved against ``video``; any other value is used as-is.

        Args:
            video: Raw video record from the API response.

        Returns:
            Whatever ``VideoItem.produce_item()`` returns.

        Raises:
            IndexError: If ``user_list`` is empty (from ``random.choice``).
        """
        item = VideoItem()
        for field, path in self.video_fields_map.items():
            is_path = isinstance(path, str) and path.startswith("$")
            val = safe_extract(video, path) if is_path else path
            item.add_video_info(field, val)

        item.add_video_info("classname", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        # Attribute the video to a randomly chosen user.
        user = random.choice(self.user_list)
        item.add_video_info("user_id", user["uid"])
        item.add_video_info("user_name", user["nick_name"])
        return item.produce_item()

    def push_to_etl(self, item: Dict) -> bool:
        """Run the item through the pipeline and, if accepted, send it to MQ.

        Also updates ``download_cnt`` and sets ``has_enough_videos`` once the
        count reaches ``download_min_limit``.

        Args:
            item: ETL item dict to validate and publish.

        Returns:
            True when the pipeline accepted the item and it was sent,
            False otherwise.
        """
        # The pipeline gets its own per-item trace id; the log calls below
        # keep using the crawler's trace id.
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=item,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return False

        self.download_cnt += 1
        self.mq.send_msg(item)
        self.aliyun_log.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
        if self.download_cnt >= self.download_min_limit:
            self.has_enough_videos = True
            self.aliyun_log.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}",
                                    trace_id=self.trace_id)
        return True