import asyncio
import random
import uuid
from typing import List, Dict, Optional, Any, Tuple

import aiohttp

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from core.utils.template_resolver import resolve_request_body_template
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer


class BaseSpider:
    """
    Generic spider base class. Provides:
    - dynamic replacement of request parameters (cursor or other fields)
    - paginated crawling for single requests and dependent requests
    - unified logging, MQ publishing, exception capture and async requests

    Subclasses usually only override a few hooks, such as process_video,
    fetch_dependent_data, filter_data, integrated_video_handling,
    before_run or after_run.
    """

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        """
        Load the platform config and set up logging, MQ, DB and HTTP clients.

        :param rule_dict: crawl rules. ``rule_dict["videos_cnt"]["min"]`` is used
            as the video-count limit; the whole dict is also passed to
            PiaoQuanPipeline.
        :param user_list: candidate publishing users (dicts read via ``uid`` /
            ``nick_name``).
        :param trace_id: identifier prefixed to every log line.
        :param env: environment name forwarded to PiaoQuanPipeline.
        :raises ValueError: when no platform config exists for this class name.
        """
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        # The config lookup key is the lower-cased (sub)class name.
        self.class_name = self.__class__.__name__.lower()

        # Load the platform config.
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"找不到对应配置: {self.class_name}")

        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)

        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers or {}
        self.request_body_template = self.platform_config.request_body or {}
        self.response_parse = self.platform_config.response_parse or {}
        self.next_cursor_path = self.response_parse.get("next_cursor")
        self.data_path = self.response_parse.get("data_path")
        self.field_map = self.response_parse.get("fields", {})
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or 5
        self.feishu_sheetid = self.platform_config.feishu_sheetid

        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)

        self.timeout = 30
        self.max_retries = 3

        # Parameters sent as the request body: one entry per template key, all
        # starting as "" (the "cursor" entry is refreshed by _update_cursor).
        self.dynamic_params = {key: "" for key in self.request_body_template.keys()}
        # Subclasses may override this to support multiple cursors or other complex cases.
        self.current_cursor = ""
        self.download_cnt = 0
        self.limit_flag = False

    def _videos_cnt_limit(self):
        """Return ``rule_dict["videos_cnt"]["min"]``, or 0 when it is missing or empty."""
        return (self.rule_dict.get("videos_cnt") or {}).get("min") or 0

    async def run(self):
        """
        Main crawl flow: run up to ``loop_times`` rounds inside one HTTP session.

        Stops early when the in-run limit (``limit_flag``) or the daily video
        count is reached. Calls ``before_run`` / ``after_run`` around the loop.
        """
        await self.before_run()
        total_success, total_fail = 0, 0
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            for loop_index in range(self.loop_times):
                if self.limit_flag:
                    self.logger.info(f"{self.trace_id} 已达到抓取限制,停止爬虫")
                    break
                if not await self.is_video_count_sufficient():
                    self.logger.info(f"{self.trace_id} 视频抓取数量已达上限,提前结束")
                    break
                succ, fail = await self.run_single_loop(session)
                total_success += succ
                total_fail += fail
                await self._wait_for_next_loop(loop_index + 1)
        await self.after_run()
        self.logger.info(f"{self.trace_id} 爬虫完成 成功:{total_success} 失败:{total_fail}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        Perform one request round and process every returned video.

        :param session: the aiohttp session shared by the whole run.
        :return: ``(success_count, fail_count)`` for this round.
        """
        success_count, fail_count = 0, 0
        try:
            videos = await self.crawl_data(session)
            if not videos:
                self.logger.info(f"{self.trace_id} 无数据返回,停止本轮")
                return success_count, fail_count
            for video in videos:
                # A failing dependent request only affects this video; the rest
                # of the batch is still processed and the failure is counted.
                try:
                    video_obj = await self.fetch_dependent_data(video)
                except Exception as e:
                    self.logger.exception(f"{self.trace_id} 依赖数据请求异常: {e}")
                    fail_count += 1
                    continue
                if await self.process_and_push_video(video_obj):
                    success_count += 1
                else:
                    fail_count += 1
                if self.limit_flag:
                    break
        except Exception as e:
            self.logger.exception(f"{self.trace_id} 运行异常: {e}")
        return success_count, fail_count

    async def fetch_dependent_data(self, video: Dict) -> Dict:
        """
        Hook for dependent requests that enrich ``video``.

        Subclasses may override it. The default returns ``video`` unchanged.
        """
        return video

    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """
        Send the list request, update the pagination cursor and extract the data list.

        :return: the list found at ``data_path`` in the response, or ``[]`` when
            the response or the list is empty.
        """
        # NOTE(review): the body sent is self.dynamic_params, whose values start
        # as "" for every template key, so literal (non-placeholder) template
        # values are not sent and _render_request_body is not used here.
        # Confirm that this is intended.
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=self.dynamic_params
        )

        if not response:
            self.logger.error(f"{self.trace_id} 响应为空")
            return []

        # Update the cursor so the next request fetches the next page.
        if self.next_cursor_path:
            next_cursor = safe_extract(response, self.next_cursor_path) or ""
            self._update_cursor(next_cursor)

        # Extract the list of items.
        data_list = safe_extract(response, self.data_path)
        if not data_list:
            self.logger.info(f"{self.trace_id} 未获取到有效数据")
            return []

        return data_list

    def _render_request_body(self) -> Dict:
        """
        Render the request-body template with the current dynamic parameters.

        Template values shaped like ``"{{name}}"`` become
        ``self.dynamic_params.get(name, "")``; all other values are copied as-is.
        """
        body = {}
        for k, v in self.request_body_template.items():
            if isinstance(v, str) and v.startswith("{{") and v.endswith("}}"):
                key = v.strip("{} ")
                body[k] = self.dynamic_params.get(key, "")
            else:
                body[k] = v
        return body

    def _update_cursor(self, cursor_value: str):
        """
        Store the pagination cursor for the next request.

        ``dynamic_params["cursor"]`` is updated only when that key exists,
        i.e. when the request-body template has a ``cursor`` field.
        """
        self.current_cursor = cursor_value
        if "cursor" in self.dynamic_params:
            self.dynamic_params["cursor"] = cursor_value

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Full per-video pipeline: field mapping -> filtering -> hook -> ETL push.

        Subclasses customise it via process_video / filter_data.
        Sets ``limit_flag`` once ``download_cnt`` reaches ``videos_cnt.min``.

        :return: True if the video was pushed to ETL, otherwise False. Exceptions
            are logged and reported as False.
        """
        try:
            video_obj = await self.process_video(video)
            if not video_obj:
                return False

            if not await self.filter_data(video_obj):
                return False

            await self.integrated_video_handling(video_obj)
            pushed = await self.push_to_etl(video_obj)

            if pushed:
                self.download_cnt += 1

            # Check after counting, so crawling stops right after the N-th push
            # (checking first used to let one extra video through).
            limit = self._videos_cnt_limit()
            if limit and self.download_cnt >= limit:
                self.limit_flag = True

            return pushed
        except Exception as e:
            self.logger.exception(f"{self.trace_id} 视频处理异常: {e}")
            return False

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Map raw fields via ``field_map`` and build a VideoItem.

        Attaches a random user from ``user_list`` as the publisher, or a
        default placeholder user when the list is empty.

        :return: the dict from ``VideoItem.produce_item()``, or None when
            validation or construction fails.
        """
        self.logger.debug(f"{self.trace_id} 处理视频数据: {video.get('title', '无标题')}")
        if self.user_list:
            publish_user = random.choice(self.user_list)
        else:
            publish_user = {"uid": "default", "nick_name": "default_user"}

        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode
        })

        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} VideoItem 校验失败")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem 初始化失败: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """
        Validate/filter a video with PiaoQuanPipeline by default.

        Subclasses may override this with custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """Hook for extra processing before the push. The default calls generate_titles."""
        await generate_titles(self.feishu_sheetid, video)

    async def push_to_etl(self, video: Dict) -> bool:
        """
        Send ``video`` to the ETL message queue.

        :return: True on success; on failure the exception is logged and False returned.
        """
        try:
            await self.mq_producer.send_msg(video)
            self.logger.info(f"{self.trace_id} 成功推送视频至ETL")
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id} 推送ETL失败: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check the count returned by ``get_today_videos()`` against ``videos_cnt.min``.

        :return: True if crawling may continue (no positive limit configured, or
            the count is below it); False once the limit is reached.
        """
        max_count = self._videos_cnt_limit()
        if max_count <= 0:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            current_count = await mysql.get_today_videos()
            if current_count >= max_count:
                self.logger.info(f"{self.trace_id} 今日视频已达上限: {current_count}")
                self.aliyun_logr.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
                return False
            return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """
        Sleep ``loop_interval`` seconds between rounds.

        Skips the sleep after the last round and when ``limit_flag`` is already
        set, because ``run`` stops immediately in that case.
        """
        if self.limit_flag:
            return
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id} 等待 {self.loop_interval} 秒后进行下一次请求")
            await asyncio.sleep(self.loop_interval)

    async def before_run(self):
        """Pre-run hook; subclasses may override it."""
        pass

    async def after_run(self):
        """Post-run hook; subclasses may override it."""
        pass