# spiders/basespider.py
import asyncio
import random
import traceback
from typing import List, Dict, Optional
from abc import ABC, abstractmethod

import aiohttp

from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from core.utils.log.logger_manager import LoggerManager
from core.video_processor import VideoProcessor
from services.async_mysql_service import AsyncMysqlService
from services.async_mq_producer import AsyncMQProducer
from core.base.async_request_client import AsyncRequestClient


class BaseSpider(ABC):
    """Spider base class - simplified version; the crawl loop itself lives in subclasses."""

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
                 request_client: Optional[AsyncRequestClient] = None,
                 db_service: Optional[AsyncMysqlService] = None,
                 mq_producer: Optional[AsyncMQProducer] = None):
        # Basic attributes
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env

        # Service dependencies (injectable; defaults are created below)
        self.request_client = request_client
        self.db_service = db_service
        self.mq_producer = mq_producer

        # Resolve the platform config from the (lower-cased) class name
        class_name = self.__class__.__name__.lower()
        self.config = SpiderConfig.get_platform_config(class_name)
        self._setup_from_config()

        # Logging services
        self.logger = LoggerManager.get_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )
        self.aliyun_log = LoggerManager.get_aliyun_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )

        # Request preparer
        self.request_preparer = RequestPreparer(
            response_parse_config=self.config.response_parse,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

        # Run statistics
        self.stats = {
            'success': 0,
            'fail': 0,
            'start_time': None,
            'end_time': None
        }

        # Create default instances for any service that was not injected
        self._setup_default_services()

        # Video processor
        self.video_processor = VideoProcessor(
            platform=self.platform,
            mode=self.mode,
            field_map=self.field_map,
            feishu_sheetid=self.feishu_sheetid,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

    def _setup_default_services(self):
        """Create default service instances for dependencies that were not injected."""
        if not self.request_client:
            self.request_client = AsyncRequestClient(
                logger=self.logger,
                aliyun_log=self.aliyun_log
            )
        if not self.db_service:
            self.db_service = AsyncMysqlService(
                platform=self.config.platform,
                mode=self.config.mode
            )
        if not self.mq_producer:
            self.mq_producer = AsyncMQProducer(
                topic_name="topic_crawler_etl_prod_v2",
                platform=self.config.platform,
                mode=self.config.mode
            )
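
    # Design note: because all three services are constructor-injectable, tests
    # can pass fakes for request_client / db_service / mq_producer and never
    # touch the real HTTP client, MySQL, or the MQ broker.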

    def _setup_from_config(self):
        """Populate instance attributes from the platform config."""
        self.platform = self.config.platform
        self.mode = self.config.mode
        self.url = self.config.url
        self.method = self.config.method.upper()
        self.headers = self.config.headers or {}
        self.request_body_template = self.config.request_body or {}
        self.loop_times = self.config.loop_times or 100
        self.timeout = self.config.request_timeout or 30
        self.feishu_sheetid = self.config.feishu_sheetid
        # Response-parsing configuration
        response_parse = self.config.response_parse or {}
        self.data_path = response_parse.get("data_path")
        self.has_more = response_parse.get("has_more")
        self.next_cursor = response_parse.get("next_cursor")
        self.field_map = response_parse.get("fields", {})
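
    # Illustrative sketch (not from the original source): the reads above imply
    # a platform config shaped roughly like the following; the concrete values
    # and the endpoint are assumptions for illustration only.
    #
    #   url: "https://api.example.com/feed"          # hypothetical endpoint
    #   method: "post"
    #   loop_times: 100
    #   request_timeout: 30
    #   loop_interval: {min: 1, max: 5}              # used by wait_between_iterations
    #   response_parse:
    #     data_path: "data.items"
    #     has_more: "data.has_more"
    #     next_cursor: "data.next_cursor"
    #     fields: {video_id: "id", title: "title"}   # assumed raw -> canonical field map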

    async def run(self):
        """Main run flow - the full loop logic lives in the subclass's execute()."""
        self.stats['start_time'] = asyncio.get_running_loop().time()
        self.logger.info(f"Starting spider: {self.platform}/{self.mode}")
        try:
            await self.before_run()
            await self.execute()  # subclasses implement the full execution logic
        except Exception as e:
            self.logger.error(f"Spider execution failed: {e}\n{traceback.format_exc()}")
        finally:
            await self.after_run()
            self._log_final_stats()

    @abstractmethod
    async def execute(self):
        """Core execution logic - subclasses must implement the full crawl loop
        (see the illustrative ExampleSpider sketch at the end of this file)."""
        pass

    async def process_data(self, data: List[Dict]) -> int:
        """Process a batch of items; returns the number of successes."""
        success_count = 0
        for item in data:
            self.aliyun_log.logging(
                code="1001",
                message="Fetched one item",
                data=item
            )
            try:
                if await self.process_single_item(item):
                    success_count += 1
                    self.stats['success'] += 1
                else:
                    self.stats['fail'] += 1
            except Exception as e:
                self.logger.error(f"Failed to process item: {e}")
                self.stats['fail'] += 1
        self.logger.info(f"Batch done: {success_count}/{len(data)} succeeded")
        return success_count

    async def process_single_item(self, item: Dict) -> bool:
        """Process a single item: enrich, transform, then push."""
        # 1. Enrich with detail data (no-op by default; subclasses may override)
        detail_item = await self.fetch_detail(item)
        # 2. Run the full pipeline through the video processor
        video_obj = await self.video_processor.process_single_video(
            raw_data=detail_item,
            user_info=await self.select_publish_user(),
            rule_dict=self.rule_dict,
            env=self.env
        )
        if not video_obj:
            return False
        # 3. Push the result downstream
        return await self.push_video(video_obj)

    async def select_publish_user(self) -> Dict:
        """Pick a publish user at random from user_list."""
        if self.user_list:
            return random.choice(self.user_list)
        return {}

    async def fetch_detail(self, item: Dict) -> Dict:
        """Enrich an item with detail data - subclasses may override."""
        return item

    async def push_video(self, video: Dict) -> bool:
        """Push a video record to the ETL message queue."""
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(
                code="1002",
                message="Pushed to ETL successfully",
                data=video
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to push video: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's video quota still allows more collection.
        Returns True if the quota has not been reached, False otherwise.
        """
        # The daily quota lives under the "min" key of the videos_cnt rule
        daily_limit = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if daily_limit <= 0:
            return True
        current_count = await self.db_service.get_today_videos()
        if current_count >= daily_limit:
            self.logger.info(f"Daily video quota reached: {current_count}/{daily_limit}")
            self.aliyun_log.logging(
                code="1011",
                message="Daily video quota reached",
                data={
                    "current_count": current_count,
                    "max_count": daily_limit
                }
            )
            return False
        return True
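
    # Illustrative sketch (assumed shape): a rule_dict that caps collection at
    # 500 videos per day would look like {"videos_cnt": {"min": 500}} - note
    # that the daily cap is read from the "min" key above.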

    async def wait_between_iterations(self, wait_time: Optional[int] = None):
        """Sleep between loop iterations."""
        if wait_time is None:
            interval_config = getattr(self.config, 'loop_interval', None) or {}
            min_time = interval_config.get('min', 1)
            max_time = interval_config.get('max', 5)
            wait_time = random.randint(min_time, max_time)
        self.logger.info(f"Waiting {wait_time} seconds")
        await asyncio.sleep(wait_time)

    async def make_request(self, request_body: Dict) -> Optional[Dict]:
        """Send one HTTP request. A fresh ClientSession is opened per call,
        which keeps the method self-contained at the cost of connection reuse."""
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            return await self.request_client.request(
                session=session,
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=request_body
            )

    async def before_run(self):
        """Pre-run hook - subclasses may override."""
        pass

    async def after_run(self):
        """Post-run cleanup hook - subclasses may override."""
        pass

    def _log_final_stats(self):
        """Log final run statistics."""
        self.stats['end_time'] = asyncio.get_running_loop().time()
        duration = 0
        if self.stats['start_time'] is not None:
            duration = self.stats['end_time'] - self.stats['start_time']
        self.logger.info(
            f"Spider finished: {self.stats['success']} succeeded, "
            f"{self.stats['fail']} failed, {duration:.2f}s elapsed"
        )
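

# ---------------------------------------------------------------------------
# Illustrative only - not part of the original module. A minimal sketch of the
# execute() contract, wired together from the BaseSpider helpers above. The
# spider name ("examplespider"), the "data" key used to pull items out of the
# response, and the existence of a matching platform config are assumptions.
# ---------------------------------------------------------------------------
class ExampleSpider(BaseSpider):
    """Minimal demo subclass: loop, quota check, fetch, process, wait."""

    async def execute(self):
        for _ in range(self.loop_times):
            # Stop once the daily quota has been reached
            if not await self.is_video_count_sufficient():
                break
            # Send the configured request (template body; no cursor handling here)
            response = await self.make_request(self.request_body_template)
            items = (response or {}).get("data") or []
            if not items:
                break
            await self.process_data(items)
            await self.wait_between_iterations()


# Usage sketch (assumes a platform config is registered under "examplespider"):
#     asyncio.run(ExampleSpider(rule_dict={}, user_list=[], env="dev").run())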