universal_crawler.py

import random
import time
import uuid
from typing import Dict, List, Optional

import requests
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from application.config.common import MQ
from application.functions import MysqlService
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from config.config import base_url
from core.utils import safe_extract
from spiders.base_spider import BaseSpider  # abstract base class
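
# Illustrative sketch only (not part of the original module): the rough shape of
# platform_config, inferred from the keys read in __init__ below. Every concrete
# value here (URL, paths, field names) is a made-up placeholder, not a real platform.
EXAMPLE_PLATFORM_CONFIG = {
    "method": "post",
    "url": "https://api.example.com/video/list",  # hypothetical endpoint
    "headers": {"Content-Type": "application/json"},
    "request_body": {"cursor": "", "page_size": 20},
    "loop_times": 2,
    "response_parse": {
        "data_path": "$.data.list",  # JSONPath to the video array, handed to safe_extract
        "fields": {                  # item field -> JSONPath ("$"-prefixed) or constant value
            "video_url": "$.video_url",
            "video_title": "$.title",
            "out_video_id": "$.id",
        },
    },
}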

def before_send_log(retry_state: RetryCallState) -> None:
    """Log the previous failure before tenacity retries _send_request."""
    attempt = retry_state.attempt_number
    last_result = retry_state.outcome
    if last_result and last_result.failed:
        exc = last_result.exception()
        # The decorated callable is a bound method, so args[0] is the crawler
        # instance and args[1] (when present) is the requested URL.
        instance = retry_state.args[0] if retry_state.args else None
        logger = getattr(instance, "logger", None)
        url = retry_state.args[1] if len(retry_state.args) > 1 else "unknown"
        if logger:
            logger.warning(f"Request failed, retrying ({attempt}/3): {url}, error: {exc}")

class UniversalCrawler(BaseSpider):
    """Configuration-driven crawler: fetches a video list from a platform API,
    filters it against rule_dict, and pushes qualified items to the ETL queue."""

    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        self.base_api = base_url
        self.has_enough_videos = False
        self.download_cnt = 0
        # Request settings and response-parsing rules all come from platform_config.
        self.loop_times = self.platform_config.get("loop_times", 1)
        self.request_method = self.platform_config["method"].upper()
        self.request_url = self.platform_config["url"]
        self.request_headers = self.platform_config.get("headers", {})
        self.request_body = self.platform_config.get("request_body", {})
        self.response_data_path = self.platform_config["response_parse"]["data_path"]
        self.video_fields_map = self.platform_config["response_parse"]["fields"]

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2),
           retry=retry_if_exception_type((requests.RequestException, ValueError)),
           before_sleep=before_send_log)  # log each failure before the next attempt
    def _send_request(self, url: str, method: Optional[str] = None, headers: Optional[Dict] = None,
                      payload: Optional[Dict] = None, timeout: int = 30) -> Optional[Dict]:
        method = method or self.request_method
        headers = headers or self.request_headers
        payload = payload or self.request_body
        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        resp = response.json()
        if resp.get("code") == 0:
            return resp
        raise ValueError(f"API response error: {resp}")

    def fetch_video_data(self) -> List[Dict]:
        """Request the platform API and extract the raw video list from the response."""
        self.logger.info(f"{self.trace_id}--requesting video data: {self.request_url}")
        try:
            response = self._send_request(self.request_url)
            return safe_extract(response, self.response_data_path) or []
        except Exception as e:
            self.logger.error(f"{self.trace_id}--request failed: {e}")
            return []

    def is_video_qualified(self, video: Dict) -> bool:
        """Check a video against rule_dict; an empty rule_dict accepts everything."""
        if not self.rule_dict:
            return True
        # Duration rule: expects {"min": ..., "max": ...} and checks the actual video file.
        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            duration = self.get_video_duration(video_url)
            if not (rule_duration["min"] <= duration <= rule_duration["max"]):
                return False
        # Daily-volume rule: stop accepting once today's stored count reaches the configured minimum.
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 0):
                return False
        return True

    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
        """Map a raw video dict onto a VideoItem ready for the ETL pipeline."""
        item = VideoItem()
        for field, path in self.video_fields_map.items():
            # "$"-prefixed values are JSONPath expressions extracted per video;
            # anything else is used as a constant.
            val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
            item.add_video_info(field, val)
        item.add_video_info("classname", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")
        # Attribute the video to a randomly chosen publishing account.
        user = random.choice(self.user_list)
        item.add_video_info("user_id", user["uid"])
        item.add_video_info("user_name", user["nick_name"])
        return item.produce_item()

    def push_to_etl(self, item: Dict) -> bool:
        """Run the item through PiaoQuanPipeline and, if it passes, publish it to the ETL MQ."""
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=item,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item)
            self.aliyun_log.logging(code="1002", message="Successfully sent to ETL", data=item,
                                    trace_id=self.trace_id)
            if self.download_cnt >= self.download_min_limit:
                self.has_enough_videos = True
                self.aliyun_log.logging(code="2000", message=f"Download limit reached: {self.download_min_limit}",
                                        trace_id=self.trace_id)
            return True
        return False
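
# Usage sketch (an assumption, not part of the original module): one crawl pass wired
# together from the methods above, using the illustrative EXAMPLE_PLATFORM_CONFIG defined
# near the imports. BaseSpider presumably supplies platform, mode, logger, and the real
# entry point; the rule_dict and user_list values below are made up.
if __name__ == "__main__":
    crawler = UniversalCrawler(
        platform_config=EXAMPLE_PLATFORM_CONFIG,
        rule_dict={"duration": {"min": 30, "max": 1200}, "videos_cnt": {"min": 100}},
        user_list=[{"uid": 10001, "nick_name": "demo_user"}],
        trace_id="demo-trace-id",
        env="dev",
    )
    for video in crawler.fetch_video_data():
        if crawler.has_enough_videos:
            break
        if crawler.is_video_qualified(video):
            etl_item = crawler.transform_to_etl_item(video)
            if etl_item:
                crawler.push_to_etl(etl_item)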