123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- import asyncio
- import random
- import traceback
- import uuid
- from typing import List, Dict, Optional, Any
- from abc import ABC, abstractmethod
- from core.models.video_item import VideoItem
- from core.utils.helpers import generate_titles
- from core.utils.request_preparer import RequestPreparer
- from core.utils.spider_config import SpiderConfig
- from core.utils.extractors import safe_extract, extract_fields
- from core.utils.log.logger_manager import LoggerManager
- from services.async_mysql_service import AsyncMysqlService
- from services.pipeline import PiaoQuanPipeline
- from core.base.async_request_client import AsyncRequestClient
- from services.async_mq_producer import AsyncMQProducer
class BaseSpider(ABC):
    """Generic base class for crawlers.

    The constructor resolves the platform configuration from the lowercased
    subclass name, then builds loggers, the HTTP client, the MySQL service
    and the MQ producer. Subclasses must implement :meth:`core_loop`; they
    may override :meth:`fetch_detail`, :meth:`filter_data`,
    :meth:`integrated_video_handling`, :meth:`before_run` and
    :meth:`after_run`.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        """Store the inputs and initialise every core component.

        Args:
            rule_dict: Rule settings; forwarded to the filtering pipeline and
                read by :meth:`is_video_count_sufficient`.
            user_list: Candidate publishing users; one is picked at random
                for every processed video.
            env: Environment name forwarded to the filtering pipeline.

        Raises:
            ValueError: If no platform configuration exists for this class.
        """
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # The lowercased class name is the lookup key for SpiderConfig.
        self.class_name = self.__class__.__name__.lower()

        # Initialise core components (order matters: logging needs the
        # platform/mode resolved by the configuration step, and the services
        # need the loggers).
        self._setup_configuration()
        self._setup_logging()
        self._setup_services()
        self._setup_state()

        # Shared counters reported at the end of run().
        self.total_success = 0
        self.total_fail = 0

    def _setup_configuration(self):
        """Load the platform configuration and copy its values onto ``self``.

        Raises:
            ValueError: If ``SpiderConfig`` returns nothing for this class.
        """
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"找不到爬虫配置: {self.class_name}")

        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}
        self.request_body_template = self.platform_config.request_body or {}

        # Response parsing settings.
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        self.has_more = self.response_parse_config.get("has_more")
        self.field_map = self.response_parse_config.get("fields", {})

        # Loop / request tuning; falsy config values fall back to defaults.
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid

    def _setup_logging(self):
        """Create the local logger and the Aliyun logger for this platform/mode."""
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")

    def _setup_services(self):
        """Create the request client, the database service and the MQ producer."""
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        # NOTE(review): the topic name is hard-coded and does not depend on
        # ``self.env`` - confirm this is intended for non-prod runs.
        self.mq_producer = AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode,
        )

    def _setup_state(self):
        """Initialise per-run state and the request preparer."""
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log,
        )

    # Core entry point (unified flow)
    async def run(self):
        """Main flow: ``before_run`` -> ``core_loop`` -> ``after_run``.

        Exceptions raised by ``core_loop`` are logged and swallowed;
        ``after_run`` and the final statistics log always execute once
        ``before_run`` has completed.
        """
        self.logger.info(f"开始运行爬虫: {self.platform}/{self.mode}")
        await self.before_run()
        try:
            await self.core_loop()  # mode-specific logic supplied by the subclass
        except Exception as e:
            tb = traceback.format_exc()
            self.logger.exception(f"运行异常: {e},堆栈信息:{tb}")
        finally:
            await self.after_run()
            self.logger.info(f"总统计:成功{self.total_success},失败{self.total_fail}")

    @abstractmethod
    async def core_loop(self):
        """Mode-specific core loop; must be implemented by subclasses."""
        pass

    async def fetch_detail(self, item: Dict) -> Dict:
        """Optionally enrich ``item`` with detail data; returns it unchanged by default."""
        return item

    # Generic data processing flow
    async def process_raw_data(self, raw_data: List[Dict]):
        """Process each raw item: fetch detail, then process and push.

        ``total_success`` is incremented when the push step returns a truthy
        result; ``total_fail`` is incremented otherwise, including when an
        exception is raised for the item.
        """
        for item in raw_data:
            try:
                # Detail enrichment is fully controlled by the subclass.
                detail_data = await self.fetch_detail(item)
                result = await self.process_and_push_video(detail_data)
                if result:
                    self.total_success += 1
                else:
                    self.total_fail += 1
            except Exception as e:
                self.logger.exception(f"处理单条数据失败: {e}")
                self.total_fail += 1

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """Map, filter, post-process and push one video.

        Returns:
            ``False`` if mapping fails, the filter rejects the item or any
            step raises; otherwise the result of :meth:`push_to_etl`.
        """
        try:
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"视频处理异常: {e}")
            return False

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """Map raw fields and build the item through ``VideoItem``.

        A publishing user is chosen at random from ``user_list``; the mapped
        fields are extended with that user's ``uid``/``nick_name`` and with
        the platform and mode.

        Returns:
            The dict produced by ``VideoItem.produce_item()``, or ``None``
            when the user list is empty, the produced item is falsy, or
            building the item raises.
        """
        self.logger.info(f"处理视频数据: {video}")
        if not self.user_list:
            self.logger.error(f"未获取到用户列表数据{self.user_list}")
            return None
        publish_user = random.choice(self.user_list)

        item_kwargs = extract_fields(
            video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log
        )
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem 校验失败")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem 初始化失败: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """Validate/filter ``video`` with ``PiaoQuanPipeline`` by default.

        Subclasses may override this method to apply custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1()),
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """Hook for extra business logic; by default calls ``generate_titles``."""
        await generate_titles(self.feishu_sheetid, video)

    async def push_to_etl(self, video: Dict) -> bool:
        """Send ``video`` to the MQ producer.

        Returns:
            ``True`` on success, ``False`` if sending or logging raises.
        """
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(code="1009",
                                    message="推送ETL成功",
                                    data=video)
            self.logger.info(f"成功推送视频至ETL: {video}")
            return True
        except Exception as e:
            self.logger.exception(f"推送ETL失败: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """Check whether today's stored video count is still below the limit.

        NOTE(review): the threshold is read from ``rule_dict["videos_cnt"]["min"]``
        although it is used as a daily maximum - confirm against the rule
        schema.

        Returns:
            ``True`` if the limit has not been reached or no positive limit
            is configured; ``False`` once the limit has been reached.
        """
        # A missing or null "videos_cnt"/"min" counts as "no limit" instead of
        # raising AttributeError/TypeError.
        videos_cnt = self.rule_dict.get("videos_cnt") or {}
        max_count = videos_cnt.get("min") or 0
        if max_count <= 0:
            self.logger.info(f"{self.platform} 未限制视频入库量,跳过检测")
            return True

        current_count = await self.db_service.get_today_videos()
        if current_count >= max_count:
            self.logger.info(f"{self.platform} 视频数量达到当日最大值: {current_count}/{max_count}")
            self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"{current_count}")
            return False

        self.logger.info(f"{self.platform} 今日入库视频数: {current_count}/{max_count}")
        self.aliyun_log.logging(code="1012",
                                message=f"目前入库量{current_count}",
                                data=f"{current_count}/{max_count}"
                                )
        return True

    async def wait(self):
        """Sleep for a random whole number of seconds within ``loop_interval``."""
        # Order the bounds so a misconfigured min > max does not make
        # random.randint raise ValueError.
        low, high = sorted((self.loop_interval["min"], self.loop_interval["max"]))
        wait_time = random.randint(low, high)
        self.logger.info(f"等待 {wait_time} 秒后继续")
        await asyncio.sleep(wait_time)

    async def before_run(self):
        """Pre-run hook (subclasses may override)."""
        pass

    async def after_run(self):
        """Post-run hook (subclasses may override)."""
        pass
|