import asyncio import random import time import traceback import uuid from abc import ABC from typing import List, Dict, Optional import aiohttp from core.models.video_item import VideoItem from core.utils.helpers import generate_titles from core.utils.spider_config import SpiderConfig from core.utils.extractors import safe_extract, extract_fields from core.utils.log.logger_manager import LoggerManager from services.async_mysql_service import AsyncMysqlService from services.pipeline import PiaoQuanPipeline from core.base.async_request_client import AsyncRequestClient from services.async_mq_producer import AsyncMQProducer class BaseSpider(ABC): """ 通用爬虫基类:支持严格顺序执行流程 """ MAX_RETRIES = 3 # 单个请求最大重试次数 TIMEOUT = 30 # 请求超时时间(秒) def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"): self.trace_id = trace_id self.env = env self.user_list = user_list self.rule_dict = rule_dict self.class_name = self.__class__.__name__ # 获取子类类名 # 根据类名自动获取配置 self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower())) if not self.platform_config: raise ValueError(f"找不到对应配置: {self.class_name}") # 初始化日志和MQ self.platform = self.platform_config.platform self.mode = self.platform_config.mode self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode) self.logger.info(f"{trace_id}--初始化爬虫类: {self.class_name}") self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode) self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2",platform=self.platform,mode=self.mode) # 请求配置 self.method = self.platform_config.method.upper() self.url = self.platform_config.url self.headers = self.platform_config.headers self.body = self.platform_config.request_body self.response =self.platform_config.response_parse self.field_map = self.response.get("fields", {}) self.data_path = self.response.get("data_path") # 流程控制配置 self.loop_times = self.platform_config.loop_times # 循环次数 self.loop_interval = self.platform_config.loop_interval # 循环间隔(秒) self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode) self.request_client = AsyncRequestClient(logger=self.logger,aliyun_log=self.aliyun_logr) self.logger.info( f"{self.trace_id}--配置: 循环{self.loop_times}次,间隔{self.loop_interval}秒") self.session = None self.feishu_sheetid = self.platform_config.feishu_sheetid async def crawl_data(self,session) -> Optional[List[Dict]]: response = await self.request_client.request( session=session, method=self.method, url=self.url, headers=self.headers, json=self.body ) data = safe_extract(response, self.data_path) return data if data else [] async def filter_data(self, video: Dict) -> bool: """校验视频是否符合规则""" pipeline = PiaoQuanPipeline( platform=self.platform, mode=self.mode, rule_dict=self.rule_dict, env=self.env, item=video, trace_id=self.platform + str(uuid.uuid1()) ) return await pipeline.process_item() async def is_video_count_sufficient(self) -> bool: """ 校验视频是否达到当日最大量 :return:True False """ rule_videos_cnt = self.rule_dict.get("videos_cnt") if not rule_videos_cnt: return True async with AsyncMysqlService(self.platform, self.mode) as mysql: video_count = await mysql.get_today_videos() if video_count >= rule_videos_cnt.get("min", 200): self.logger.info(f"{self.trace_id}--今日视频已达到最大量{video_count}") self.aliyun_logr.logging( code="1011", message=f"视频数量达到当日最大值", data=f"<今日视频数量>{video_count}" ) return False self.logger.info(f"{self.trace_id}--今日视频已入库{video_count}") return True async def process_video(self, video: Dict) -> Optional[Dict]: """ 处理单条视频数据,字段映射关系,必要字段检验 :param video: :return: """ self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}") publish_user = random.choice(self.user_list) item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id,aliyun_log=self.aliyun_logr) item_kwargs["user_id"] = publish_user["uid"] item_kwargs["user_name"] = publish_user["nick_name"] item_kwargs["platform"] = self.platform item_kwargs["strategy"] = self.mode try: item = VideoItem(**item_kwargs) video_dict = await item.produce_item() if not video_dict: self.logger.warning(f"{self.trace_id} 校验失败") return None return video_dict except Exception as e: self.logger.error(f"{self.trace_id} VideoItem 初始化失败: {e}") return None async def push_to_etl(self, item: Dict) -> bool: """推送数据到ETL(同步)""" self.logger.info(f"{self.trace_id}--开始推送数据到ETL: {item.get('video_id', item.get('title', '无标题'))}") try: await self.mq_producer.send_msg(item) self.aliyun_logr.logging( code="1009", message="成功发送至ETL", data=item, trace_id=self.trace_id ) self.logger.info(f"{self.trace_id}--数据推送成功") return True except Exception as e: self.logger.exception(f"{self.trace_id}--数据推送失败: {e}, 内容: {item}") return False async def get_today_videos(self): """ 查询每天的爬虫爬取到的视频数量 :return: """ video_count = await self.db_service.get_today_videos() return video_count async def integrated_video_handling(self,video: Dict) -> Optional[Dict]: """ 视频处理 :return: """ await generate_titles(self.feishu_sheetid,video) async def run(self): """ 异步运行爬虫任务,严格按顺序执行 1. 爬取 2. 处理每条数据,字段校验 3. 过滤(重复,平台规则,标题,发布时间) 4. 标题处理 5. 推送到ETL """ try: total_success,total_failed= 0,0 loop_start_time = time.time() async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session: for loop_index in range(1, self.loop_times + 1): # 判断当日视频数量已达到最大量 if not await self.is_video_count_sufficient(): return self.logger.info(f"{self.trace_id}--步骤1: 开始第 {loop_index}/{self.loop_times} 次循环请求") # 请求视频列表 video_list = await self.crawl_data(session) if not video_list: self.logger.warning(f"{self.trace_id}--未获取到视频数据,跳过当前循环") await self._wait_for_next_loop(loop_index) continue success_count = 0 fail_count = 0 for video in video_list: # 提取视频字段映射关系 video_obj = await self.process_video(video) if not video_obj: self.logger.warning(f"{self.trace_id}--视频处理失败,已跳过") fail_count += 1 continue # 视频过滤规则 if not await self.filter_data(video_obj): self.logger.debug(f"{self.trace_id}--视频不符合规则,已跳过") continue # 视频处理 await self.integrated_video_handling(video_obj) if await self.push_to_etl(video_obj): success_count += 1 else: fail_count += 1 total_success += success_count total_failed += fail_count loop_duration = time.time() - loop_start_time self.logger.info(f"{self.trace_id}--第 {loop_index}/{self.loop_times} 次循环完成. " f"成功: {success_count}, 失败: {fail_count}, 耗时: {loop_duration:.2f}秒") await self._wait_for_next_loop(loop_index) # 全局指标日志 self.aliyun_logr.logging( code="1003", message="爬虫执行指标汇总", data={ "trace_id": self.trace_id, "classname": self.platform, "success_count": total_success, "fail_count": total_failed }, trace_id=self.trace_id ) self.logger.info( f"{self.trace_id}--[{self.platform}] 爬虫任务全部完成,总成功: {total_success}, 总失败: {total_failed}") return True except Exception as e: self.logger.error(f"爬虫致命错误: {e}") raise async def _wait_for_next_loop(self, current_loop: int) -> None: """等待下一次循环请求""" if current_loop < self.loop_times and self.loop_interval > 0: self.logger.info(f"{self.trace_id}--等待 {self.loop_interval} 秒后进行下一次请求") await asyncio.sleep(self.loop_interval)