import asyncio
import json
import random
import time
import traceback
import uuid
from abc import ABC
from typing import List, Dict, Optional, Any, Tuple

import aiohttp

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from core.utils.template_resolver import resolve_request_body_template
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer

class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl flow in a strict, sequential order.
    """

    MAX_RETRIES = 3  # Maximum retries for a single request
    TIMEOUT = 30  # Request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # Name of the concrete subclass

        # Load the platform config automatically, keyed by the lowercased class name
        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching platform config found for: {self.class_name}")

        # Platform info and logger initialization
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)

        # MQ producer used to push finished items to ETL
        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)

        # Request configuration
        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers
        self.request_body = self.platform_config.request_body

        # Response parsing configuration
        self.response = self.platform_config.response_parse
        self.field_map = self.response.get("fields", {})
        self.data_path = self.response.get("data_path")
        self.next_cursor_path = self.response.get("next_cursor")
        self.response_data = self.response.get("data")
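
        # For illustration only: the keys above come from the platform's
        # response_parse block in SpiderConfig. The shape sketched below is an
        # assumption, not the actual config of any platform; the path syntax is
        # whatever safe_extract expects:
        #   {
        #       "data": "<path to the payload object>",
        #       "data_path": "<path to the list of video items>",
        #       "next_cursor": "<path to the pagination cursor>",
        #       "fields": {"<VideoItem field>": "<path inside one item>", ...}
        #   }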

        # Flow control configuration
        self.loop_times = self.platform_config.loop_times  # Number of crawl loops
        self.loop_interval = self.platform_config.loop_interval  # Interval between loops (seconds)

        # Database and request clients
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)

        self.feishu_sheetid = self.platform_config.feishu_sheetid

        self.timeout = self.TIMEOUT
        self.max_retries = self.MAX_RETRIES

        self.resolved_body = resolve_request_body_template(self.request_body)

    async def run(self):
        """
        Spider entry point: run the full cycle of crawling, processing and pushing.
        """
        await self.before_run()
        total_success, total_failed = 0, 0
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            for loop_index in range(1, self.loop_times + 1):
                # Stop early once today's crawl quota has been reached
                if not await self.is_video_count_sufficient():
                    return
                success_count, fail_count = await self.run_single_loop(session)
                total_success += success_count
                total_failed += fail_count
                if loop_index < self.loop_times:
                    await asyncio.sleep(self.loop_interval)
        await self.after_run()
        self.logger.info(f"{self.trace_id} crawl finished, success: {total_success}, failed: {total_failed}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        Run a single crawl loop: fetch the video list, then process and push each item.
        """
        success_count, fail_count = 0, 0
        video_list = await self.crawl_data(session)
        if not video_list:
            self.logger.info(f"{self.trace_id} no videos fetched")
            return success_count, fail_count
        for video in video_list:
            result = await self.process_and_push_video(video)
            if result:
                success_count += 1
            else:
                fail_count += 1
        return success_count, fail_count

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Per-video flow: field extraction -> rule filtering -> title handling -> push to ETL.
        """
        try:
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"{self.trace_id} error while processing video: {e}")
            return False

    async def crawl_data(self, session) -> List[Dict]:
        """
        Fetch one batch of data. Retries are delegated to the request client;
        pagination state is carried forward via the resolved request body.
        :param session: shared aiohttp session
        :return: list of raw video dicts (empty list when nothing was returned)
        """
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=self.resolved_body
        )
        payload = safe_extract(response, self.response_data)
        self.logger.debug(f"{self.trace_id} response payload: {payload}")
        # Re-resolve the request body template with this response so the next
        # request can carry pagination state (e.g. a cursor) forward.
        self.resolved_body = resolve_request_body_template(self.request_body, payload)
        data = safe_extract(response, self.data_path)
        return data if data else []
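
    # Illustrative sketch (hypothetical subclass, shown as a comment because it is
    # not part of this base class): a platform that needs explicit cursor-based
    # pagination could override crawl_data and keep requesting until
    # next_cursor_path no longer yields a value, e.g.:
    #
    #     async def crawl_data(self, session) -> List[Dict]:
    #         items: List[Dict] = []
    #         while True:
    #             response = await self.request_client.request(
    #                 session=session, method=self.method, url=self.url,
    #                 headers=self.headers, json=self.resolved_body)
    #             items.extend(safe_extract(response, self.data_path) or [])
    #             if not safe_extract(response, self.next_cursor_path):
    #                 break
    #             self.resolved_body = resolve_request_body_template(
    #                 self.request_body, safe_extract(response, self.response_data))
    #         return items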

    async def filter_data(self, video: Dict) -> bool:
        """Check whether a video passes the platform's rule pipeline."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's ingested video count is still below the daily cap.
        :return: True if more videos may be crawled, False if the cap has been reached
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
        if video_count >= rule_videos_cnt.get("min", 200):
            self.logger.info(f"{self.trace_id}--daily video cap reached: {video_count}")
            self.aliyun_logr.logging(
                code="1011",
                message="daily video count reached the configured maximum",
                data=f"<today's video count>{video_count}"
            )
            return False
        self.logger.info(f"{self.trace_id}--videos ingested today: {video_count}")
        return True

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Process a single raw video dict: map the configured fields and validate required ones.
        :param video: raw video dict returned by crawl_data
        :return: validated video dict, or None if validation failed
        """
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
        item_kwargs["user_id"] = publish_user["uid"]
        item_kwargs["user_name"] = publish_user["nick_name"]
        item_kwargs["platform"] = self.platform
        item_kwargs["strategy"] = self.mode

        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem initialization failed: {e}")
            return None

    async def push_to_etl(self, video: Dict[str, Any]) -> bool:
        """
        Push a fully processed video to ETL via the MQ producer.
        """
        try:
            await self.mq_producer.send_msg(video)
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id} failed to push to ETL: {e}")
            return False

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Integrated video handling, e.g. title generation via the configured Feishu sheet.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before issuing the next loop's request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval} seconds before the next request")
            await asyncio.sleep(self.loop_interval)

    async def before_run(self):
        """
        Overridable hook executed before the run, e.g. fetching tokens.
        """
        pass

    async def after_run(self):
        """
        Overridable hook executed after the run, e.g. reporting statistics.
        """
        pass
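

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original module).
# `DemoSpider`, the rule_dict / user_list values and the trace id below are
# hypothetical; actually running this requires a matching "demospider" entry
# in SpiderConfig plus reachable downstream services (MySQL, MQ, Feishu).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class DemoSpider(BaseSpider):
        """Hypothetical subclass that relies entirely on the configured request/parse flow."""
        pass

    demo_spider = DemoSpider(
        rule_dict={"videos_cnt": {"min": 100}},
        user_list=[{"uid": 1, "nick_name": "demo_user"}],
        trace_id="demo-trace-001",
    )
    asyncio.run(demo_spider.run())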