import asyncio
import aiohttp
from abc import ABC
from typing import List, Dict, Optional
import time

from application.config.common.log.logger_manager import LoggerManager
from utils.extractors import safe_extract
from application.config.common import MQ
from utils.config_loader import ConfigLoader  # newly added import


class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl pipeline in strict sequential order.
    """

    MAX_RETRIES = 3  # maximum retries for a single request
    TIMEOUT = 30  # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # subclass name

        # Resolve the platform config from the subclass name
        self.platform_config = ConfigLoader.get_config_by_class_name(self.class_name)
        if not self.platform_config:
            raise ValueError(f"no config found for: {self.class_name}")

        # Initialize logging and MQ
        self.platform = self.platform_config.get("platform")
        self.mode = self.platform_config.get("mode")
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")

        # Request configuration
        self.method = self.platform_config.get("method", "GET").upper()
        self.url = self.platform_config.get("url")
        self.headers = self.platform_config.get("headers", {})
        self.body = self.platform_config.get("request_body", {})
        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
        self.video_fields_map = self.platform_config.get("video_fields_map", {})

        # Flow-control configuration
        self.loop_times = self.platform_config.get("loop_times", 1)  # number of loops
        self.loop_interval = self.platform_config.get("loop_interval", 0)  # interval between loops (seconds)
        self.logger.info(
            f"{self.trace_id}--config: {self.loop_times} loops, {self.loop_interval}s interval")

    async def _send_async_request(self, method: str, url: str, **kwargs) -> Dict:
        """
        Send an async HTTP request with retry support.

        The JSON body is read and returned before the session closes; returning
        the raw ClientResponse would hand the caller a closed connection.
        """
        retries = 0
        timeout = aiohttp.ClientTimeout(total=self.TIMEOUT)
        while retries < self.MAX_RETRIES:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.request(method, url, **kwargs) as response:
                        response.raise_for_status()
                        return await response.json()
            except Exception as e:
                retries += 1
                remaining_attempts = self.MAX_RETRIES - retries
                if retries < self.MAX_RETRIES:
                    self.logger.warning(
                        f"{self.trace_id}--request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
                        f"remaining attempts: {remaining_attempts}"
                    )
                    await asyncio.sleep(1)  # back off before retrying
                else:
                    self.aliyun_logr.logging(
                        code="5001",
                        message="request failed, max retries reached",
                        data={
                            "url": url,
                            "method": method,
                            "error": str(e),
                            "headers": kwargs.get("headers", {}),
                            "body": kwargs.get("json", {})
                        },
                        trace_id=self.trace_id
                    )
                    self.logger.error(f"{self.trace_id}--request failed, max retries reached: {e}")
                    raise

    async def crawl_data(self) -> Optional[List[Dict]]:
        """Fetch video data asynchronously."""
        self.logger.info(f"{self.trace_id}--start fetching video data")
        try:
            result = await self._send_async_request(
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=self.body
            )
            data = safe_extract(result, self.data_path)
            if not data:
                self.logger.warning(f"{self.trace_id}--no data found at path: {self.data_path}")
                return []
            self.logger.info(f"{self.trace_id}--fetched {len(data)} video records")
            return data
        except Exception as e:
            self.logger.error(f"{self.trace_id}--failed to fetch video data: {e}")
            return []

    def filter_data(self, video: Dict) -> bool:
        """Check whether a video satisfies the configured rules."""
        if not self.rule_dict:
            return True
        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            # get_video_duration is not defined in this base class; it is expected
            # to be provided by the subclass.
            duration = self.get_video_duration(video_url)
            if not (rule_duration['min'] <= duration <= rule_duration['max']):
                return False
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            # MysqlService is not imported in this module; it must be supplied by the project.
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 0):
                return False
        return True
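
    # Illustrative shape of rule_dict as read by filter_data above; the key names
    # come from the get() calls, the values are hypothetical examples.
    #
    # {
    #     "duration": {"min": 30, "max": 1200},
    #     "videos_cnt": {"min": 100}
    # }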

    def process_video(self, video: Dict) -> Optional[Dict]:
        """Process a single video record."""
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        try:
            item = {}
            for field, path in self.field_map.items():
                value = safe_extract(video, path)
                if value is None:
                    self.logger.warning(f"{self.trace_id}--failed to extract field: {field}")
                    continue
                item[field] = value
            if not item:
                self.logger.warning(f"{self.trace_id}--video processing produced an empty result")
                return None
            item.update({
                "platform": self.platform,
                "strategy": self.mode,
                "session": f"{self.platform}-{int(time.time())}"
            })
            self.logger.debug(f"{self.trace_id}--video processed successfully")
            return item
        except Exception as e:
            self.logger.error(f"{self.trace_id}--exception while processing video: {e}")
            return None

    def push_to_etl(self, item: Dict) -> bool:
        """Push a processed item to ETL (synchronous)."""
        self.logger.info(f"{self.trace_id}--pushing item to ETL: {item.get('title', 'untitled')}")
        try:
            self.mq.send_msg(item)
            self.aliyun_logr.logging(
                code="1002",
                message="sent to ETL successfully",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--item pushed successfully")
            return True
        except Exception as e:
            self.logger.error(f"{self.trace_id}--failed to push item: {e}")
            return False

    async def run(self):
        """
        Run the spider task asynchronously, strictly in order:
        1. crawl
        2. filter
        3. process each record
        4. push to ETL
        """
        self.logger.info(f"{self.trace_id}--[{self.platform}] starting spider task")
        for loop_index in range(1, self.loop_times + 1):
            self.logger.info(f"{self.trace_id}--step 1: starting loop request {loop_index}/{self.loop_times}")
            loop_start_time = time.time()

            # Step 1: fetch video data (skip this loop on failure)
            video_list = await self.crawl_data()
            if not video_list:
                self.logger.warning(f"{self.trace_id}--no video data fetched, skipping this loop")
                await self._wait_for_next_loop(loop_index)
                continue

            # Step 2: process each video and push it to ETL
            success_count = 0
            fail_count = 0
            for video in video_list:
                # Step 2.1: validate the video (skip on failure)
                if not self.filter_data(video):
                    self.logger.debug(f"{self.trace_id}--video does not match the rules, skipped")
                    continue

                # Step 2.2: process the video (log and continue on failure)
                item = self.process_video(video)
                if not item:
                    self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
                    fail_count += 1
                    continue

                # Step 2.3: push to ETL (log and continue on failure)
                if self.push_to_etl(item):
                    success_count += 1
                else:
                    fail_count += 1

            loop_duration = time.time() - loop_start_time
            self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} finished. "
                             f"success: {success_count}, failed: {fail_count}, took: {loop_duration:.2f}s")

            # Wait before the next loop
            await self._wait_for_next_loop(loop_index)

        self.logger.info(f"{self.trace_id}--[{self.platform}] spider task completed")
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before issuing the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval} seconds before the next request")
            await asyncio.sleep(self.loop_interval)
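

# Minimal usage sketch (not part of the class): a subclass only needs to exist under a
# name that ConfigLoader.get_config_by_class_name can resolve; everything else is driven
# by the platform config. The names below (DemoRecommendSpider, the trace id) are
# hypothetical and shown only to illustrate how BaseSpider is intended to be driven.
#
# class DemoRecommendSpider(BaseSpider):
#     pass
#
# if __name__ == "__main__":
#     spider = DemoRecommendSpider(rule_dict={}, user_list=[], trace_id="demo-trace", env="dev")
#     asyncio.run(spider.run())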
|