import asyncio
import random
import time
import uuid
from abc import ABC
from typing import Dict, List, Optional

import aiohttp

from core.base.async_request_client import AsyncRequestClient
from core.models.video_item import VideoItem
from core.utils.extractors import safe_extract, extract_fields
from core.utils.helpers import generate_titles
from core.utils.log.logger_manager import LoggerManager
from core.utils.spider_config import SpiderConfig
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl pipeline in strict sequential order.
    """

    MAX_RETRIES = 3  # maximum retries per request
    TIMEOUT = 30  # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # subclass name

        # Resolve the platform configuration from the lowercased class name
        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching configuration found for: {self.class_name}")

        # Initialize logging and MQ
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode,
        )

        # Request configuration
        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers
        self.body = self.platform_config.request_body
        self.response = self.platform_config.response_parse
        self.field_map = self.response.get("fields", {})
        self.data_path = self.response.get("data_path")

        # Loop control configuration
        self.loop_times = self.platform_config.loop_times  # number of loops
        self.loop_interval = self.platform_config.loop_interval  # interval between loops in seconds

        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)

        self.logger.info(
            f"{self.trace_id}--config: {self.loop_times} loops, {self.loop_interval}s interval")

        self.session = None
        self.feishu_sheetid = self.platform_config.feishu_sheetid
    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """Request the video list and extract the payload at data_path."""
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=self.body
        )
        data = safe_extract(response, self.data_path)
        return data if data else []
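
    # Note: the request/parse attributes consumed above all come from the
    # platform entry that SpiderConfig resolves in __init__. A hypothetical
    # entry might look like the sketch below; only the keys actually read in
    # this file are grounded, the example values are assumptions:
    #
    #   method: "POST"
    #   url: "https://api.example.com/videos"
    #   headers: {"Content-Type": "application/json"}
    #   request_body: {"cursor": ""}
    #   loop_times: 3
    #   loop_interval: 5
    #   feishu_sheetid: "xxxxxx"
    #   response_parse:
    #     data_path: "data.items"      # path handed to safe_extract above
    #     fields: {...}                # mapping consumed by extract_fields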
    async def filter_data(self, video: Dict) -> bool:
        """Check whether a video satisfies the platform rules."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()
    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's video count is still below the daily maximum.
        :return: True if crawling may continue, False if the cap is reached.
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 200):
                self.logger.info(f"{self.trace_id}--daily video cap reached: {video_count}")
                self.aliyun_logr.logging(
                    code="1011",
                    message="video count reached the daily maximum",
                    data=f"<today's video count>{video_count}"
                )
                return False
            self.logger.info(f"{self.trace_id}--videos stored today: {video_count}")
            return True
    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Process a single raw video record: apply the field mapping and
        validate required fields.
        :param video: raw video dict from the platform response
        :return: normalized video dict, or None if validation fails
        """
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        item_kwargs = extract_fields(
            video, self.field_map, logger=self.logger,
            trace_id=self.trace_id, aliyun_log=self.aliyun_logr
        )
        item_kwargs["user_id"] = publish_user["uid"]
        item_kwargs["user_name"] = publish_user["nick_name"]
        item_kwargs["platform"] = self.platform
        item_kwargs["strategy"] = self.mode

        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem initialization failed: {e}")
            return None
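
    # extract_fields applies self.field_map to the raw record before the
    # VideoItem is built. A minimal sketch of the assumed behavior (the real
    # implementation lives in core.utils.extractors and may differ, e.g. in
    # mapping direction or error handling):
    #
    #   field_map = {"video_id": "id", "title": "desc"}   # hypothetical mapping
    #   item_kwargs = {dst: safe_extract(video, src) for dst, src in field_map.items()}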
    async def push_to_etl(self, item: Dict) -> bool:
        """Push a processed item to ETL via MQ."""
        self.logger.info(f"{self.trace_id}--pushing to ETL: {item.get('video_id', item.get('title', 'untitled'))}")
        try:
            await self.mq_producer.send_msg(item)
            self.aliyun_logr.logging(
                code="1009",
                message="sent to ETL successfully",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--push succeeded")
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id}--push failed: {e}, item: {item}")
            return False
    async def get_today_videos(self) -> int:
        """
        Return the number of videos this spider has crawled today.
        """
        video_count = await self.db_service.get_today_videos()
        return video_count
    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Extra per-video handling, e.g. title generation.
        """
        await generate_titles(self.feishu_sheetid, video)
    async def run(self):
        """
        Run the spider asynchronously, strictly in order:
        1. Crawl the video list
        2. Process each record and validate fields
        3. Filter (duplicates, platform rules, title, publish time)
        4. Title handling
        5. Push to ETL
        """
        try:
            total_success, total_failed = 0, 0
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session:
                for loop_index in range(1, self.loop_times + 1):
                    # Stop if today's video count has already reached the daily cap
                    if not await self.is_video_count_sufficient():
                        return
                    loop_start_time = time.time()
                    self.logger.info(f"{self.trace_id}--step 1: starting loop request {loop_index}/{self.loop_times}")

                    # Request the video list
                    video_list = await self.crawl_data(session)
                    if not video_list:
                        self.logger.warning(f"{self.trace_id}--no video data returned, skipping this loop")
                        await self._wait_for_next_loop(loop_index)
                        continue

                    success_count = 0
                    fail_count = 0

                    for video in video_list:
                        # Map raw fields onto the item schema
                        video_obj = await self.process_video(video)
                        if not video_obj:
                            self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
                            fail_count += 1
                            continue

                        # Apply the filter rules
                        if not await self.filter_data(video_obj):
                            self.logger.debug(f"{self.trace_id}--video does not match the rules, skipped")
                            continue

                        # Extra video handling (e.g. title generation)
                        await self.integrated_video_handling(video_obj)

                        if await self.push_to_etl(video_obj):
                            success_count += 1
                        else:
                            fail_count += 1

                    total_success += success_count
                    total_failed += fail_count

                    loop_duration = time.time() - loop_start_time
                    self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} finished. "
                                     f"success: {success_count}, failed: {fail_count}, took {loop_duration:.2f}s")

                    await self._wait_for_next_loop(loop_index)

            # Summary metrics
            self.aliyun_logr.logging(
                code="1003",
                message="spider execution metrics summary",
                data={
                    "trace_id": self.trace_id,
                    "classname": self.platform,
                    "success_count": total_success,
                    "fail_count": total_failed
                },
                trace_id=self.trace_id
            )
            self.logger.info(
                f"{self.trace_id}--[{self.platform}] spider task finished, total success: {total_success}, total failed: {total_failed}")
            return True

        except Exception as e:
            self.logger.error(f"fatal spider error: {e}")
            raise
    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Sleep between loops, except after the final loop."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)
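
# A minimal usage sketch, assuming a matching "exampleplatformspider" entry
# exists in SpiderConfig (configuration is resolved by the lowercased class
# name in __init__). The rule_dict and user_list shapes are illustrative,
# inferred from how is_video_count_sufficient() and process_video() read them.
class ExamplePlatformSpider(BaseSpider):
    """Hypothetical subclass; override crawl_data/process_video for platform quirks."""
    pass


if __name__ == "__main__":
    spider = ExamplePlatformSpider(
        rule_dict={"videos_cnt": {"min": 200}},
        user_list=[{"uid": 1, "nick_name": "demo"}],
        trace_id=str(uuid.uuid4()),
    )
    asyncio.run(spider.run())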