universal_crawler.py

import random
import time
import uuid
from typing import Dict, List, Optional

import requests
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState

from application.config.common import MQ
from configs.config import base_url
from application.functions import MysqlService
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from utils.extractors import safe_extract
from application.spiders.base_spider import BaseSpider  # abstract base class


def before_send_log(retry_state: RetryCallState) -> None:
    """Log a warning between retry attempts of UniversalCrawler._send_request."""
    attempt = retry_state.attempt_number
    last_result = retry_state.outcome
    if last_result is None or not last_result.failed:
        return
    exc = last_result.exception()
    # _send_request is a bound method, so args[0] is the crawler instance
    # and args[1] (when present) is the requested URL.
    instance = retry_state.args[0] if retry_state.args else None
    logger = getattr(instance, "logger", None)
    url = retry_state.args[1] if len(retry_state.args) > 1 else "unknown"
    if logger:
        logger.warning(f"Request failed, retrying ({attempt}/3): {url}, error: {exc}")


class UniversalCrawler(BaseSpider):
    """Config-driven crawler: request template, parsing rules and limits all come from platform_config."""

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.base_api = base_url
        self.has_enough_videos = False
        self.download_cnt = 0
        self.loop_times = self.platform_config.get("loop_times", 1)

        # Request template taken from the platform configuration.
        self.request_method = self.platform_config["method"].upper()
        self.request_url = self.platform_config["url"]
        self.request_headers = self.platform_config.get("headers", {})
        self.request_body = self.platform_config.get("request_body", {})

        # Response parsing rules: where the video list lives and how each field is mapped.
        self.response_data_path = self.platform_config["response_parse"]["data_path"]
        self.video_fields_map = self.platform_config["response_parse"]["fields"]
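
    # Why this note is here: __init__ above only reads keys out of platform_config, so the
    # expected shape is implied rather than documented. The example below is an illustrative
    # assumption reconstructed from those reads, not a config taken from this repository;
    # real key values (URLs, paths, field names) will differ.
    #
    #   platform_config = {
    #       "method": "POST",
    #       "url": "https://example.com/api/video/list",
    #       "headers": {"Content-Type": "application/json"},
    #       "request_body": {"cursor": ""},
    #       "loop_times": 3,
    #       "response_parse": {
    #           "data_path": "$.data.list",
    #           "fields": {
    #               "video_url": "$.videoUrl",
    #               "video_title": "$.title",
    #           },
    #       },
    #   }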

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_fixed(2),
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        # before_sleep fires only after a failed attempt, so retry_state.outcome is
        # populated by the time before_send_log inspects it.
        before_sleep=before_send_log,
    )
    def _send_request(self, url: str, method: str = None, headers: Dict = None, payload: Dict = None, timeout: int = 30) -> Optional[Dict]:
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body
        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        resp = response.json()
        if resp.get("code") == 0:
            return resp
        # A non-zero business code is raised as ValueError so the retry decorator catches it.
        raise ValueError(f"API response error: {resp}")
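
    # Illustrative note (assumption): _send_request treats the upstream response as an
    # envelope along the lines of {"code": 0, "data": {...}}, where a non-zero "code" is
    # considered an error and retried. The payload layout beyond "code" is defined per
    # platform by response_parse.data_path, e.g. "$.data.list".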

    def fetch_video_data(self) -> List[Dict]:
        """Fetch one batch of raw video records; returns an empty list on failure."""
        self.logger.info(f"{self.trace_id}--requesting video data: {self.request_url}")
        try:
            response = self._send_request(self.request_url)
            return safe_extract(response, self.response_data_path) or []
        except Exception as e:
            self.logger.error(f"{self.trace_id}--request failed: {e}")
            return []

    def is_video_qualified(self, video: Dict) -> bool:
        """Apply the duration and daily-count rules from rule_dict to a single video."""
        if not self.rule_dict:
            return True

        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            duration = self.get_video_duration(video_url)
            if not (rule_duration["min"] <= duration <= rule_duration["max"]):
                return False

        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            # Stop accepting videos once today's stored count reaches the configured quota.
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 0):
                return False

        return True
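
    # Illustrative note (assumption): the rule_dict consumed above is expected to look
    # roughly like {"duration": {"min": 30, "max": 600}, "videos_cnt": {"min": 100}},
    # i.e. a per-video duration window plus a daily quota checked against MySQL.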

    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
        """Map a raw video record onto a VideoItem using the configured field paths."""
        item = VideoItem()
        for field, path in self.video_fields_map.items():
            # Values starting with "$" are treated as extraction paths; anything else is a literal constant.
            val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
            item.add_video_info(field, val)

        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        # Attribute the video to a randomly chosen publishing account.
        user = random.choice(self.user_list)
        item.add_video_info("user_id", user["uid"])
        item.add_video_info("user_name", user["nick_name"])

        return item.produce_item()

    def push_to_etl(self, item: Dict) -> bool:
        """Run the item through PiaoQuanPipeline and, if it passes, publish it to the ETL queue."""
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=item,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item)
            self.aliyun_logr.logging(code="1002", message="Successfully sent to ETL", data=item, trace_id=self.trace_id)
            # download_min_limit is not set in this class; it is assumed to come from BaseSpider.
            if self.download_cnt >= self.download_min_limit:
                self.has_enough_videos = True
                self.aliyun_logr.logging(code="2000", message=f"Download limit reached: {self.download_min_limit}", trace_id=self.trace_id)
            return True
        return False
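
# Illustrative only: a minimal sketch of how the hooks above could be composed into a
# crawl loop. The actual driver (e.g. a run() method on BaseSpider) is not shown in this
# file, so the flow below is an assumption, not the project's confirmed entry point.
#
#   crawler = UniversalCrawler(platform_config, rule_dict, user_list, trace_id)
#   for _ in range(crawler.loop_times):
#       if crawler.has_enough_videos:
#           break
#       for video in crawler.fetch_video_data():
#           if crawler.is_video_qualified(video):
#               etl_item = crawler.transform_to_etl_item(video)
#               if etl_item:
#                   crawler.push_to_etl(etl_item)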