import asyncio
import time
from abc import ABC
from typing import Dict, List, Optional

import aiohttp

from application.config.common import MQ, MysqlService  # MysqlService path assumed; adjust to the project's actual module
from application.config.common.log.logger_manager import LoggerManager
from utils.config_loader import ConfigLoader
from utils.extractors import safe_extract


class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl pipeline in strict sequential order.
    """

    MAX_RETRIES = 3  # maximum retries per request
    TIMEOUT = 30     # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # concrete subclass name

        # Look up the platform config automatically by class name
        self.platform_config = ConfigLoader.get_config_by_class_name(self.class_name)
        if not self.platform_config:
            raise ValueError(f"No config found for: {self.class_name}")

        # Initialize loggers and MQ
        self.platform = self.platform_config.get("platform")
        self.mode = self.platform_config.get("mode")
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--Initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")

        # Request configuration
        self.method = self.platform_config.get("method", "GET").upper()
        self.url = self.platform_config.get("url")
        self.headers = self.platform_config.get("headers", {})
        self.body = self.platform_config.get("request_body", {})
        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
        self.video_fields_map = self.platform_config.get("video_fields_map", {})

        # Flow-control configuration
        self.loop_times = self.platform_config.get("loop_times", 1)        # number of loops
        self.loop_interval = self.platform_config.get("loop_interval", 0)  # interval between loops (seconds)

        self.logger.info(
            f"{self.trace_id}--Config: {self.loop_times} loop(s), {self.loop_interval}s interval")

    async def _send_async_request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
        """
        Send an async HTTP request with retry support.
        """
        retries = 0
        timeout = aiohttp.ClientTimeout(total=self.TIMEOUT)

        while retries < self.MAX_RETRIES:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.request(method, url, **kwargs) as response:
                        response.raise_for_status()
                        # Read the body before the session closes so that
                        # response.json() still works for the caller.
                        await response.read()
                        return response
            except Exception as e:
                retries += 1
                remaining_attempts = self.MAX_RETRIES - retries

                if retries < self.MAX_RETRIES:
                    self.logger.warning(
                        f"{self.trace_id}--Request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
                        f"Remaining attempts: {remaining_attempts}"
                    )
                    await asyncio.sleep(1)  # async back-off before retrying
                else:
                    self.aliyun_logr.logging(
                        code="5001",
                        message="Request failed, maximum retries reached",
                        data={
                            "url": url,
                            "method": method,
                            "error": str(e),
                            "headers": kwargs.get("headers", {}),
                            "body": kwargs.get("json", {})
                        },
                        trace_id=self.trace_id
                    )
                    self.logger.error(f"{self.trace_id}--Request failed, maximum retries reached: {e}")
                    raise

    async def crawl_data(self) -> Optional[List[Dict]]:
        """Fetch video data asynchronously."""
        self.logger.info(f"{self.trace_id}--Start fetching video data")
        try:
            response = await self._send_async_request(
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=self.body
            )
            result = await response.json()
            data = safe_extract(result, self.data_path)

            if not data:
                self.logger.warning(f"{self.trace_id}--No data found at path: {self.data_path}")
                return []

            self.logger.info(f"{self.trace_id}--Fetched {len(data)} video records")
            return data
        except Exception as e:
            self.logger.error(f"{self.trace_id}--Failed to fetch video data: {e}")
            return []

    def filter_data(self, video: Dict) -> bool:
        """Check whether a video satisfies the crawl rules."""
        if not self.rule_dict:
            return True

        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            duration = self.get_video_duration(video_url)
            if not (rule_duration['min'] <= duration <= rule_duration['max']):
                return False

        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 0):
                return False

        return True

    def get_video_duration(self, video_url: str) -> float:
        """Return the duration (in seconds) of the video at ``video_url``.

        Used by filter_data for the duration rule. The base class has no
        generic way to probe a video, so concrete subclasses are expected to
        override this method.
        """
        raise NotImplementedError

    def process_video(self, video: Dict) -> Optional[Dict]:
        """Process a single video record."""
        self.logger.debug(f"{self.trace_id}--Start processing video: {video.get('title', 'untitled')}")
        try:
            item = {}
            for field, path in self.field_map.items():
                value = safe_extract(video, path)
                if value is None:
                    self.logger.warning(f"{self.trace_id}--Field extraction failed: {field}")
                    continue
                item[field] = value

            if not item:
                self.logger.warning(f"{self.trace_id}--Video processing produced an empty result")
                return None

            item.update({
                "platform": self.platform,
                "strategy": self.mode,
                "session": f"{self.platform}-{int(time.time())}"
            })

            self.logger.debug(f"{self.trace_id}--Video processed successfully")
            return item
        except Exception as e:
            self.logger.error(f"{self.trace_id}--Exception while processing video: {e}")
            return None

    def push_to_etl(self, item: Dict) -> bool:
        """Push an item to ETL (synchronous)."""
        self.logger.info(f"{self.trace_id}--Start pushing item to ETL: {item.get('title', 'untitled')}")
        try:
            self.mq.send_msg(item)
            self.aliyun_logr.logging(
                code="1002",
                message="Successfully sent to ETL",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--Item pushed successfully")
            return True
        except Exception as e:
            self.logger.error(f"{self.trace_id}--Failed to push item: {e}")
            return False

    async def run(self):
        """
        Run the spider task asynchronously, in strict order:
        1. crawl
        2. filter
        3. process each record
        4. push to ETL
        """
        self.logger.info(f"{self.trace_id}--[{self.platform}] Starting spider task")

        for loop_index in range(1, self.loop_times + 1):
            self.logger.info(f"{self.trace_id}--Step 1: starting loop {loop_index}/{self.loop_times}")
            loop_start_time = time.time()

            # Step 1: fetch video data (skip this loop on failure)
            video_list = await self.crawl_data()
            if not video_list:
                self.logger.warning(f"{self.trace_id}--No video data fetched, skipping this loop")
                await self._wait_for_next_loop(loop_index)
                continue

            # Step 2: process each video and push it to ETL
            success_count = 0
            fail_count = 0

            for video in video_list:
                # Step 2.1: validate the video (skip if it fails the rules)
                if not self.filter_data(video):
                    self.logger.debug(f"{self.trace_id}--Video does not match the rules, skipped")
                    continue

                # Step 2.2: process the video (log and continue on failure)
                item = self.process_video(video)
                if not item:
                    self.logger.warning(f"{self.trace_id}--Video processing failed, skipped")
                    fail_count += 1
                    continue

                # Step 2.3: push to ETL (log and continue on failure)
                if self.push_to_etl(item):
                    success_count += 1
                else:
                    fail_count += 1

            loop_duration = time.time() - loop_start_time
            self.logger.info(f"{self.trace_id}--Loop {loop_index}/{self.loop_times} finished. "
                             f"Success: {success_count}, failed: {fail_count}, took {loop_duration:.2f}s")

            # Wait before the next loop
            await self._wait_for_next_loop(loop_index)

        self.logger.info(f"{self.trace_id}--[{self.platform}] Spider task completed")
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--Waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)
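

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). A concrete spider usually just subclasses
# BaseSpider and supplies get_video_duration; everything else (URL, headers,
# field mapping, loop settings) comes from the config that ConfigLoader looks
# up by class name. The class name, rule_dict values and trace_id below are
# hypothetical and assume a matching entry exists in the platform config.
# ---------------------------------------------------------------------------
class ExampleRecommendSpider(BaseSpider):
    """Hypothetical subclass, shown only to illustrate the extension point."""

    def get_video_duration(self, video_url: str) -> float:
        # A real implementation might probe the file (e.g. via ffprobe) or
        # read a duration field already present in the crawled payload.
        return 0.0


if __name__ == "__main__":
    spider = ExampleRecommendSpider(
        rule_dict={"videos_cnt": {"min": 100}},
        user_list=[],
        trace_id="local-debug-0001",
        env="dev",
    )
    asyncio.run(spider.run())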