|
@@ -1,261 +1,270 @@
|
|
|
+# spiders/basespider.py
|
|
|
import asyncio
|
|
|
import random
|
|
|
import traceback
|
|
|
-import uuid
|
|
|
-from typing import List, Dict, Optional, Any
|
|
|
+from typing import List, Dict, Any, Optional
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
|
-from core.models.video_item import VideoItem
|
|
|
-from core.utils.helpers import generate_titles
|
|
|
+import aiohttp
|
|
|
+
|
|
|
from core.utils.request_preparer import RequestPreparer
|
|
|
from core.utils.spider_config import SpiderConfig
|
|
|
-from core.utils.extractors import safe_extract, extract_fields
|
|
|
from core.utils.log.logger_manager import LoggerManager
|
|
|
+from core.video_processor import VideoProcessor
|
|
|
from services.async_mysql_service import AsyncMysqlService
|
|
|
-from services.pipeline import PiaoQuanPipeline
|
|
|
-from core.base.async_request_client import AsyncRequestClient
|
|
|
from services.async_mq_producer import AsyncMQProducer
|
|
|
+from core.base.async_request_client import AsyncRequestClient
|
|
|
|
|
|
|
|
|
class BaseSpider(ABC):
|
|
|
- """通用爬虫基类"""
|
|
|
-
|
|
|
+ """爬虫基类 - 简化版本,不包含循环逻辑"""
|
|
|
+
|
|
|
def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
|
|
|
request_client: AsyncRequestClient = None,
|
|
|
db_service: AsyncMysqlService = None,
|
|
|
mq_producer: AsyncMQProducer = None):
|
|
|
+ # 基础属性
|
|
|
self.rule_dict = rule_dict
|
|
|
self.user_list = user_list
|
|
|
self.env = env
|
|
|
- self.class_name = self.__class__.__name__.lower()
|
|
|
-
|
|
|
- # 初始化核心组件
|
|
|
- self._setup_configuration()
|
|
|
- self._setup_logging()
|
|
|
- self._setup_services(request_client, db_service, mq_producer)
|
|
|
- self._setup_state()
|
|
|
-
|
|
|
- # 通用状态
|
|
|
- self.total_success = 0
|
|
|
- self.total_fail = 0
|
|
|
- self.video = None
|
|
|
-
|
|
|
-
|
|
|
- def _setup_configuration(self):
|
|
|
- self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
|
|
|
- if not self.platform_config:
|
|
|
- raise ValueError(f"找不到爬虫配置: {self.class_name}")
|
|
|
- self.platform = self.platform_config.platform
|
|
|
- self.mode = self.platform_config.mode
|
|
|
- self.url = self.platform_config.url
|
|
|
- self.method = self.platform_config.method.upper()
|
|
|
- self.headers = self.platform_config.headers or {}
|
|
|
- self.request_body_template = self.platform_config.request_body or {}
|
|
|
- self.response_parse_config = self.platform_config.response_parse or {}
|
|
|
- self.data_path = self.response_parse_config.get("data_path")
|
|
|
- self.has_more = self.response_parse_config.get("has_more")
|
|
|
- self.field_map = self.response_parse_config.get("fields", {})
|
|
|
- self.next_cursor = self.response_parse_config.get("next_cursor") or ""
|
|
|
- self.loop_times = self.platform_config.loop_times or 100
|
|
|
- self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
|
|
|
- self.timeout = self.platform_config.request_timeout or 30
|
|
|
- self.max_retries = self.platform_config.max_retries or 3
|
|
|
- self.feishu_sheetid = self.platform_config.feishu_sheetid
|
|
|
-
|
|
|
- def _setup_logging(self):
|
|
|
- self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
|
|
|
- self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
|
|
|
- self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")
|
|
|
-
|
|
|
- def _setup_services(self, request_client: AsyncRequestClient = None,
|
|
|
- db_service: AsyncMysqlService = None,
|
|
|
- mq_producer: AsyncMQProducer = None):
|
|
|
- """初始化服务组件"""
|
|
|
- self.request_client = request_client or AsyncRequestClient(
|
|
|
+
|
|
|
+ # 服务依赖
|
|
|
+ self.request_client = request_client
|
|
|
+ self.db_service = db_service
|
|
|
+ self.mq_producer = mq_producer
|
|
|
+
|
|
|
+ # 通过类名获取配置
|
|
|
+ class_name = self.__class__.__name__.lower()
|
|
|
+ self.config = SpiderConfig.get_platform_config(class_name)
|
|
|
+ self._setup_from_config()
|
|
|
+
|
|
|
+ # 日志服务
|
|
|
+ self.logger = LoggerManager.get_logger(
|
|
|
+ platform=self.config.platform,
|
|
|
+ mode=self.config.mode
|
|
|
+ )
|
|
|
+ self.aliyun_log = LoggerManager.get_aliyun_logger(
|
|
|
+ platform=self.config.platform,
|
|
|
+ mode=self.config.mode
|
|
|
+ )
|
|
|
+
|
|
|
+ # 请求准备器
|
|
|
+ self.request_preparer = RequestPreparer(
|
|
|
+ response_parse_config=self.config.response_parse,
|
|
|
logger=self.logger,
|
|
|
aliyun_log=self.aliyun_log
|
|
|
)
|
|
|
- self.db_service = db_service or AsyncMysqlService(
|
|
|
- platform=self.platform,
|
|
|
- mode=self.mode
|
|
|
- )
|
|
|
- self.mq_producer = mq_producer or AsyncMQProducer(
|
|
|
- topic_name="topic_crawler_etl_prod_v2",
|
|
|
+
|
|
|
+ # 状态跟踪
|
|
|
+ self.stats = {
|
|
|
+ 'success': 0,
|
|
|
+ 'fail': 0,
|
|
|
+ 'start_time': None,
|
|
|
+ 'end_time': None
|
|
|
+ }
|
|
|
+
|
|
|
+ # 如果没有传入服务,则创建默认实例
|
|
|
+ self._setup_default_services()
|
|
|
+
|
|
|
+ # 初始化视频处理器
|
|
|
+ self.video_processor = VideoProcessor(
|
|
|
platform=self.platform,
|
|
|
- mode=self.mode
|
|
|
- )
|
|
|
- def _setup_state(self):
|
|
|
- self.last_response_data = {}
|
|
|
- self.request_preparer = RequestPreparer(
|
|
|
- response_parse_config=self.response_parse_config,
|
|
|
+ mode=self.mode,
|
|
|
+ field_map=self.field_map,
|
|
|
+ feishu_sheetid=self.feishu_sheetid,
|
|
|
logger=self.logger,
|
|
|
aliyun_log=self.aliyun_log
|
|
|
)
|
|
|
|
|
|
- # 核心入口(统一流程)
|
|
|
+ def _setup_default_services(self):
|
|
|
+ """设置默认服务实例"""
|
|
|
+ if not self.request_client:
|
|
|
+ self.request_client = AsyncRequestClient(
|
|
|
+ logger=self.logger,
|
|
|
+ aliyun_log=self.aliyun_log
|
|
|
+ )
|
|
|
+
|
|
|
+ if not self.db_service:
|
|
|
+ self.db_service = AsyncMysqlService(
|
|
|
+ platform=self.config.platform,
|
|
|
+ mode=self.config.mode
|
|
|
+ )
|
|
|
+
|
|
|
+ if not self.mq_producer:
|
|
|
+ self.mq_producer = AsyncMQProducer(
|
|
|
+ topic_name="topic_crawler_etl_prod_v2",
|
|
|
+ platform=self.config.platform,
|
|
|
+ mode=self.config.mode
|
|
|
+ )
|
|
|
+
|
|
|
+ def _setup_from_config(self):
|
|
|
+ """从配置中设置属性"""
|
|
|
+ self.platform = self.config.platform
|
|
|
+ self.mode = self.config.mode
|
|
|
+ self.url = self.config.url
|
|
|
+ self.method = self.config.method.upper()
|
|
|
+ self.headers = self.config.headers or {}
|
|
|
+ self.request_body_template = self.config.request_body or {}
|
|
|
+ self.loop_times = self.config.loop_times or 100
|
|
|
+ self.timeout = self.config.request_timeout or 30
|
|
|
+ self.feishu_sheetid = self.config.feishu_sheetid
|
|
|
+
|
|
|
+ # 响应解析配置
|
|
|
+ response_parse = self.config.response_parse or {}
|
|
|
+ self.data_path = response_parse.get("data_path")
|
|
|
+ self.has_more = response_parse.get("has_more")
|
|
|
+ self.next_cursor = response_parse.get("next_cursor")
|
|
|
+ self.field_map = response_parse.get("fields", {})
|
|
|
+
|
|
|
async def run(self):
|
|
|
- """主流程:初始化→核心循环→收尾"""
|
|
|
+ """主运行流程 - 子类实现完整循环逻辑"""
|
|
|
+ self.stats['start_time'] = asyncio.get_event_loop().time()
|
|
|
self.logger.info(f"开始运行爬虫: {self.platform}/{self.mode}")
|
|
|
- await self.before_run()
|
|
|
+
|
|
|
try:
|
|
|
- await self.core_loop() # 子类实现具体模式逻辑
|
|
|
+ await self.before_run()
|
|
|
+ await self.execute() # 子类实现完整的执行逻辑
|
|
|
except Exception as e:
|
|
|
- tb = traceback.format_exc()
|
|
|
- self.logger.exception(f"运行异常: {e},堆栈信息:{tb}")
|
|
|
+ self.logger.error(f"爬虫执行异常: {e}\n{traceback.format_exc()}")
|
|
|
finally:
|
|
|
await self.after_run()
|
|
|
- self.logger.info(f"总统计:成功{self.total_success},失败{self.total_fail}")
|
|
|
+ self._log_final_stats()
|
|
|
|
|
|
@abstractmethod
|
|
|
- async def core_loop(self):
|
|
|
- """子类必须实现:模式特有核心循环(推荐/账号)"""
|
|
|
+ async def execute(self):
|
|
|
+ """执行核心逻辑 - 子类必须实现完整循环逻辑"""
|
|
|
pass
|
|
|
|
|
|
- async def fetch_detail(self, item: Dict) -> Dict:
|
|
|
- """子类选择实现:补充详情(完全由子类控制)"""
|
|
|
- return item
|
|
|
-
|
|
|
- # 通用数据处理流程
|
|
|
- async def process_data(self, video_data: List[Dict]):
|
|
|
- """处理原始数据列表(清洗→过滤→推送)"""
|
|
|
- for item in video_data:
|
|
|
+ async def process_data(self, data: List[Dict]):
|
|
|
+ """处理数据"""
|
|
|
+ success_count = 0
|
|
|
+ for item in data:
|
|
|
+ self.aliyun_log.logging(
|
|
|
+ code="1001",
|
|
|
+ message=f"获取到一条数据",
|
|
|
+ data=item
|
|
|
+ )
|
|
|
try:
|
|
|
- # 补充详情(完全由子类实现)
|
|
|
- detail_data = await self.fetch_detail(item)
|
|
|
- # 处理并推送
|
|
|
- result = await self.process_and_push_video(detail_data)
|
|
|
- if result:
|
|
|
- self.total_success += 1
|
|
|
+ if await self.process_single_item(item):
|
|
|
+ success_count += 1
|
|
|
+ self.stats['success'] += 1
|
|
|
else:
|
|
|
- self.total_fail += 1
|
|
|
+ self.stats['fail'] += 1
|
|
|
except Exception as e:
|
|
|
- self.logger.exception(f"处理单条数据失败: {e}")
|
|
|
- self.total_fail += 1
|
|
|
+ self.logger.error(f"处理单条数据失败: {e}")
|
|
|
+ self.stats['fail'] += 1
|
|
|
+ self.logger.info(f"批次处理完成: 成功 {success_count}/{len(data)}")
|
|
|
+ return success_count
|
|
|
|
|
|
- async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
|
|
|
- try:
|
|
|
- self.video = video
|
|
|
- video_obj = await self.process_video(video)
|
|
|
- if not video_obj:
|
|
|
- return False
|
|
|
- if not await self.filter_data(video_obj):
|
|
|
- return False
|
|
|
- # video_obj = await self.integrated_video_handling(video_obj)
|
|
|
- return await self.push_to_etl(video_obj)
|
|
|
- except Exception as e:
|
|
|
- self.logger.exception(f"视频处理异常: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
- async def publish_video_user(self) -> Dict[str, Any]:
|
|
|
- """获取随机发布用户"""
|
|
|
- if self.user_list:
|
|
|
- return random.choice(self.user_list)
|
|
|
- else:
|
|
|
- self.logger.error("未获取到用户列表数据")
|
|
|
- return None
|
|
|
|
|
|
|
|
|
- async def process_video(self, video: Dict) -> Optional[Dict]:
|
|
|
- """
|
|
|
- 字段映射
|
|
|
- 统一字段抽取及 VideoItem 初始化
|
|
|
- """
|
|
|
- self.logger.info(f"处理视频数据: {video}")
|
|
|
- publish_user = await self.publish_video_user()
|
|
|
-
|
|
|
- # 检查是否成功获取到发布用户
|
|
|
- if not publish_user:
|
|
|
- self.logger.error("无法获取发布用户信息")
|
|
|
- return None
|
|
|
-
|
|
|
- item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
|
|
|
- item_kwargs.update({
|
|
|
- "user_id": publish_user.get("uid"),
|
|
|
- "user_name": publish_user.get("nick_name"),
|
|
|
- "platform": self.platform,
|
|
|
- "strategy": self.mode,
|
|
|
- })
|
|
|
- try:
|
|
|
- item = VideoItem(**item_kwargs)
|
|
|
- video_dict = await item.produce_item()
|
|
|
- if not video_dict:
|
|
|
- self.logger.warning("VideoItem 校验失败")
|
|
|
- return None
|
|
|
- return video_dict
|
|
|
- except Exception as e:
|
|
|
- self.logger.error(f"VideoItem 初始化失败: {e}")
|
|
|
- return None
|
|
|
+ async def process_single_item(self, item: Dict) -> bool:
|
|
|
+ """处理单条数据"""
|
|
|
+ # 1. 补充详情
|
|
|
+ detail_item = await self.fetch_detail(item)
|
|
|
|
|
|
- async def filter_data(self, video: Dict) -> bool:
|
|
|
- """
|
|
|
- 数据校验过滤,默认使用 PiaoQuanPipeline
|
|
|
- 子类可重写此方法实现自定义过滤
|
|
|
- """
|
|
|
- pipeline = PiaoQuanPipeline(
|
|
|
- platform=self.platform,
|
|
|
- mode=self.mode,
|
|
|
+ # 2. 使用视频处理器进行完整处理
|
|
|
+ video_obj = await self.video_processor.process_single_video(
|
|
|
+ raw_data=detail_item,
|
|
|
+ user_info=await self.select_publish_user(),
|
|
|
rule_dict=self.rule_dict,
|
|
|
- env=self.env,
|
|
|
- item=video,
|
|
|
- trace_id=self.platform + str(uuid.uuid1())
|
|
|
+ env=self.env
|
|
|
)
|
|
|
- return await pipeline.process_item()
|
|
|
|
|
|
- async def integrated_video_handling(self, video: Dict) -> None:
|
|
|
- """
|
|
|
- 钩子函数:可在此实现自动生成标题或其他业务逻辑
|
|
|
- """
|
|
|
- # 视频标题处理生成
|
|
|
- return await generate_titles(self.feishu_sheetid, video,self.logger,self.aliyun_log)
|
|
|
+ if not video_obj:
|
|
|
+ return False
|
|
|
|
|
|
- async def push_to_etl(self, video: Dict) -> bool:
|
|
|
+ # 3. 推送数据
|
|
|
+ return await self.push_video(video_obj)
|
|
|
+
|
|
|
+ async def select_publish_user(self) -> Dict:
|
|
|
+ """选择发布用户"""
|
|
|
+ if self.user_list:
|
|
|
+ return random.choice(self.user_list)
|
|
|
+ return {}
|
|
|
+
|
|
|
+ async def fetch_detail(self, item: Dict) -> Dict:
|
|
|
+ """补充详情 - 子类可重写"""
|
|
|
+ return item
|
|
|
+
|
|
|
+ async def push_video(self, video: Dict) -> bool:
|
|
|
+ """推送视频数据"""
|
|
|
try:
|
|
|
await self.mq_producer.send_msg(video)
|
|
|
- self.aliyun_log.logging(code="1009",
|
|
|
- message="推送ETL成功",
|
|
|
- data=video)
|
|
|
- self.logger.info(f"成功推送视频至ETL: {video}")
|
|
|
+ self.aliyun_log.logging(
|
|
|
+ code="1002",
|
|
|
+ message="推送ETL成功",
|
|
|
+ data=video
|
|
|
+ )
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
- self.logger.exception(f"推送ETL失败: {e}")
|
|
|
+ self.logger.error(f"推送视频失败: {e}")
|
|
|
return False
|
|
|
|
|
|
async def is_video_count_sufficient(self) -> bool:
|
|
|
"""
|
|
|
- 校验当日视频是否达到最大爬取量
|
|
|
- True未达到
|
|
|
- False达到最大量
|
|
|
- :return:True/False
|
|
|
+ 检查视频数量是否足够
|
|
|
+ 未达到 True 反之 False
|
|
|
"""
|
|
|
max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
|
|
|
if max_count <= 0:
|
|
|
- self.logger.info(f"{self.platform} 未限制视频入库量,跳过检测")
|
|
|
return True
|
|
|
+
|
|
|
current_count = await self.db_service.get_today_videos()
|
|
|
if current_count >= max_count:
|
|
|
- self.logger.info(f"{self.platform} 视频数量达到当日最大值: {current_count}/{max_count}")
|
|
|
- self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"{current_count}")
|
|
|
+ self.logger.info(f"视频数量达到当日最大值: {current_count}/{max_count}")
|
|
|
+ self.aliyun_log.logging(
|
|
|
+ code="1011",
|
|
|
+ message="视频数量达到最大值",
|
|
|
+ data={
|
|
|
+ "current_count": current_count,
|
|
|
+ "max_count": max_count
|
|
|
+ }
|
|
|
+ )
|
|
|
return False
|
|
|
- self.logger.info(f"{self.platform} 今日入库视频数: {current_count}/{max_count}")
|
|
|
- self.aliyun_log.logging(code="1012",
|
|
|
- message=f"目前入库量{current_count}",
|
|
|
- data=f"{current_count}/{max_count}"
|
|
|
- )
|
|
|
+
|
|
|
return True
|
|
|
|
|
|
- async def wait(self):
|
|
|
- """等待随机时间间隔"""
|
|
|
- # 确保loop_interval包含min和max键
|
|
|
- min_time = self.loop_interval.get("min", 1)
|
|
|
- max_time = self.loop_interval.get("max", 5)
|
|
|
- wait_time = random.randint(min_time, max_time)
|
|
|
- self.logger.info(f"等待 {wait_time} 秒后继续")
|
|
|
+ async def wait_between_iterations(self, wait_time: int = None):
|
|
|
+ """等待间隔"""
|
|
|
+ if wait_time is None:
|
|
|
+ interval_config = getattr(self.config, 'loop_interval', {})
|
|
|
+ min_time = interval_config.get('min', 1)
|
|
|
+ max_time = interval_config.get('max', 5)
|
|
|
+ wait_time = random.randint(min_time, max_time)
|
|
|
+
|
|
|
+ self.logger.info(f"等待 {wait_time} 秒")
|
|
|
await asyncio.sleep(wait_time)
|
|
|
|
|
|
+ async def make_request(self, request_body: Dict) -> Optional[Dict]:
|
|
|
+ """发送请求"""
|
|
|
+ async with aiohttp.ClientSession(
|
|
|
+ timeout=aiohttp.ClientTimeout(total=self.timeout)
|
|
|
+ ) as session:
|
|
|
+ return await self.request_client.request(
|
|
|
+ session=session,
|
|
|
+ method=self.method,
|
|
|
+ url=self.url,
|
|
|
+ headers=self.headers,
|
|
|
+ json=request_body
|
|
|
+ )
|
|
|
+
|
|
|
async def before_run(self):
|
|
|
- """运行前钩子(子类可重写)"""
|
|
|
+ """运行前准备 - 子类可重写"""
|
|
|
pass
|
|
|
|
|
|
async def after_run(self):
|
|
|
- """运行后钩子(子类可重写)"""
|
|
|
- pass
|
|
|
+ """运行后清理 - 子类可重写"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ def _log_final_stats(self):
|
|
|
+ """记录最终统计"""
|
|
|
+ self.stats['end_time'] = asyncio.get_event_loop().time()
|
|
|
+ duration = 0
|
|
|
+ if self.stats['start_time'] is not None:
|
|
|
+ duration = self.stats['end_time'] - self.stats['start_time']
|
|
|
+
|
|
|
+ self.logger.info(
|
|
|
+ f"爬虫执行完成: 成功 {self.stats['success']}, "
|
|
|
+ f"失败 {self.stats['fail']}, 耗时 {duration:.2f}秒"
|
|
|
+ )
|