import asyncio import random import traceback import uuid import aiohttp from abc import ABC from typing import List, Dict, Optional import time from core.utils.log.logger_manager import LoggerManager from services.pipeline import PiaoQuanPipeline from core.utils.extractors import safe_extract from core.utils.config_loader import ConfigLoader from services.async_mysql_service import AsyncMysqlService from core.models.video_item import VideoItem class BaseSpider(ABC): """ 通用爬虫基类:支持严格顺序执行流程 """ MAX_RETRIES = 3 # 单个请求最大重试次数 TIMEOUT = 30 # 请求超时时间(秒) def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"): self.trace_id = trace_id self.env = env self.user_list = user_list self.rule_dict = rule_dict self.class_name = self.__class__.__name__ # 获取子类类名 # 根据类名自动获取配置 self.platform_config = ConfigLoader.get_platform_config(platform=str(self.class_name.lower())) if not self.platform_config: raise ValueError(f"找不到对应配置: {self.class_name}") # 初始化日志和MQ self.platform = self.platform_config.get("platform") self.mode = self.platform_config.get("mode") self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode) self.logger.info(f"{trace_id}--初始化爬虫类: {self.class_name}") self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode) self.mq = MQ(topic_name=f"topic_crawler_etl_{env}") # 请求配置 self.method = self.platform_config.get("method", "GET").upper() self.url = self.platform_config.get("url") self.headers = self.platform_config.get("headers", {}) self.body = self.platform_config.get("request_body", {}) self.field_map = self.platform_config.get("response_parse", {}).get("fields", {}) self.data_path = self.platform_config.get("response_parse", {}).get("data_path") self.video_fields_map = self.platform_config.get("video_fields_map", {}) # 流程控制配置 self.loop_times = self.platform_config.get("loop_times", 1) # 循环次数 self.loop_interval = self.platform_config.get("loop_interval", 0) # 循环间隔(秒) self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode) self.logger.info( f"{self.trace_id}--配置: 循环{self.loop_times}次,间隔{self.loop_interval}秒") self.session = None async def _send_async_request(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> aiohttp.ClientResponse: """ 使用提供的 session 发送异步HTTP请求,支持重试机制 """ retries = 0 self.logger.info(f"{self.trace_id}--请求准备: {method} {url}, 参数: {kwargs}") while retries < self.MAX_RETRIES: try: async with session.request(method, url, **kwargs) as response: response.raise_for_status() self.logger.info(f"{self.trace_id}--请求成功: {response.status}") return await response.json() except Exception as e: retries += 1 remaining_attempts = self.MAX_RETRIES - retries if retries < self.MAX_RETRIES: self.logger.warning( f"{self.trace_id}--请求失败 (尝试 {retries}/{self.MAX_RETRIES}): {e}. " f"剩余尝试次数: {remaining_attempts}" ) await asyncio.sleep(1) else: self.aliyun_logr.logging( code="5001", message="请求失败,已达到最大重试次数", data={ "url": url, "method": method, "error": str(e), "headers": kwargs.get("headers", {}), "body": kwargs.get("json", {}) }, trace_id=self.trace_id ) self.logger.error(f"{self.trace_id}--请求失败,已达到最大重试次数: {e}") raise async def crawl_data(self) -> Optional[List[Dict]]: """异步获取视频数据""" self.logger.info(f"{self.trace_id}--开始获取视频数据") try: response = await self._send_async_request( session=self.session, method=self.method, url=self.url, headers=self.headers, json=self.body ) self.logger.debug(f"{self.trace_id}--响应结果: {response}") data = safe_extract(response, self.data_path) if not data: self.logger.warning(f"{self.trace_id}--未获取到数据,路径: {self.data_path}") return [] self.logger.info(f"{self.trace_id}--成功获取{len(data)}条视频数据") return data except Exception as e: traceback.extract_stack() self.logger.exception(f"{self.trace_id}--获取视频数据失败: {e}") return [] async def filter_data(self, video: Dict) -> bool: """校验视频是否符合规则""" pipeline = PiaoQuanPipeline( platform=self.platform, mode=self.mode, rule_dict=self.rule_dict, env=self.env, item=video, trace_id=self.platform + str(uuid.uuid1()) ) return await pipeline.process_item() async def is_video_count_sufficient(self) -> bool: """ 校验视频是否达到当日最大量 :return:True False """ rule_videos_cnt = self.rule_dict.get("videos_cnt") if not rule_videos_cnt: return True async with AsyncMysqlService(self.platform, self.mode) as mysql: video_count = await mysql.get_today_videos() if video_count >= rule_videos_cnt.get("min", 200): self.logger.info(f"{self.trace_id}--当日视频已达到最大量{video_count}") return False return True async def process_video(self, video: Dict) -> Optional[Dict]: """ 处理单条视频数据,字段映射关系,必要字段检验 :param video: :return: """ self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}") publish_user = random.choice(self.user_list) try: # 从 field_map 中动态构建 VideoItem 初始化参数 item_kwargs = {} for field, path in self.field_map.items(): if not isinstance(path, str) or not path.startswith("$"): item_kwargs[field] = path continue value = safe_extract(video, path) if value is None: self.logger.warning(f"{self.trace_id}--字段提取失败: {field} 路径: {path}") continue item_kwargs[field] = value item_kwargs["user_id"] = publish_user["uid"] item_kwargs["user_name"] = publish_user["nick_name"] # 手动注入 platform 与 strategy item_kwargs["platform"] = self.platform item_kwargs["strategy"] = self.mode try: item = VideoItem(**item_kwargs) except Exception as e: self.logger.warning(f"{self.trace_id}--VideoItem 初始化失败: {e}, 数据: {item_kwargs}") return None video_dict = await item.produce_item() if not video_dict: self.logger.warning(f"{self.trace_id}--VideoItem 校验失败") return None return video_dict except Exception as e: self.logger.exception(f"{self.trace_id}--视频处理异常: {e}") return None async def push_to_etl(self, item: Dict) -> bool: """推送数据到ETL(同步)""" self.logger.info(f"{self.trace_id}--开始推送数据到ETL: {item.get('video_id', item.get('title', '无标题'))}") try: self.mq.send_msg(item) self.aliyun_logr.logging( code="1009", message="成功发送至ETL", data=item, trace_id=self.trace_id ) self.logger.info(f"{self.trace_id}--数据推送成功") return True except Exception as e: self.logger.exception(f"{self.trace_id}--数据推送失败: {e}, 内容: {item}") return False async def get_today_videos(self): """ 查询每天的爬虫爬取到的视频数量 :return: """ video_count = self.db_service.get_today_videos() return video_count async def integrated_video_handling(self): """ 视频处理 :return: """ pass async def run(self): """ 异步运行爬虫任务,严格按顺序执行 1. 爬取 2. 处理每条数据,字段校验 3. 过滤(重复,平台规则,标题,发布时间) 4. 标题处理 5. 推送到ETL """ try: self.logger.info(f"{self.trace_id}--[{self.platform}] 开始执行爬虫任务") total_success = 0 total_failed = 0 async with aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.TIMEOUT) ) as session: # 上下文管理 self.session = session for loop_index in range(1, self.loop_times + 1): if not await self.is_video_count_sufficient(): return self.logger.info(f"{self.trace_id}--步骤1: 开始第 {loop_index}/{self.loop_times} 次循环请求") loop_start_time = time.time() video_list = await self.crawl_data() if not video_list: self.logger.warning(f"{self.trace_id}--未获取到视频数据,跳过当前循环") await self._wait_for_next_loop(loop_index) continue success_count = 0 fail_count = 0 for video in video_list: video_obj = await self.process_video(video) if not video_obj: self.logger.warning(f"{self.trace_id}--视频处理失败,已跳过") fail_count += 1 continue if not await self.filter_data(video_obj): self.logger.debug(f"{self.trace_id}--视频不符合规则,已跳过") continue await self.integrated_video_handling() if await self.push_to_etl(video_obj): success_count += 1 else: fail_count += 1 total_success += success_count total_failed += fail_count loop_duration = time.time() - loop_start_time self.logger.info(f"{self.trace_id}--第 {loop_index}/{self.loop_times} 次循环完成. " f"成功: {success_count}, 失败: {fail_count}, 耗时: {loop_duration:.2f}秒") await self._wait_for_next_loop(loop_index) # 全局指标日志 self.aliyun_logr.logging( code="1003", message="爬虫执行指标汇总", data={ "trace_id": self.trace_id, "platform": self.platform, "success_count": total_success, "fail_count": total_failed }, trace_id=self.trace_id ) self.logger.info( f"{self.trace_id}--[{self.platform}] 爬虫任务全部完成,总成功: {total_success}, 总失败: {total_failed}") return True except Exception as e: self.logger.error(f"爬虫致命错误: {e}") raise async def _wait_for_next_loop(self, current_loop: int) -> None: """等待下一次循环请求""" if current_loop < self.loop_times and self.loop_interval > 0: self.logger.info(f"{self.trace_id}--等待 {self.loop_interval} 秒后进行下一次请求") await asyncio.sleep(self.loop_interval)