@@ -0,0 +1,323 @@
+import asyncio
+import random
+import time
+import uuid
+from abc import ABC
+from typing import Dict, List, Optional
+
+import aiohttp
+
+from core.models.video_item import VideoItem
+from core.utils.config_loader import ConfigLoader
+from core.utils.extractors import safe_extract
+from core.utils.log.logger_manager import LoggerManager
+from services.async_mysql_service import AsyncMysqlService
+from services.pipeline import PiaoQuanPipeline
+# NOTE: MQ is used below but was never imported; this import path is an
+# assumption, adjust it to the project's actual MQ module.
+from services.mq import MQ
+
+
+class BaseSpider(ABC):
+    """
+    Generic spider base class: runs the crawl pipeline strictly in order.
+    """
+
+    MAX_RETRIES = 3  # maximum retries per request
+    TIMEOUT = 30  # request timeout in seconds
+
+    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
+        self.trace_id = trace_id
+        self.env = env
+        self.user_list = user_list
+        self.rule_dict = rule_dict
+        self.class_name = self.__class__.__name__  # subclass name
+
+        # Resolve the platform config automatically from the class name
+        self.platform_config = ConfigLoader.get_platform_config(platform=str(self.class_name.lower()))
+        if not self.platform_config:
+            raise ValueError(f"No config found for: {self.class_name}")
+
+        # Initialize loggers and MQ
+        self.platform = self.platform_config.get("platform")
+        self.mode = self.platform_config.get("mode")
+        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
+
+        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
+        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
+
+        # Request configuration
+        self.method = self.platform_config.get("method", "GET").upper()
+        self.url = self.platform_config.get("url")
+        self.headers = self.platform_config.get("headers", {})
+        self.body = self.platform_config.get("request_body", {})
+        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
+        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
+        self.video_fields_map = self.platform_config.get("video_fields_map", {})
+
+        # Loop control configuration
+        self.loop_times = self.platform_config.get("loop_times", 1)  # number of loop iterations
+        self.loop_interval = self.platform_config.get("loop_interval", 0)  # interval between loops (seconds)
+
+        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
+
+        self.logger.info(
+            f"{self.trace_id}--config: {self.loop_times} loops, {self.loop_interval}s interval")
+
+        self.session = None
+
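+    # A sketch of the platform config shape this class expects, inferred from
+    # the keys read in __init__ above; concrete values are illustrative
+    # assumptions, not the real config of any platform.
+    #
+    # {
+    #     "platform": "example",
+    #     "mode": "recommend",
+    #     "method": "POST",
+    #     "url": "https://api.example.com/videos",
+    #     "headers": {"Content-Type": "application/json"},
+    #     "request_body": {"page": 1},
+    #     "response_parse": {
+    #         "data_path": "$.data.list",
+    #         "fields": {"video_id": "$.id", "title": "$.title"}
+    #     },
+    #     "loop_times": 3,
+    #     "loop_interval": 5
+    # }
+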
+    async def _send_async_request(self, session: aiohttp.ClientSession, method: str, url: str,
+                                  **kwargs) -> Dict:
+        """
+        Send an async HTTP request on the provided session, with retries.
+        Returns the parsed JSON response body.
+        """
+        retries = 0
+        self.logger.info(f"{self.trace_id}--preparing request: {method} {url}, params: {kwargs}")
+
+        while retries < self.MAX_RETRIES:
+            try:
+                async with session.request(method, url, **kwargs) as response:
+                    response.raise_for_status()
+                    self.logger.info(f"{self.trace_id}--request succeeded: {response.status}")
+                    return await response.json()
+            except Exception as e:
+                retries += 1
+                remaining_attempts = self.MAX_RETRIES - retries
+
+                if retries < self.MAX_RETRIES:
+                    self.logger.warning(
+                        f"{self.trace_id}--request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
+                        f"Remaining attempts: {remaining_attempts}"
+                    )
+                    await asyncio.sleep(1)
+                else:
+                    self.aliyun_logger.logging(
+                        code="5001",
+                        message="request failed after maximum retries",
+                        data={
+                            "url": url,
+                            "method": method,
+                            "error": str(e),
+                            "headers": kwargs.get("headers", {}),
+                            "body": kwargs.get("json", {})
+                        },
+                        trace_id=self.trace_id
+                    )
+                    self.logger.error(f"{self.trace_id}--request failed after maximum retries: {e}")
+                    raise
+
+    async def crawl_data(self) -> Optional[List[Dict]]:
+        """Fetch video data asynchronously."""
+        self.logger.info(f"{self.trace_id}--start fetching video data")
+        try:
+            response = await self._send_async_request(
+                session=self.session,
+                method=self.method,
+                url=self.url,
+                headers=self.headers,
+                json=self.body
+            )
+            self.logger.debug(f"{self.trace_id}--response: {response}")
+
+            data = safe_extract(response, self.data_path)
+
+            if not data:
+                self.logger.warning(f"{self.trace_id}--no data found at path: {self.data_path}")
+                return []
+
+            self.logger.info(f"{self.trace_id}--fetched {len(data)} videos")
+            return data
+        except Exception as e:
+            # logger.exception already records the stack trace
+            self.logger.exception(f"{self.trace_id}--failed to fetch video data: {e}")
+            return []
+
+    async def filter_data(self, video: Dict) -> bool:
+        """Check whether a video passes the platform rules."""
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video,
+            trace_id=self.platform + str(uuid.uuid1())
+        )
+        return await pipeline.process_item()
+
+    async def is_video_count_sufficient(self) -> bool:
+        """
+        Check whether today's crawl volume is still below the daily cap.
+        :return: True if crawling may continue, False if the cap is reached.
+        """
+        rule_videos_cnt = self.rule_dict.get("videos_cnt")
+        if not rule_videos_cnt:
+            return True
+        async with AsyncMysqlService(self.platform, self.mode) as mysql:
+            video_count = await mysql.get_today_videos()
+        if video_count >= rule_videos_cnt.get("min", 200):
+            self.logger.info(f"{self.trace_id}--daily video cap reached: {video_count}")
+            return False
+        return True
+
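+    # A sketch of the rule_dict entry consumed above; the key names follow the
+    # code, the threshold value is illustrative:
+    #
+    # {"videos_cnt": {"min": 200}}
+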
+    async def process_video(self, video: Dict) -> Optional[Dict]:
+        """
+        Process a single video record: apply the field map and validate
+        required fields via VideoItem.
+        """
+        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
+        publish_user = random.choice(self.user_list)
+        try:
+            # Build the VideoItem kwargs dynamically from field_map
+            item_kwargs = {}
+            for field, path in self.field_map.items():
+                # Values that are not "$"-prefixed paths are literals, passed through as-is
+                if not isinstance(path, str) or not path.startswith("$"):
+                    item_kwargs[field] = path
+                    continue
+
+                value = safe_extract(video, path)
+                if value is None:
+                    self.logger.warning(f"{self.trace_id}--field extraction failed: {field} path: {path}")
+                    continue
+                item_kwargs[field] = value
+
+            item_kwargs["user_id"] = publish_user["uid"]
+            item_kwargs["user_name"] = publish_user["nick_name"]
+            # Inject platform and strategy manually
+            item_kwargs["platform"] = self.platform
+            item_kwargs["strategy"] = self.mode
+
+            try:
+                item = VideoItem(**item_kwargs)
+            except Exception as e:
+                self.logger.warning(f"{self.trace_id}--VideoItem init failed: {e}, data: {item_kwargs}")
+                return None
+
+            video_dict = await item.produce_item()
+            if not video_dict:
+                self.logger.warning(f"{self.trace_id}--VideoItem validation failed")
+                return None
+            return video_dict
+
+        except Exception as e:
+            self.logger.exception(f"{self.trace_id}--error while processing video: {e}")
+            return None
+
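+    # A sketch of the field_map consumed above. "$"-prefixed values are treated
+    # as extraction paths resolved by safe_extract against the raw record; any
+    # other value is copied into VideoItem as a literal. Field names here are
+    # illustrative assumptions:
+    #
+    # {
+    #     "video_id": "$.id",
+    #     "title": "$.title",
+    #     "video_url": "$.playUrl",
+    #     "source": "some-literal-value"
+    # }
+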
+    async def push_to_etl(self, item: Dict) -> bool:
+        """Push one item to ETL (the MQ send itself is synchronous)."""
+        self.logger.info(f"{self.trace_id}--pushing to ETL: {item.get('video_id', item.get('title', 'untitled'))}")
+        try:
+            self.mq.send_msg(item)
+            self.aliyun_logger.logging(
+                code="1009",
+                message="successfully sent to ETL",
+                data=item,
+                trace_id=self.trace_id
+            )
+            self.logger.info(f"{self.trace_id}--push succeeded")
+            return True
+        except Exception as e:
+            self.logger.exception(f"{self.trace_id}--push failed: {e}, item: {item}")
+            return False
+
+    async def get_today_videos(self):
+        """
+        Return the number of videos crawled today.
+        """
+        # get_today_videos is a coroutine elsewhere in this class, so await it here
+        video_count = await self.db_service.get_today_videos()
+        return video_count
+
+    async def integrated_video_handling(self):
+        """
+        Hook for integrated video handling (e.g. title handling, step 4 of run());
+        no-op by default, subclasses may override.
+        """
+        pass
+
+    async def run(self):
+        """
+        Run the spider task asynchronously, strictly in order:
+        1. Crawl
+        2. Process each record and validate fields
+        3. Filter (duplicates, platform rules, title, publish time)
+        4. Title handling
+        5. Push to ETL
+        """
+        try:
+            self.logger.info(f"{self.trace_id}--[{self.platform}] starting spider task")
+            total_success = 0
+            total_failed = 0
+
+            async with aiohttp.ClientSession(
+                    timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)
+            ) as session:  # context-managed session
+                self.session = session
+
+                for loop_index in range(1, self.loop_times + 1):
+                    if not await self.is_video_count_sufficient():
+                        return
+                    self.logger.info(f"{self.trace_id}--step 1: starting loop {loop_index}/{self.loop_times}")
+                    loop_start_time = time.time()
+
+                    video_list = await self.crawl_data()
+                    if not video_list:
+                        self.logger.warning(f"{self.trace_id}--no video data, skipping this loop")
+                        await self._wait_for_next_loop(loop_index)
+                        continue
+
+                    success_count = 0
+                    fail_count = 0
+
+                    for video in video_list:
+                        video_obj = await self.process_video(video)
+                        if not video_obj:
+                            self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
+                            fail_count += 1
+                            continue
+
+                        if not await self.filter_data(video_obj):
+                            self.logger.debug(f"{self.trace_id}--video rejected by rules, skipped")
+                            continue
+
+                        await self.integrated_video_handling()
+
+                        if await self.push_to_etl(video_obj):
+                            success_count += 1
+                        else:
+                            fail_count += 1
+
+                    total_success += success_count
+                    total_failed += fail_count
+
+                    loop_duration = time.time() - loop_start_time
+                    self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} done. "
+                                     f"success: {success_count}, failed: {fail_count}, took: {loop_duration:.2f}s")
+
+                    await self._wait_for_next_loop(loop_index)
+
+            # Aggregate run metrics
+            self.aliyun_logger.logging(
+                code="1003",
+                message="spider run metrics summary",
+                data={
+                    "trace_id": self.trace_id,
+                    "platform": self.platform,
+                    "success_count": total_success,
+                    "fail_count": total_failed
+                },
+                trace_id=self.trace_id
+            )
+
+            self.logger.info(
+                f"{self.trace_id}--[{self.platform}] spider task finished, total success: {total_success}, total failed: {total_failed}")
+            return True
+        except Exception as e:
+            self.logger.exception(f"{self.trace_id}--fatal spider error: {e}")
+            raise
+
+    async def _wait_for_next_loop(self, current_loop: int) -> None:
+        """Wait before starting the next loop iteration."""
+        if current_loop < self.loop_times and self.loop_interval > 0:
+            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval}s before the next request")
+            await asyncio.sleep(self.loop_interval)
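+
+
+# A minimal usage sketch, assuming a platform config is registered under the
+# lowercased subclass name; the subclass and user fields below are hypothetical.
+#
+# class ExamplePlatform(BaseSpider):
+#     pass
+#
+# spider = ExamplePlatform(
+#     rule_dict={"videos_cnt": {"min": 200}},
+#     user_list=[{"uid": 1, "nick_name": "demo"}],
+#     trace_id="exampleplatform-demo",
+# )
+# asyncio.run(spider.run())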