123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import random
- import time
- import uuid
- import requests
- from typing import Dict, List, Optional
- from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
- from application.config.common import MQ
- from configs.config import base_url
- from application.functions import MysqlService
- from application.items import VideoItem
- from application.pipeline import PiaoQuanPipeline
- from utils.extractors import safe_extract
- from application.spiders.base_spider import BaseSpider # 抽象基类导入
def before_send_log(retry_state: RetryCallState) -> None:
    """Log a warning about the failed attempt that is about to be retried.

    Written as a tenacity callback. It is safe to register under either
    ``before=`` or ``before_sleep=``: when no finished attempt is recorded on
    ``retry_state`` (or the last attempt succeeded) the function does nothing.

    Args:
        retry_state: tenacity's per-call state. ``attempt_number``,
            ``outcome``, ``args`` and ``kwargs`` are read from it.
    """
    outcome = retry_state.outcome
    # tenacity leaves ``outcome`` as None until an attempt has finished, which
    # is always the case when the hook runs as ``before=``; nothing to report.
    if outcome is None or not outcome.failed:
        return

    call_args = retry_state.args or ()
    call_kwargs = retry_state.kwargs or {}

    # Prefer an explicit ``logger`` keyword argument. When the retried callable
    # is a bound method, ``args[0]`` is the instance, so fall back to its
    # ``logger`` attribute if it has one.
    logger = call_kwargs.get("logger")
    if logger is None and call_args:
        logger = getattr(call_args[0], "logger", None)
    if not logger:
        return

    # Resolve the URL: keyword argument first, then the first string among the
    # positional arguments (skips ``self`` for methods), then the historical
    # fallback of ``args[0]`` / "unknown".
    url = call_kwargs.get("url")
    if url is None:
        url = next((arg for arg in call_args if isinstance(arg, str)), None)
    if url is None:
        url = call_args[0] if call_args else "unknown"

    attempt = retry_state.attempt_number
    exc = outcome.exception()
    logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")
