import random
import time
import uuid
import requests
from typing import Dict, List, Optional
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState

from application.config.common import MQ
from configs.config import base_url
from application.functions import MysqlService
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from utils.extractors import safe_extract
from application.spiders.base_spider import BaseSpider  # abstract base class import


def before_send_log(retry_state: RetryCallState) -> None:
    """Log a warning describing the failed attempt that is about to be retried.

    Intended as a tenacity ``before_sleep`` callback, i.e. it runs after an
    attempt has failed and before the wait preceding the next attempt.  It is
    also safe to use as a ``before`` callback: tenacity invokes ``before``
    while ``retry_state.outcome`` is still ``None``, in which case this
    function simply does nothing instead of raising ``AttributeError``.

    The logger is taken from a ``logger`` keyword argument of the retried
    call when present, otherwise from the ``logger`` attribute of the first
    positional argument (the bound instance when a method is decorated).
    The URL is taken from a ``url`` keyword argument, otherwise from the
    first positional argument that is a string.

    Args:
        retry_state: tenacity's state object for the call being retried.
    """
    outcome = retry_state.outcome
    # No attempt has finished yet (hook used as ``before``), or it succeeded.
    if outcome is None or not outcome.failed:
        return

    call_args = retry_state.args or ()
    call_kwargs = retry_state.kwargs or {}

    logger = call_kwargs.get('logger')
    if logger is None and call_args:
        # For a decorated method, args[0] is ``self``; reuse its logger.
        logger = getattr(call_args[0], 'logger', None)
    if not logger:
        return

    url = call_kwargs.get('url')
    if url is None:
        # Skip ``self`` (not a string) and pick the first string argument.
        url = next((arg for arg in call_args if isinstance(arg, str)), "unknown")

    attempt = retry_state.attempt_number
    exc = outcome.exception()
    logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")


