import os
import sys
import json
import random
import time
import uuid
import yaml
import requests
from datetime import datetime
from typing import Dict, Any, List, Optional
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

from utils.extractors import safe_extract, extract_multiple

# Add the project root to the module search path so application/configs imports resolve
sys.path.append(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger, Local
# from application.common.mysql import MysqlHelper
from configs.messages import MESSAGES
from configs import codes
from utils.config_loader import ConfigLoader
from configs.config import base_url


class UniversalCrawler:
    """Generic crawler whose per-platform scraping logic is driven by a YAML configuration."""
    def __init__(self, platform: str, mode: str, rule_dict: Dict, user_list: List, env: str = "prod"):
        """
        Initialize the crawler.
        :param platform: platform name (matches the key in the YAML config)
        :param mode: crawl mode, e.g. "recommend"
        :param rule_dict: filtering rules passed through to the pipeline
        :param user_list: candidate users to attach to crawled videos
        :param env: runtime environment, "prod" by default
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Path relative to the project root instead of a developer-specific absolute path
        self.config_path = os.path.join("configs", "spiders_config.yaml")
        self.config = ConfigLoader().get_platform_config(self.platform)
        self.aliyun_log = AliyunLogger(platform=platform, mode=self.config["mode"])
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        # self.mysql = MysqlHelper(mode=self.config["mode"], platform=platform)
        self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
        self.download_cnt = 0
        self.limit_flag = False
        self.base_api = base_url

    @retry(
        stop=stop_after_attempt(3),     # retry up to 3 times
        wait=wait_fixed(2),             # wait 2 seconds between attempts
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        reraise=True,                   # surface the last exception to the caller
    )
    def _request_once(self, method: str, url: str, headers: Dict, payload: Dict, timeout: int = 30) -> Dict:
        """Perform one HTTP request; raise on HTTP errors or a non-zero business code so tenacity can retry."""
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        response.raise_for_status()
        resp = response.json()
        if resp["code"] == 0:
            return resp
        raise ValueError(f"API returned a non-zero code: {resp}")

    def _send_request(self, method: str, url: str, headers: Dict, payload: Dict, timeout: int = 30) -> Optional[Dict]:
        """Send an API request with automatic retries; log and return None once all attempts have failed."""
        try:
            return self._request_once(method, url, headers, payload, timeout)
        except Exception as e:
            # Only reached after the final retry has failed
            self.aliyun_log.logging(
                code="3000",
                message=f"Request failed: {url}",
                data={"error": str(e)},
            )
            return None

    def _process_video(self, video_data: Dict) -> bool:
        """Map one raw video record onto a VideoItem, run it through the pipeline and, if it passes, push it to ETL."""
        # Field mapping from the platform configuration
        field_map = self.config["response_parse"]["fields"]
        # Build the video item
        item = VideoItem()
        for field_name, path in field_map.items():
            if isinstance(path, str) and path.startswith("$."):
                # JSONPath expression: extract the value from the raw record
                match = safe_extract(video_data, path)
                item.add_video_info(field_name, match)
            else:
                # Constant value (int, str, ...): use it as-is
                item.add_video_info(field_name, path)
        # Add fixed fields
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.config["mode"])
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")
        # Attach a randomly chosen user
        our_user = random.choice(self.user_list)
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        self.logger.info(f"Assembled item: {item.produce_item()}")
        # Run the item through the pipeline
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.config["mode"],
            rule_dict=self.rule_dict,
            env=self.env,
            item=item.produce_item(),
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item.produce_item())
            self.aliyun_log.logging(
                code="1002",
                message="Sent to ETL successfully",
                data=item.produce_item(),
            )
            # Check the download limit
            min_limit = self.config.get("download_limit", {}).get("min", 200)
            if self.download_cnt >= min_limit:
                self.limit_flag = True
                self.aliyun_log.logging(
                    code="2000",
                    message=f"Download limit reached: {min_limit}",
                )
            return True
        return False

    # --------------------- custom handler functions ---------------------
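    # These helpers all take a single (ignored) positional argument and are not
    # called directly in this module; presumably they are resolved by name from
    # the YAML config (e.g. as placeholders in request bodies). That dispatch
    # mechanism lives elsewhere, so this is an assumption, not part of this file.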
    def _func_current_timestamp(self, _) -> int:
        """Return the current Unix timestamp in seconds."""
        return int(time.time())

    def _func_formatted_time(self, _) -> str:
        """Return the current time formatted as YYYY-MM-DD HH:MM:SS."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def _func_random_delay(self, _) -> None:
        """Sleep for a random number of seconds within the configured delay range."""
        min_delay = self.config.get("delay", {}).get("min", 3)
        max_delay = self.config.get("delay", {}).get("max", 8)
        time.sleep(random.randint(min_delay, max_delay))
        return None

    def run(self):
        """Run the crawl loop until the download limit is reached or a request fails."""
        self.logger.info(f"Starting crawler for {self.platform}")
        while not self.limit_flag:
            # Fetch a page of list data
            initial_data = self._send_request(
                self.config["method"].upper(),
                self.config["url"],
                self.config.get("headers", {}),
                self.config.get("request_body", {}),
            )
            if not initial_data:
                return
            # Extract the video list once via the configured JSONPath
            video_list = safe_extract(initial_data, self.config["response_parse"]["data_path"])
            self.logger.info(f"Fetched video list: {json.dumps(video_list, ensure_ascii=False)}")
            for video_data in video_list or []:  # tolerate a missing list
                self.logger.info(f"Video object: {video_data}")
                if self.limit_flag:
                    break
                self._process_video(video_data)
            # Perform extra post actions (e.g. exposure reporting)
            for action in self.config.get("post_actions", []):
                if action["trigger"] == "after_video_processed":
                    # NOTE: assumes the configured endpoint is a full URL and that
                    # POST is an acceptable default method for these callbacks.
                    self._send_request(
                        action.get("method", "POST"),
                        action["endpoint"],
                        self.config.get("headers", {}),
                        action.get("payload", {}),
                    )


if __name__ == "__main__":
    cr = UniversalCrawler(
        "benshanzhufu",
        "recommend",
        rule_dict={"videos_cnt": {"min": 500, "max": 0}, "duration": {"min": 30, "max": 1200}},
        user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}],
    )
    cr.run()