- import asyncio
- import random
- import uuid
- import aiohttp
- from abc import ABC
- from typing import List, Dict, Optional
- import time
- from core.utils.log.logger_manager import LoggerManager
- from services.pipeline import PiaoQuanPipeline
- from core.utils.extractors import safe_extract
- from core.utils.config_loader import ConfigLoader
- from services.async_mysql_service import AsyncMysqlService
- from services.mq import MQ  # NOTE: assumed import path; MQ is used below but was not imported
- from core.models.video_item import VideoItem
- class BaseSpider(ABC):
- """
- 通用爬虫基类:支持严格顺序执行流程
- """
- MAX_RETRIES = 3 # 单个请求最大重试次数
- TIMEOUT = 30 # 请求超时时间(秒)
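- # A minimal sketch of the platform config this class expects, inferred from the
- # lookups in __init__ below; the key names are real, the values are illustrative only:
- #   platform: "example_platform"
- #   mode: "recommend"
- #   method: "POST"
- #   url: "https://example.com/api/feed"
- #   headers: {"Content-Type": "application/json"}
- #   request_body: {"page": 1}
- #   response_parse:
- #     data_path: "$.data.list"
- #     fields: {"video_id": "$.id", "title": "$.title"}
- #   video_fields_map: {}
- #   loop_times: 1
- #   loop_interval: 0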
- def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
- self.trace_id = trace_id
- self.env = env
- self.user_list = user_list
- self.rule_dict = rule_dict
- self.class_name = self.__class__.__name__ # subclass name, used to locate its config
- # Resolve the platform config automatically from the class name
- self.platform_config = ConfigLoader.get_platform_config(platform=self.class_name.lower())
- if not self.platform_config:
- raise ValueError(f"No platform config found for: {self.class_name}")
- # Initialize loggers and the MQ producer
- self.platform = self.platform_config.get("platform")
- self.mode = self.platform_config.get("mode")
- self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
- self.logger.info(f"{trace_id}--Initializing spider class: {self.class_name}")
- self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
- self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
- # Request configuration
- self.method = self.platform_config.get("method", "GET").upper()
- self.url = self.platform_config.get("url")
- self.headers = self.platform_config.get("headers", {})
- self.body = self.platform_config.get("request_body", {})
- self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
- self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
- self.video_fields_map = self.platform_config.get("video_fields_map", {})
- # Loop/flow control configuration
- self.loop_times = self.platform_config.get("loop_times", 1) # number of crawl loops
- self.loop_interval = self.platform_config.get("loop_interval", 0) # interval between loops (seconds)
- self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
- self.logger.info(
- f"{self.trace_id}--Config: {self.loop_times} loop(s), {self.loop_interval}s interval")
- self.session = None
- async def _send_async_request(self, session: aiohttp.ClientSession, method: str, url: str,
- **kwargs) -> Dict:
- """
- Send an async HTTP request with the provided session, with retry support.
- Returns the parsed JSON response.
- """
- retries = 0
- self.logger.info(f"{self.trace_id}--Preparing request: {method} {url}, kwargs: {kwargs}")
- while retries < self.MAX_RETRIES:
- try:
- async with session.request(method, url, **kwargs) as response:
- response.raise_for_status()
- self.logger.info(f"{self.trace_id}--请求成功: {response.status}")
- return await response.json()
- except Exception as e:
- retries += 1
- remaining_attempts = self.MAX_RETRIES - retries
- if retries < self.MAX_RETRIES:
- self.logger.warning(
- f"{self.trace_id}--Request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
- f"Remaining attempts: {remaining_attempts}"
- )
- await asyncio.sleep(1)
- else:
- self.aliyun_logr.logging(
- code="5001",
- message="请求失败,已达到最大重试次数",
- data={
- "url": url,
- "method": method,
- "error": str(e),
- "headers": kwargs.get("headers", {}),
- "body": kwargs.get("json", {})
- },
- trace_id=self.trace_id
- )
- self.logger.error(f"{self.trace_id}--请求失败,已达到最大重试次数: {e}")
- raise
- async def crawl_data(self) -> Optional[List[Dict]]:
- """异步获取视频数据"""
- self.logger.info(f"{self.trace_id}--开始获取视频数据")
- try:
- response = await self._send_async_request(
- session=self.session,
- method=self.method,
- url=self.url,
- headers=self.headers,
- json=self.body
- )
- self.logger.debug(f"{self.trace_id}--响应结果: {response}")
- data = safe_extract(response, self.data_path)
- if not data:
- self.logger.warning(f"{self.trace_id}--未获取到数据,路径: {self.data_path}")
- return []
- self.logger.info(f"{self.trace_id}--成功获取{len(data)}条视频数据")
- return data
- except Exception as e:
- # logger.exception already records the traceback; the stray traceback.extract_stack() call was a no-op
- self.logger.exception(f"{self.trace_id}--Failed to fetch video data: {e}")
- return []
- async def filter_data(self, video: Dict) -> bool:
- """校验视频是否符合规则"""
- pipeline = PiaoQuanPipeline(
- platform=self.platform,
- mode=self.mode,
- rule_dict=self.rule_dict,
- env=self.env,
- item=video,
- trace_id=self.platform + str(uuid.uuid1())
- )
- return await pipeline.process_item()
- async def is_video_count_sufficient(self) -> bool:
- """
- 校验视频是否达到当日最大量
- :return:True False
- """
- rule_videos_cnt = self.rule_dict.get("videos_cnt")
- if not rule_videos_cnt:
- return True
- async with AsyncMysqlService(self.platform, self.mode) as mysql:
- video_count = await mysql.get_today_videos()
- if video_count >= rule_videos_cnt.get("min", 200):
- self.logger.info(f"{self.trace_id}--当日视频已达到最大量{video_count}")
- return False
- return True
- async def process_video(self, video: Dict) -> Optional[Dict]:
- """
- 处理单条视频数据,字段映射关系,必要字段检验
- :param video:
- :return:
- """
- self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}")
- publish_user = random.choice(self.user_list)
- try:
- # Build the VideoItem kwargs dynamically from field_map
- item_kwargs = {}
- for field, path in self.field_map.items():
- if not isinstance(path, str) or not path.startswith("$"):
- item_kwargs[field] = path
- continue
- value = safe_extract(video, path)
- if value is None:
- self.logger.warning(f"{self.trace_id}--字段提取失败: {field} 路径: {path}")
- continue
- item_kwargs[field] = value
- item_kwargs["user_id"] = publish_user["uid"]
- item_kwargs["user_name"] = publish_user["nick_name"]
- # Inject platform and strategy manually
- item_kwargs["platform"] = self.platform
- item_kwargs["strategy"] = self.mode
- try:
- item = VideoItem(**item_kwargs)
- except Exception as e:
- self.logger.warning(f"{self.trace_id}--VideoItem 初始化失败: {e}, 数据: {item_kwargs}")
- return None
- video_dict = await item.produce_item()
- if not video_dict:
- self.logger.warning(f"{self.trace_id}--VideoItem 校验失败")
- return None
- return video_dict
- except Exception as e:
- self.logger.exception(f"{self.trace_id}--视频处理异常: {e}")
- return None
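- # Illustrative example of the mapping performed above (the paths, fields, and values
- # are assumed for illustration, not taken from a real platform config):
- #   field_map = {"video_id": "$.id", "title": "$.title", "video_url": "$.playUrl"}
- #   video     = {"id": "123", "title": "demo", "playUrl": "https://example.com/v.mp4"}
- #   -> item_kwargs = {"video_id": "123", "title": "demo",
- #                     "video_url": "https://example.com/v.mp4",
- #                     "user_id": ..., "user_name": ..., "platform": ..., "strategy": ...}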
- async def push_to_etl(self, item: Dict) -> bool:
- """推送数据到ETL(同步)"""
- self.logger.info(f"{self.trace_id}--开始推送数据到ETL: {item.get('video_id', item.get('title', '无标题'))}")
- try:
- self.mq.send_msg(item)
- self.aliyun_logr.logging(
- code="1009",
- message="成功发送至ETL",
- data=item,
- trace_id=self.trace_id
- )
- self.logger.info(f"{self.trace_id}--数据推送成功")
- return True
- except Exception as e:
- self.logger.exception(f"{self.trace_id}--数据推送失败: {e}, 内容: {item}")
- return False
- async def get_today_videos(self):
- """
- 查询每天的爬虫爬取到的视频数量
- :return:
- """
- video_count = self.db_service.get_today_videos()
- return video_count
- async def integrated_video_handling(self):
- """
- 视频处理
- :return:
- """
- pass
- async def run(self):
- """
- 异步运行爬虫任务,严格按顺序执行
- 1. 爬取
- 2. 处理每条数据,字段校验
- 3. 过滤(重复,平台规则,标题,发布时间)
- 4. 标题处理
- 5. 推送到ETL
- """
- try:
- self.logger.info(f"{self.trace_id}--[{self.platform}] 开始执行爬虫任务")
- total_success = 0
- total_failed = 0
- async with aiohttp.ClientSession(
- timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)
- ) as session: # context-managed session
- self.session = session
- for loop_index in range(1, self.loop_times + 1):
- if not await self.is_video_count_sufficient():
- return
- self.logger.info(f"{self.trace_id}--步骤1: 开始第 {loop_index}/{self.loop_times} 次循环请求")
- loop_start_time = time.time()
- video_list = await self.crawl_data()
- if not video_list:
- self.logger.warning(f"{self.trace_id}--未获取到视频数据,跳过当前循环")
- await self._wait_for_next_loop(loop_index)
- continue
- success_count = 0
- fail_count = 0
- for video in video_list:
- video_obj = await self.process_video(video)
- if not video_obj:
- self.logger.warning(f"{self.trace_id}--视频处理失败,已跳过")
- fail_count += 1
- continue
- if not await self.filter_data(video_obj):
- self.logger.debug(f"{self.trace_id}--视频不符合规则,已跳过")
- continue
- await self.integrated_video_handling()
- if await self.push_to_etl(video_obj):
- success_count += 1
- else:
- fail_count += 1
- total_success += success_count
- total_failed += fail_count
- loop_duration = time.time() - loop_start_time
- self.logger.info(f"{self.trace_id}--第 {loop_index}/{self.loop_times} 次循环完成. "
- f"成功: {success_count}, 失败: {fail_count}, 耗时: {loop_duration:.2f}秒")
- await self._wait_for_next_loop(loop_index)
- # Aggregate metrics log
- self.aliyun_logr.logging(
- code="1003",
- message="爬虫执行指标汇总",
- data={
- "trace_id": self.trace_id,
- "platform": self.platform,
- "success_count": total_success,
- "fail_count": total_failed
- },
- trace_id=self.trace_id
- )
- self.logger.info(
- f"{self.trace_id}--[{self.platform}] Spider task completed. Total success: {total_success}, total failed: {total_failed}")
- return True
- except Exception as e:
- self.logger.error(f"爬虫致命错误: {e}")
- raise
- async def _wait_for_next_loop(self, current_loop: int) -> None:
- """等待下一次循环请求"""
- if current_loop < self.loop_times and self.loop_interval > 0:
- self.logger.info(f"{self.trace_id}--等待 {self.loop_interval} 秒后进行下一次请求")
- await asyncio.sleep(self.loop_interval)
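- # A hypothetical usage sketch (the subclass name and argument values are illustrative;
- # only the constructor signature and run() come from this module):
- #
- #   class ExamplePlatformSpider(BaseSpider):
- #       """Relies entirely on its platform config; overrides nothing."""
- #       pass
- #
- #   asyncio.run(
- #       ExamplePlatformSpider(
- #           rule_dict={"videos_cnt": {"min": 200}},
- #           user_list=[{"uid": 1001, "nick_name": "demo_user"}],
- #           trace_id="demo-trace-id",
- #       ).run()
- #   )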