class UniversalCrawler(BaseSpider):
    """Config-driven crawler: one HTTP request, rule filtering, push to ETL.

    The request (method, URL, headers, body) and the response parsing rules
    (data path and field map) all come from ``platform_config``.

    NOTE(review): ``self.platform``, ``self.mode``, ``self.logger``,
    ``self.aliyun_logr``, ``self.download_min_limit`` and
    ``self.get_video_duration`` are not defined in this class; they are
    presumably provided by ``BaseSpider`` -- confirm.
    """

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        """Read the request and parsing settings out of ``platform_config``.

        Args:
            platform_config: Must contain ``method``, ``url`` and
                ``response_parse`` (with ``data_path`` and ``fields``);
                ``loop_times``, ``headers`` and ``request_body`` are optional.
            rule_dict: Filtering rules, read by ``is_video_qualified``.
            user_list: Candidate users; one is picked at random per item.
            trace_id: Identifier included in log messages.
            env: Environment name, used as the MQ topic suffix.

        Raises:
            KeyError: If a required ``platform_config`` key is missing.
        """
        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.base_api = base_url
        self.has_enough_videos = False
        self.download_cnt = 0
        self.loop_times = self.platform_config.get('loop_times', 1)

        # Request description.
        self.request_method = self.platform_config["method"].upper()
        self.request_url = self.platform_config["url"]
        self.request_headers = self.platform_config.get("headers", {})
        self.request_body = self.platform_config.get("request_body", {})

        # Response parsing description.
        self.response_data_path = self.platform_config["response_parse"]["data_path"]
        self.video_fields_map = self.platform_config["response_parse"]["fields"]

    # ``before_sleep`` (not ``before``) so the hook runs once an attempt has
    # failed and ``retry_state.outcome`` is populated.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2),
           retry=retry_if_exception_type((requests.RequestException, ValueError)),
           before_sleep=before_send_log)
    def _send_request(self, url: str, method: Optional[str] = None, headers: Optional[Dict] = None,
                      payload: Optional[Dict] = None, timeout: int = 30) -> Optional[Dict]:
        """Send the HTTP request and return the decoded JSON body.

        Falsy ``method``, ``headers`` or ``payload`` fall back to the values
        configured on the instance.  The call is attempted up to three times
        with a fixed two second wait when a ``requests.RequestException`` or
        ``ValueError`` is raised.

        Args:
            url: Target URL.
            method: HTTP method; defaults to ``self.request_method``.
            headers: Request headers; defaults to ``self.request_headers``.
            payload: JSON body; defaults to ``self.request_body``.
            timeout: Timeout passed to ``requests.request``.

        Returns:
            The parsed JSON response when its ``code`` field equals ``0``.

        Raises:
            tenacity.RetryError: When all attempts fail with a retried
                exception type (``reraise`` is not enabled on the decorator).
        """
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body

        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        resp = response.json()
        if resp.get("code") == 0:
            return resp
        # A non-zero business code is raised as ValueError so it is retried.
        raise ValueError(f"API响应错误: {resp}")

    def fetch_video_data(self) -> Optional[List[Dict]]:
        """Request the configured URL and extract the video list.

        Returns:
            The value found at ``self.response_data_path`` in the response,
            or an empty list when that value is falsy or any exception is
            raised while requesting/extracting (the error is logged).
        """
        self.logger.info(f"{self.trace_id}--请求视频数据: {self.request_url}")
        try:
            response = self._send_request(self.request_url)
            return safe_extract(response, self.response_data_path) or []
        except Exception as e:
            # Best effort: a failed fetch yields no videos rather than a crash.
            self.logger.error(f"{self.trace_id}--请求失败: {e}")
            return []

    def is_video_qualified(self, video: Dict) -> bool:
        """Check a raw video record against ``self.rule_dict``.

        Args:
            video: Raw video record from the API response.

        Returns:
            ``True`` when no rules are configured or every configured rule
            passes; ``False`` when the duration is outside the configured
            ``min``/``max`` range, or when today's video count has reached
            the ``videos_cnt`` rule's ``min`` value.
        """
        if not self.rule_dict:
            return True

        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            duration = self.get_video_duration(video_url)
            # NOTE(review): assumes get_video_duration returns a number;
            # a None result would raise TypeError here -- confirm.
            if not (rule_duration['min'] <= duration <= rule_duration['max']):
                return False

        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            # NOTE(review): the "min" key is used here as an upper bound on
            # today's count -- confirm this is the intended semantics.
            if video_count >= rule_videos_cnt.get("min", 0):
                return False

        return True

    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
        """Build the ETL item for a raw video record.

        For each entry of ``self.video_fields_map``, a string value starting
        with ``"$"`` is treated as a path and resolved with ``safe_extract``;
        any other value is stored as-is.  Platform, strategy, session and a
        randomly chosen user are then added.

        Args:
            video: Raw video record from the API response.

        Returns:
            Whatever ``VideoItem.produce_item()`` returns.

        Raises:
            IndexError: If ``self.user_list`` is empty (``random.choice``).
        """
        item = VideoItem()
        for field, path in self.video_fields_map.items():
            val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
            item.add_video_info(field, val)

        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        user = random.choice(self.user_list)
        item.add_video_info("user_id", user["uid"])
        item.add_video_info("user_name", user["nick_name"])
        return item.produce_item()

    def push_to_etl(self, item: Dict) -> bool:
        """Run the item through the pipeline and, if accepted, send it to MQ.

        On acceptance the download counter is incremented, the item is sent
        and logged, and ``self.has_enough_videos`` is set once the counter
        reaches ``self.download_min_limit``.

        Args:
            item: ETL item as produced by ``transform_to_etl_item``.

        Returns:
            ``True`` if the pipeline accepted the item, ``False`` otherwise.
        """
        # The pipeline gets its own per-item trace id; logs below keep using
        # the crawler-level ``self.trace_id``.
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=item,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return False

        self.download_cnt += 1
        self.mq.send_msg(item)
        self.aliyun_logr.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
        if self.download_cnt >= self.download_min_limit:
            self.has_enough_videos = True
            self.aliyun_logr.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}", trace_id=self.trace_id)
        return True