class UniversalCrawler(BaseSpider):
    """Configuration-driven crawler built on ``BaseSpider``.

    The request (method, URL, headers, body) and the response parsing rules
    (data path and per-field extraction paths) are all read from
    ``platform_config``. Videos that pass the rule checks are converted to ETL
    items, run through ``PiaoQuanPipeline`` and sent to the MQ topic.

    NOTE(review): ``self.platform``, ``self.mode``, ``self.env``,
    ``self.logger``, ``self.aliyun_logr``, ``self.download_min_limit`` and
    ``self.get_video_duration`` are used below but not defined in this class;
    they are presumably provided by ``BaseSpider`` -- confirm there.
    """

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        """Read request/response settings from ``platform_config``.

        Args:
            platform_config: Crawler configuration. ``method``, ``url`` and
                ``response_parse`` (with ``data_path`` and ``fields``) are
                required keys; ``loop_times``, ``headers`` and
                ``request_body`` are optional.
            rule_dict: Filtering rules consulted by ``is_video_qualified``.
            user_list: Candidate users; one is picked at random per item.
            trace_id: Identifier included in log messages.
            env: Environment name, used as the MQ topic suffix.

        Raises:
            KeyError: If a required ``platform_config`` key is missing.
        """
        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.base_api = base_url
        self.has_enough_videos = False
        self.download_cnt = 0
        self.loop_times = self.platform_config.get('loop_times', 1)

        # Request settings.
        self.request_method = self.platform_config["method"].upper()
        self.request_url = self.platform_config["url"]
        self.request_headers = self.platform_config.get("headers", {})
        self.request_body = self.platform_config.get("request_body", {})

        # Response parsing settings.
        self.response_data_path = self.platform_config["response_parse"]["data_path"]
        self.video_fields_map = self.platform_config["response_parse"]["fields"]

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_fixed(2),
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        # ``before_sleep`` runs after a failed attempt, when the outcome is
        # available to log. A ``before`` hook runs before the attempt, when
        # ``retry_state.outcome`` is still None.
        before_sleep=before_send_log,
    )
    def _send_request(
        self,
        url: str,
        method: Optional[str] = None,
        headers: Optional[Dict] = None,
        payload: Optional[Dict] = None,
        timeout: int = 30,
    ) -> Optional[Dict]:
        """Send one HTTP request and return the decoded JSON body.

        Up to 3 attempts are made, 2 seconds apart, when a
        ``requests.RequestException`` or ``ValueError`` is raised.

        Args:
            url: Target URL.
            method: HTTP method; falls back to the configured method.
            headers: Request headers; falls back to the configured headers.
            payload: JSON body; falls back to the configured request body.
            timeout: Timeout in seconds passed to ``requests.request``.

        Returns:
            The decoded response when its ``code`` field equals 0.

        Raises:
            requests.RequestException: On transport errors or a non-2xx status.
            ValueError: When the response ``code`` is not 0.
        """
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body

        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        resp = response.json()
        if resp.get("code") == 0:
            return resp
        raise ValueError(f"API响应错误: {resp}")

    def fetch_video_data(self) -> Optional[List[Dict]]:
        """Request the configured URL and extract the video list.

        Returns:
            Whatever ``safe_extract`` finds at ``response_data_path``, or an
            empty list when nothing is found or the request fails. Failures
            are logged, not raised.
        """
        self.logger.info(f"{self.trace_id}--请求视频数据: {self.request_url}")
        try:
            response = self._send_request(self.request_url)
            return safe_extract(response, self.response_data_path) or []
        except Exception as e:
            # Deliberate best-effort: a failed fetch yields no videos.
            self.logger.error(f"{self.trace_id}--请求失败: {e}")
            return []

    def is_video_qualified(self, video: Dict) -> bool:
        """Check ``video`` against the ``duration`` and ``videos_cnt`` rules.

        Args:
            video: Raw video record as returned by the API.

        Returns:
            True when no rules are configured or every configured rule passes;
            False otherwise.
        """
        if not self.rule_dict:
            return True

        duration_rule = self.rule_dict.get("duration")
        if duration_rule:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            duration = self.get_video_duration(video_url)
            # NOTE(review): get_video_duration is defined outside this class.
            # A None result (presumably "could not be determined") is treated
            # as unqualified instead of raising TypeError in the comparison.
            if duration is None:
                return False
            if not (duration_rule['min'] <= duration <= duration_rule['max']):
                return False

        count_rule = self.rule_dict.get("videos_cnt")
        if count_rule:
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            # Reject once today's count has reached the configured "min"
            # (missing "min" defaults to 0, which rejects every video).
            if video_count >= count_rule.get("min", 0):
                return False

        return True

    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
        """Build the ETL item for ``video`` from the configured field map.

        Field-map values that are strings starting with ``$`` are treated as
        extraction paths into ``video``; any other value is stored as-is.

        Args:
            video: Raw video record as returned by the API.

        Returns:
            The result of ``VideoItem.produce_item()``, or None when
            ``user_list`` is empty and no user can be attached.
        """
        if not self.user_list:
            # random.choice raises IndexError on an empty sequence; report the
            # misconfiguration and skip the item instead.
            self.logger.error(f"{self.trace_id}--用户列表为空, 无法构造视频数据")
            return None

        item = VideoItem()
        for field, path in self.video_fields_map.items():
            val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
            item.add_video_info(field, val)

        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        user = random.choice(self.user_list)
        item.add_video_info("user_id", user["uid"])
        item.add_video_info("user_name", user["nick_name"])
        return item.produce_item()

    def push_to_etl(self, item: Dict) -> bool:
        """Run ``item`` through the pipeline and, if accepted, send it to MQ.

        On acceptance the download counter is incremented, and
        ``has_enough_videos`` is set once the counter reaches
        ``download_min_limit``.

        Args:
            item: ETL item as produced by ``transform_to_etl_item``.

        Returns:
            True when the pipeline accepted the item and it was sent,
            False otherwise.
        """
        # The pipeline gets its own per-item trace id; the log calls below
        # keep using the crawler-level ``self.trace_id``.
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=item,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item)
            self.aliyun_logr.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
            if self.download_cnt >= self.download_min_limit:
                self.has_enough_videos = True
                self.aliyun_logr.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}", trace_id=self.trace_id)
            return True
        return False
|