import asyncio
import random
import uuid
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer


class BaseSpider:
    """
    Generic spider base class that provides:
    - Dynamic substitution of dependent request parameters (cursor or other fields)
    - Paginated crawling for both single-request and dependent-request flows
    - Unified logging, MQ publishing, exception handling, and async requests

    Subclasses only need to override a small number of methods, such as
    process_video / process_item, for platform-specific logic.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.class_name = self.__class__.__name__.lower()

        # --- 1. Initialize core components ---
        self._setup_configuration()
        self._setup_logging()
        self._setup_services()
        self._setup_state()

    # Initialization helpers
    def _setup_configuration(self):
        """Load and apply the spider's core configuration."""
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"Spider configuration not found: {self.class_name}")

        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}

        # Request and response-parsing configuration
        self.request_body_template = self.platform_config.request_body or {}
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        # self.next_cursor_path = self.response_parse_config.get("next_cursor")
        self.field_map = self.response_parse_config.get("fields", {})

        # Crawling-behavior configuration
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or 5
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid

    def _setup_logging(self):
        """Initialize the loggers."""
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"Spider '{self.platform}/{self.mode}' initializing...")
        self.logger.info(f"Max loops: {self.loop_times}, loop interval: {self.loop_interval}s")

    def _setup_services(self):
        """Initialize external service clients."""
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode,
        )

    def _setup_state(self):
        """Initialize the spider's internal state."""
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log,
        )
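    # Illustrative sketch only: the exact schema of SpiderConfig entries is not
    # shown in this module. Judging by the attributes read in
    # _setup_configuration above, a platform config for a hypothetical subclass
    # named DemoSpider might look roughly like:
    #
    #   demospider:
    #     platform: "demo"
    #     mode: "recommend"
    #     url: "https://api.example.com/feed"        # hypothetical endpoint
    #     method: "post"
    #     request_body:
    #       cursor: "{{next_cursor}}"                # hypothetical template syntax,
    #                                                # rendered by RequestPreparer
    #     response_parse:
    #       data_path: "data.items"
    #       next_cursor: "data.next_cursor"
    #       fields:
    #         video_title: "title"
    #         video_url: "play_url"
    #     loop_times: 100
    #     loop_interval: 5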
    async def run(self):
        """Main crawl loop."""
        await self.before_run()
        total_success, total_fail = 0, 0
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            for loop_index in range(self.loop_times):
                if not await self.is_video_count_sufficient():
                    self.logger.info("Daily video quota reached, stopping crawl")
                    return
                succ, fail = await self.run_single_loop(session)
                total_success += succ
                total_fail += fail
                await self._wait_for_next_loop(loop_index + 1)

        self.logger.info(f"Crawl finished. Success: {total_success}, failures: {total_fail}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        Execute a single round of requesting, parsing, and processing.
        Returns: (items processed successfully this round, items failed this round)
        """
        success_count, fail_count = 0, 0
        try:
            # Fetch the raw video list
            videos = await self.crawl_data(session)
            if not videos:
                self.logger.info("No data returned, ending this round")
                return success_count, fail_count
            for video in videos:
                # Optional dependent request to enrich the item
                video_obj = await self.fetch_dependent_data(video)
                res = await self.process_and_push_video(video_obj)
                if res:
                    success_count += 1
                else:
                    fail_count += 1
            self.logger.info(
                f"API returned <{len(videos)}> videos, "
                f"<{success_count}> processed successfully, <{fail_count}> failed"
            )
            await self.after_run()
        except Exception as e:
            self.logger.exception(f"Run error: {e}")
        return success_count, fail_count

    async def fetch_dependent_data(self, video: Dict) -> Dict:
        """
        Override in subclasses to issue dependent requests and merge the
        results into the original video dict. Default: pass-through.
        """
        return video

    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """
        Call the API, rendering dynamic parameters and updating the cursor
        automatically. Supports both single-request and paginated flows.
        """
        request_body = self.request_preparer.prepare(
            self.request_body_template, self.last_response_data
        )

        # Send the request
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=request_body,
        )

        if not response:
            self.logger.error("Empty response")
            return None

        self.last_response_data = response

        # Extract the video list
        data_list = safe_extract(response, self.data_path)
        if not data_list:
            self.logger.info(f"API returned an empty video list: {response}")
            self.aliyun_log.logging(
                code="9021",
                message="API returned an empty video list",
                data=response,
            )
            return None

        return data_list

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Full processing flow for one item (field mapping -> validation -> push).
        Subclasses can override process_video or filter_data to customize
        processing and validation.
        """
        try:
            # Field mapping
            video_obj = await self.process_video(video)
            if not video_obj:
                return False

            if not await self.filter_data(video_obj):
                return False

            await self.integrated_video_handling(video_obj)
            pushed = await self.push_to_etl(video_obj)
            return pushed
        except Exception as e:
            self.logger.exception(f"Video processing error: {e}")
            return False

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Unified field extraction and VideoItem construction.
        Override or extend in subclasses for custom mapping or filtering.
        """
        self.logger.debug(f"Processing video: {video.get('title', 'untitled')}")

        if self.user_list:
            publish_user = random.choice(self.user_list)
        else:
            self.logger.error(f"No user list available: {self.user_list}")
            return None

        item_kwargs = extract_fields(
            video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log
        )
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })

        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem construction failed: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """
        Validate and filter an item; defaults to PiaoQuanPipeline.
        Override in subclasses for custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1()),
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Hook: implement automatic title generation or other business logic here.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def push_to_etl(self, video: Dict) -> bool:
        """Push a message onto the ETL message queue."""
        try:
            await self.mq_producer.send_msg(video)
            self.logger.info("Video pushed to ETL")
            return True
        except Exception as e:
            self.logger.exception(f"Failed to push to ETL: {e}")
            return False
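    # Sketch of the field-mapping step used by process_video above, assuming
    # extract_fields resolves path expressions from field_map against the raw
    # item (the exact path syntax lives in core.utils.extractors). The values
    # below are hypothetical:
    #
    #   field_map = {"video_title": "title", "video_url": "play_url"}
    #   raw_video = {"title": "demo", "play_url": "https://example.com/v.mp4"}
    #   extract_fields(raw_video, field_map, ...)
    #       -> {"video_title": "demo", "video_url": "https://example.com/v.mp4"}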
    async def is_video_count_sufficient(self) -> bool:
        """
        Return True if today's crawled count is still below the daily cap
        (crawling may continue); return False once the cap is reached.
        """
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            current_count = await mysql.get_today_videos()
        if current_count >= max_count:
            self.aliyun_log.logging(
                code="1011",
                message="Video count reached today's maximum",
                data=f"<today's video count>{current_count}",
            )
            return False
        self.logger.info(f"{self.platform} videos stored today: {current_count}/{max_count}")
        self.aliyun_log.logging(
            code="1012",
            message=f"Current stored count {current_count}",
            data=f"{current_count}/{max_count}",
        )
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before the next loop iteration."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"Waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)

    async def before_run(self):
        """Pre-run hook; override in subclasses."""
        pass

    async def after_run(self):
        """Post-run hook; override in subclasses."""
        pass
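
# --- Usage sketch (illustrative only, not part of the production entry point) ---
# Assumes a SpiderConfig entry exists under the lowercased class name
# ("demospider") and that rule_dict / user_list normally come from upstream
# services; all literals below are hypothetical placeholders.
class DemoSpider(BaseSpider):
    async def fetch_dependent_data(self, video: Dict) -> Dict:
        # A platform whose list API omits details would issue a second request
        # here and merge the result; the base-class default is a pass-through.
        return video


async def _demo():
    rule_dict = {"videos_cnt": {"min": 500}}                # hypothetical daily cap
    user_list = [{"uid": 10001, "nick_name": "demo_user"}]  # hypothetical publisher
    await DemoSpider(rule_dict=rule_dict, user_list=user_list, env="prod").run()


if __name__ == "__main__":
    asyncio.run(_demo())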