import asyncio
import random
import traceback
import uuid
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from core.base.async_request_client import AsyncRequestClient
from core.models.video_item import VideoItem
from core.utils.extractors import safe_extract, extract_fields
from core.utils.helpers import generate_titles
from core.utils.log.logger_manager import LoggerManager
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline


class BaseSpider(ABC):
    """Generic spider base class."""

    def __init__(self,
                 rule_dict: Dict,
                 user_list: List,
                 env: str = "prod",
                 request_client: Optional[AsyncRequestClient] = None,
                 db_service: Optional[AsyncMysqlService] = None,
                 mq_producer: Optional[AsyncMQProducer] = None):
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.class_name = self.__class__.__name__.lower()

        # Initialize core components
        self._setup_configuration()
        self._setup_logging()
        self._setup_services(request_client, db_service, mq_producer)
        self._setup_state()

        # Shared counters and state
        self.total_success = 0
        self.total_fail = 0
        self.video = None

    def _setup_configuration(self):
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"Spider config not found: {self.class_name}")

        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}

        self.request_body_template = self.platform_config.request_body or {}
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        self.has_more = self.response_parse_config.get("has_more")
        self.field_map = self.response_parse_config.get("fields", {})
        self.next_cursor = self.response_parse_config.get("next_cursor") or ""

        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid

    def _setup_logging(self):
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"Spider '{self.platform}/{self.mode}' initializing...")

    def _setup_services(self,
                        request_client: Optional[AsyncRequestClient] = None,
                        db_service: Optional[AsyncMysqlService] = None,
                        mq_producer: Optional[AsyncMQProducer] = None):
        """Initialize service components, falling back to defaults when none are injected."""
        self.request_client = request_client or AsyncRequestClient(
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )
        self.db_service = db_service or AsyncMysqlService(
            platform=self.platform,
            mode=self.mode
        )
        self.mq_producer = mq_producer or AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode
        )

    def _setup_state(self):
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

    # Main entry point (shared flow)
    async def run(self):
        """Main flow: setup -> core loop -> teardown."""
        self.logger.info(f"Starting spider: {self.platform}/{self.mode}")
        await self.before_run()
        try:
            await self.core_loop()  # mode-specific logic implemented by subclasses
        except Exception as e:
            tb = traceback.format_exc()
            self.logger.exception(f"Run failed: {e}, traceback: {tb}")
        finally:
            await self.after_run()
            self.logger.info(f"Totals: {self.total_success} succeeded, {self.total_fail} failed")

    @abstractmethod
    async def core_loop(self):
        """Must be implemented by subclasses: mode-specific core loop (recommend/account)."""
        pass

    async def fetch_detail(self, item: Dict) -> Dict:
        """Optional subclass hook: enrich an item with detail data (fully subclass-controlled)."""
        return item

    # Shared data-processing pipeline
    async def process_data(self, video_data: List[Dict]):
        """Process a list of raw items (clean -> filter -> push)."""
        for item in video_data:
            try:
                # Enrich with detail data (entirely up to the subclass)
                detail_data = await self.fetch_detail(item)
                # Process and push
                result = await self.process_and_push_video(detail_data)
                if result:
                    self.total_success += 1
                else:
                    self.total_fail += 1
            except Exception as e:
                self.logger.exception(f"Failed to process item: {e}")
                self.total_fail += 1

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        try:
            self.video = video
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"Video processing failed: {e}")
            return False

    async def publish_video_user(self) -> Optional[Dict[str, Any]]:
        """Pick a random publishing user."""
        if self.user_list:
            return random.choice(self.user_list)
        self.logger.error("User list is empty")
        return None

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Field mapping: unified field extraction and VideoItem construction.
        """
        self.logger.info(f"Processing video data: {video}")
        publish_user = await self.publish_video_user()
        # Make sure a publishing user was obtained
        if not publish_user:
            self.logger.error("Unable to obtain a publishing user")
            return None

        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })

        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem initialization failed: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """
        Validate and filter data, using PiaoQuanPipeline by default.
        Subclasses may override this method for custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Hook: place for automatic title generation or other business logic.
        """
        # Generate/adjust the video title
        await generate_titles(self.feishu_sheetid, video, self.logger, self.aliyun_log)

    async def push_to_etl(self, video: Dict) -> bool:
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(code="1009", message="Pushed to ETL", data=video)
            self.logger.info(f"Pushed video to ETL: {video}")
            return True
        except Exception as e:
            self.logger.exception(f"Failed to push to ETL: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's crawl has reached the daily maximum.
        Returns True if the limit has not been reached, False otherwise.
        """
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            self.logger.info(f"{self.platform} has no daily ingestion limit, skipping check")
            return True
        current_count = await self.db_service.get_today_videos()
        if current_count >= max_count:
            self.logger.info(f"{self.platform} reached today's maximum: {current_count}/{max_count}")
            self.aliyun_log.logging(code="1011",
                                    message="Reached today's maximum video count",
                                    data=f"{current_count}")
            return False
        self.logger.info(f"{self.platform} videos stored today: {current_count}/{max_count}")
        self.aliyun_log.logging(code="1012",
                                message=f"Current count {current_count}",
                                data=f"{current_count}/{max_count}")
        return True

    async def wait(self):
        """Sleep for a random interval between loops."""
        # loop_interval may be missing the "min"/"max" keys; fall back to defaults
        min_time = self.loop_interval.get("min", 1)
        max_time = self.loop_interval.get("max", 5)
        wait_time = random.randint(min_time, max_time)
        self.logger.info(f"Waiting {wait_time}s before continuing")
        await asyncio.sleep(wait_time)

    async def before_run(self):
        """Pre-run hook (may be overridden by subclasses)."""
        pass

    async def after_run(self):
        """Post-run hook (may be overridden by subclasses)."""
        pass
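

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal subclass
# showing the core_loop contract. ExampleRecommendSpider and its fetch_page
# helper are hypothetical; a real spider also needs a matching SpiderConfig
# entry for the lowercased class name (since _setup_configuration raises when
# no config is found) plus live MySQL/MQ backends.
# ---------------------------------------------------------------------------
class ExampleRecommendSpider(BaseSpider):
    """Hypothetical recommend-mode spider, for illustration only."""

    async def core_loop(self):
        for _ in range(self.loop_times):
            # Stop early once the daily ingestion quota has been reached.
            if not await self.is_video_count_sufficient():
                return
            # Hypothetical platform-specific fetch; a real implementation would
            # build the request with self.request_preparer, send it through
            # self.request_client, and extract items via the response_parse config.
            video_list = await self.fetch_page()
            if not video_list:
                return
            # Clean -> filter -> push is handled entirely by BaseSpider.
            await self.process_data(video_list)
            await self.wait()

    async def fetch_page(self) -> List[Dict]:
        return []


if __name__ == "__main__":
    # Placeholder rule_dict / user_list values, shown only to illustrate the entry point.
    spider = ExampleRecommendSpider(
        rule_dict={},
        user_list=[{"uid": 1, "nick_name": "demo"}],
        env="prod",
    )
    asyncio.run(spider.run())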