import os
import sys
import json
import random
import time
import uuid
import yaml
import requests
from datetime import datetime
from typing import Dict, Any, List, Optional
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

from utils.extractors import safe_extract, extract_multiple

# Add the shared module path
sys.path.append(os.getcwd())
print(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger, Local
# from application.common.mysql import MysqlHelper
from configs.messages import MESSAGES
from configs import codes
from utils.config_loader import ConfigLoader
from configs.config import base_url


class UniversalCrawler:
    """Generic crawler whose per-platform scraping logic is driven by a YAML config."""

    def __init__(self, platform: str, mode: str, rule_dict: Dict, user_list: List, env: str = "prod"):
        """
        Initialize the crawler.
        :param platform: platform name (matches the YAML config file name)
        :param mode: crawl mode, e.g. "recommend"
        :param rule_dict: filtering rules applied by the pipeline
        :param user_list: candidate publishing users
        :param env: runtime environment
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.config_path = "/Users/zhangliang/Documents/piaoquan/AutoScraperX/configs/spiders_config.yaml"
        self.config = ConfigLoader().get_platform_config(self.platform)
        self.aliyun_log = AliyunLogger(platform=platform, mode=self.config["mode"])
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        # self.mysql = MysqlHelper(mode=self.config["mode"], platform=platform)
        self.logger = Local.init_logger(platform=self.platform, mode=self.mode,
                                        log_level="INFO", log_to_console=True)
        self.download_cnt = 0
        self.limit_flag = False
        self.base_api = base_url

    @retry(
        stop=stop_after_attempt(3),        # retry at most 3 times
        wait=wait_fixed(2),                # wait 2 seconds between attempts
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        retry_error_callback=lambda retry_state: None,  # return None to the caller once retries are exhausted
    )
    def _send_request(self, method: str, url: str, headers: Dict, payload: Dict,
                      timeout: int = 30) -> Optional[Dict]:
        """Send an API request; failed requests are retried automatically up to 3 times."""
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                json=payload,
                timeout=timeout,
            )
            response.raise_for_status()
            resp = response.json()
            if resp.get("code") == 0:
                return resp
            raise ValueError(f"non-zero response code: {resp}")
        except Exception as e:
            # Log the failed attempt, then re-raise so tenacity can retry;
            # after the last attempt retry_error_callback returns None.
            self.aliyun_log.logging(
                code="3000",
                message=f"request failed: {url}",
                data={"error": str(e)},
            )
            raise

    def _process_video(self, video_data: Dict) -> bool:
        """Process a single video record."""
        # Field mapping taken from the config
        field_map = self.config["response_parse"]["fields"]

        # Build the video item
        item = VideoItem()
        for field_name, path in field_map.items():
            if isinstance(path, str) and path.startswith("$."):
                # JSONPath expression: extract the value from the raw video data
                match = safe_extract(video_data, path)
                item.add_video_info(field_name, match)
            else:
                # Literal value (int, str, ...): use it as-is
                item.add_video_info(field_name, path)

        # Add fixed fields
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.config["mode"])
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        # Pick a publishing user at random
        our_user = random.choice(self.user_list)
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        print(item)

        # Run the validation pipeline
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.config["mode"],
            rule_dict=self.rule_dict,
            env=self.env,
            item=item.produce_item(),
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item.produce_item())
            self.aliyun_log.logging(
                code="1002",
                message="successfully sent to ETL",
                data=item.produce_item(),
            )
            # Check the download limit
            min_limit = self.config.get("download_limit", {}).get("min", 200)
            if self.download_cnt >= min_limit:
                self.limit_flag = True
                self.aliyun_log.logging(
                    code="2000",
                    message=f"download limit reached: {min_limit}",
                )
            return True
        return False

    # --------------------- custom helper functions ---------------------
    def _func_current_timestamp(self, _) -> int:
        """Return the current Unix timestamp."""
        return int(time.time())

    def _func_formatted_time(self, _) -> str:
        """Return the current time as a formatted string."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def _func_random_delay(self, _) -> None:
        """Sleep for a random delay."""
        min_delay = self.config.get("delay", {}).get("min", 3)
        max_delay = self.config.get("delay", {}).get("max", 8)
        time.sleep(random.randint(min_delay, max_delay))
        return None

    def run(self):
        """Run the crawl task."""
        self.logger.info(f"starting crawler {self.platform}")
        while not self.limit_flag:
            # Fetch the list data
            initial_data = self._send_request(
                self.config["method"].upper(),
                self.config["url"],
                self.config.get("headers", {}),
                self.config.get("request_body", {}),
            )
            print(initial_data)
            if not initial_data:
                return

            # Extract the video list from the response
            video_list = safe_extract(initial_data, self.config["response_parse"]["data_path"])
            self.logger.info(f"fetched video list: {json.dumps(video_list)}")
            next_cursor = None  # pagination cursor, not implemented yet

            for video_data in video_list:
                self.logger.info(f"video object: {video_data}")
                if self.limit_flag:
                    break
                self._process_video(video_data)

                # Run extra actions (e.g. exposure reporting). The action entries only
                # carry an endpoint and a payload, so the list request's method and
                # headers are reused here.
                for action in self.config.get("post_actions", []):
                    if action["trigger"] == "after_video_processed":
                        self._send_request(
                            self.config["method"].upper(),
                            action["endpoint"],
                            self.config.get("headers", {}),
                            action.get("payload", {}),
                        )


if __name__ == '__main__':
    cr = UniversalCrawler(
        "benshanzhufu",
        "recommend",
        rule_dict={'videos_cnt': {'min': 500, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
        user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}],
    )
    cr.run()
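
# ---------------------------------------------------------------------------
# Illustrative only: a minimal sketch of what a platform entry in
# spiders_config.yaml might look like, inferred solely from the keys this
# class reads (method, url, headers, request_body, response_parse,
# download_limit, post_actions, delay). Every name and value below is an
# assumption for illustration, not taken from the real config.
#
# benshanzhufu:
#   mode: recommend
#   method: post
#   url: "<base_url>/crawler/benshanzhufu/recommend"   # hypothetical endpoint
#   headers:
#     Content-Type: application/json
#   request_body:
#     cursor: "1"
#   response_parse:
#     data_path: "$.data.data"        # JSONPath to the video list in the response
#     fields:                         # per-field JSONPath, or a literal value used as-is
#       video_title: "$.title"
#       video_url: "$.video_url"
#   download_limit:
#     min: 200
#   post_actions:
#     - trigger: after_video_processed
#       endpoint: "<base_url>/crawler/report"          # hypothetical
#       payload: {}
# ---------------------------------------------------------------------------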