# spiders/basespider.py
import asyncio
import random
import traceback
from typing import List, Dict, Optional
from abc import ABC, abstractmethod

import aiohttp

from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from core.utils.log.logger_manager import LoggerManager
from core.video_processor import VideoProcessor
from services.async_mysql_service import AsyncMysqlService
from services.async_mq_producer import AsyncMQProducer
from core.base.async_request_client import AsyncRequestClient


class BaseSpider(ABC):
    """Spider base class - simplified version that contains no loop logic."""

    def __init__(self,
                 rule_dict: Dict,
                 user_list: List,
                 env: str = "prod",
                 request_client: Optional[AsyncRequestClient] = None,
                 db_service: Optional[AsyncMysqlService] = None,
                 mq_producer: Optional[AsyncMQProducer] = None):
        # Basic attributes
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env

        # Service dependencies
        self.request_client = request_client
        self.db_service = db_service
        self.mq_producer = mq_producer

        # Resolve the platform config from the class name
        class_name = self.__class__.__name__.lower()
        self.config = SpiderConfig.get_platform_config(class_name)
        self._setup_from_config()

        # Logging services
        self.logger = LoggerManager.get_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )
        self.aliyun_log = LoggerManager.get_aliyun_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )

        # Request preparer
        self.request_preparer = RequestPreparer(
            response_parse_config=self.config.response_parse,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

        # Run statistics
        self.stats = {
            'success': 0,
            'fail': 0,
            'start_time': None,
            'end_time': None
        }

        # Create default instances for any services that were not injected
        self._setup_default_services()

        # Initialize the video processor
        self.video_processor = VideoProcessor(
            platform=self.platform,
            mode=self.mode,
            field_map=self.field_map,
            feishu_sheetid=self.feishu_sheetid,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

    def _setup_default_services(self):
        """Create default service instances."""
        if not self.request_client:
            self.request_client = AsyncRequestClient(
                logger=self.logger,
                aliyun_log=self.aliyun_log
            )
        if not self.db_service:
            self.db_service = AsyncMysqlService(
                platform=self.config.platform,
                mode=self.config.mode
            )
        if not self.mq_producer:
            self.mq_producer = AsyncMQProducer(
                topic_name="topic_crawler_etl_prod_v2",
                platform=self.config.platform,
                mode=self.config.mode
            )

    def _setup_from_config(self):
        """Populate attributes from the platform config."""
        self.platform = self.config.platform
        self.mode = self.config.mode
        self.url = self.config.url
        self.method = self.config.method.upper()
        self.headers = self.config.headers or {}
        self.request_body_template = self.config.request_body or {}
        self.loop_times = self.config.loop_times or 100
        self.timeout = self.config.request_timeout or 30
        self.feishu_sheetid = self.config.feishu_sheetid

        # Response parsing config
        response_parse = self.config.response_parse or {}
        self.data_path = response_parse.get("data_path")
        self.has_more = response_parse.get("has_more")
        self.next_cursor = response_parse.get("next_cursor")
        self.field_map = response_parse.get("fields", {})

    async def run(self):
        """Main run flow - subclasses implement the full loop logic."""
        self.stats['start_time'] = asyncio.get_event_loop().time()
        self.logger.info(f"Starting spider: {self.platform}/{self.mode}")

        try:
            await self.before_run()
            await self.execute()  # subclasses implement the full execution logic
        except Exception as e:
            self.logger.error(f"Spider execution error: {e}\n{traceback.format_exc()}")
        finally:
            await self.after_run()
            self._log_final_stats()

    @abstractmethod
    async def execute(self):
        """Core execution logic - subclasses must implement the full loop."""
        pass

    async def process_data(self, data: List[Dict]):
        """Process a batch of items."""
        success_count = 0
        for item in data:
            self.aliyun_log.logging(
                code="1001",
                message="Fetched one item",
                data=item
            )
            try:
                if await self.process_single_item(item):
                    success_count += 1
                    self.stats['success'] += 1
                else:
                    self.stats['fail'] += 1
            except Exception as e:
                self.logger.error(f"Failed to process a single item: {e}")
                self.stats['fail'] += 1

        self.logger.info(f"Batch complete: {success_count}/{len(data)} succeeded")
        return success_count

    async def process_single_item(self, item: Dict) -> bool:
        """Process a single item."""
        # 1. Enrich with detail data
        detail_item = await self.fetch_detail(item)

        # 2. Run the full pipeline through the video processor
        video_obj = await self.video_processor.process_single_video(
            raw_data=detail_item,
            user_info=await self.select_publish_user(),
            rule_dict=self.rule_dict,
            env=self.env
        )
        if not video_obj:
            return False

        # 3. Push the result downstream
        return await self.push_video(video_obj)

    async def select_publish_user(self) -> Dict:
        """Pick a publishing user at random."""
        if self.user_list:
            return random.choice(self.user_list)
        return {}

    async def fetch_detail(self, item: Dict) -> Dict:
        """Enrich an item with detail data - subclasses may override."""
        return item

    async def push_video(self, video: Dict) -> bool:
        """Push video data to the MQ."""
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(
                code="1002",
                message="Pushed to ETL successfully",
                data=video
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to push video: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's video quota has been reached.
        Returns True if the quota has not yet been reached, False otherwise.
        """
        # The daily cap is configured under rule_dict["videos_cnt"]["min"]
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            return True

        current_count = await self.db_service.get_today_videos()
        if current_count >= max_count:
            self.logger.info(f"Daily video cap reached: {current_count}/{max_count}")
            self.aliyun_log.logging(
                code="1011",
                message="Video count reached the cap",
                data={
                    "current_count": current_count,
                    "max_count": max_count
                }
            )
            return False
        return True

    async def wait_between_iterations(self, wait_time: Optional[int] = None):
        """Sleep for a (possibly randomized) interval between iterations."""
        if wait_time is None:
            interval_config = getattr(self.config, 'loop_interval', {})
            min_time = interval_config.get('min', 1)
            max_time = interval_config.get('max', 5)
            wait_time = random.randint(min_time, max_time)

        self.logger.info(f"Waiting {wait_time} seconds")
        await asyncio.sleep(wait_time)

    async def make_request(self, request_body: Dict) -> Optional[Dict]:
        """Send a request using the configured method, URL, and headers."""
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            return await self.request_client.request(
                session=session,
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=request_body
            )

    async def before_run(self):
        """Pre-run setup - subclasses may override."""
        pass

    async def after_run(self):
        """Post-run cleanup - subclasses may override."""
        pass

    def _log_final_stats(self):
        """Log final statistics."""
        self.stats['end_time'] = asyncio.get_event_loop().time()
        duration = 0
        if self.stats['start_time'] is not None:
            duration = self.stats['end_time'] - self.stats['start_time']
        self.logger.info(
            f"Spider finished: {self.stats['success']} succeeded, "
            f"{self.stats['fail']} failed, elapsed {duration:.2f}s"
        )
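

# --- Illustrative usage sketch (not part of the original file) ---
# A minimal subclass showing how execute() is expected to tie the base-class
# helpers together: quota check, request, batch processing, and pacing.
# The class name ExampleSpider, the "cursor" field in the request body, and
# the response shape ("data" / "next_cursor") are assumptions for
# illustration only; real subclasses depend on each platform's API and must
# have a matching entry in SpiderConfig (looked up by lowercased class name).
class ExampleSpider(BaseSpider):
    async def execute(self):
        cursor = ""
        for _ in range(self.loop_times):
            # Stop early once today's quota has been reached
            if not await self.is_video_count_sufficient():
                break

            # Build the body from the configured template; real spiders
            # would typically substitute the cursor via self.request_preparer
            body = dict(self.request_body_template, cursor=cursor)
            response = await self.make_request(body)
            if not response:
                break

            # Hypothetical response shape: items under "data",
            # pagination cursor under "next_cursor"
            items = response.get("data", [])
            if not items:
                break
            await self.process_data(items)

            cursor = response.get("next_cursor", "")
            await self.wait_between_iterations()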