import asyncio
import random
import uuid
from abc import ABC
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from core.utils.template_resolver import resolve_request_body_template
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer


class BaseSpider(ABC):
    """
    Generic spider base class enforcing a strict sequential workflow:
    fetch -> extract -> validate -> enrich -> push to ETL.
    """

    MAX_RETRIES = 3  # maximum retries per request
    TIMEOUT = 30  # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # concrete subclass name

        # Resolve the platform config from the lower-cased class name.
        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching platform config found: {self.class_name}")

        # Platform info and logger setup.
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)

        # MQ producer used to push finished items to the ETL pipeline.
        self.mq_producer = AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode,
        )

        # Request configuration.
        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers
        self.request_body = self.platform_config.request_body

        # Response-parsing configuration.
        self.response = self.platform_config.response_parse
        self.field_map = self.response.get("fields", {})
        self.data_path = self.response.get("data_path")
        self.next_cursor_path = self.response.get("next_cursor")
        self.response_data = self.response.get("data")

        # Loop-control configuration.
        self.loop_times = self.platform_config.loop_times  # number of crawl loops
        self.loop_interval = self.platform_config.loop_interval  # pause between loops (seconds)

        # Database and HTTP clients.
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)

        self.feishu_sheetid = self.platform_config.feishu_sheetid

        self.timeout = self.TIMEOUT
        self.max_retries = self.MAX_RETRIES

        # Resolve template placeholders in the configured request body up front.
        self.resolved_body = resolve_request_body_template(self.request_body)

    async def run(self):
        """
        Spider entry point: run the full loop (fetch, process, push).
        """
        await self.before_run()
        total_success, total_failed = 0, 0
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            for loop_index in range(1, self.loop_times + 1):
                # Stop early once today's ingest cap is reached,
                # but still fall through to the after_run() hook.
                if not await self.is_video_count_sufficient():
                    break
                success_count, fail_count = await self.run_single_loop(session)
                total_success += success_count
                total_failed += fail_count
                if loop_index < self.loop_times:
                    await asyncio.sleep(self.loop_interval)
        await self.after_run()
        self.logger.info(f"{self.trace_id} crawl finished, success: {total_success}, failed: {total_failed}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        One crawl iteration: fetch a batch of videos, then process and push each one.
        """
        success_count, fail_count = 0, 0
        video_list = await self.crawl_data(session)
        if not video_list:
            self.logger.info(f"{self.trace_id} no videos fetched")
            return success_count, fail_count
        for video in video_list:
            result = await self.process_and_push_video(video)
            if result:
                success_count += 1
            else:
                fail_count += 1
        return success_count, fail_count

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Per-video pipeline: field extraction -> rule filtering -> title handling -> push to ETL.
        """
        try:
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"{self.trace_id} video processing failed {e}")
            return False

    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """
        Fetch one page of results (retries are handled by the request client).
        The request-body template is re-resolved with the cursor extracted from
        this response, so successive calls paginate automatically.
        """
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=self.resolved_body,
        )
        next_cursor = safe_extract(response, self.response_data)
        self.logger.debug(f"{self.trace_id} next cursor: {next_cursor}")
        # Re-resolve the request-body template with the cursor for the next page.
        self.resolved_body = resolve_request_body_template(self.request_body, next_cursor)
        data = safe_extract(response, self.data_path)
        return data if data else []

    async def filter_data(self, video: Dict) -> bool:
        """Validate a video against the configured rules."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1()),
        )
        return await pipeline.process_item()

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's ingest count is still below the daily cap.
        Returns False once the cap has been reached (i.e. crawling should stop).
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 200):
                self.logger.info(f"{self.trace_id}--daily video cap reached: {video_count}")
                self.aliyun_logr.logging(
                    code="1011",
                    message="daily video cap reached",
                    data=f"<today's video count>{video_count}",
                )
                return False
            self.logger.info(f"{self.trace_id}--videos stored today: {video_count}")
            return True

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Process a single raw video record: apply the field map and validate
        the required fields via VideoItem.
        """
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        item_kwargs = extract_fields(
            video,
            self.field_map,
            logger=self.logger,
            trace_id=self.trace_id,
            aliyun_log=self.aliyun_logr,
        )
        item_kwargs["user_id"] = publish_user["uid"]
        item_kwargs["user_name"] = publish_user["nick_name"]
        item_kwargs["platform"] = self.platform
        item_kwargs["strategy"] = self.mode
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem initialization failed: {e}")
            return None

    async def push_to_etl(self, video: Dict[str, Any]) -> bool:
        """
        Push a fully processed video to the ETL queue.
        """
        try:
            await self.mq_producer.send_msg(video)
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id} push to ETL failed {e}")
            return False

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Post-processing hook: title generation via the configured Feishu sheet.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before the next loop iteration."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)

    async def before_run(self):
        """
        Overridable hook run before crawling starts, e.g. to fetch tokens.
        """
        pass

    async def after_run(self):
        """
        Overridable hook run after crawling ends, e.g. for stats reporting.
        """
        pass
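

# --- Usage sketch -----------------------------------------------------------
# A minimal, illustrative example of how a concrete spider might be defined and
# driven, assuming a matching SpiderConfig entry exists for the lower-cased
# class name. "ExampleSpider", the rule_dict shape, the user_list entries, and
# the trace_id below are hypothetical placeholders, not values from this repo.
if __name__ == "__main__":

    class ExampleSpider(BaseSpider):
        """Concrete spider; request/parse behaviour comes entirely from config."""
        pass

    async def _demo() -> None:
        spider = ExampleSpider(
            rule_dict={"videos_cnt": {"min": 200}},  # hypothetical daily ingest cap
            user_list=[{"uid": 1001, "nick_name": "demo_user"}],  # publishing accounts
            trace_id="examplespider-demo-trace",
        )
        await spider.run()

    asyncio.run(_demo())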