# universal_crawler.py

import os
import sys
import json
import random
import time
import uuid
import yaml
import requests
from datetime import datetime
from typing import Dict, Any, List, Optional
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

from utils.extractors import safe_extract, extract_multiple

# Add the project root to the module search path
sys.path.append(os.getcwd())
print(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
# from application.common.mysql import MysqlHelper
from configs.messages import MESSAGES
from configs import codes
from utils.config_loader import ConfigLoader
from application.common.log import Local
from configs.config import base_url


class UniversalCrawler:
    """Generic crawler class; per-platform scraping logic is driven by a YAML config."""

    def __init__(self, platform: str, mode: str, rule_dict: Dict, user_list: List, env: str = "prod"):
        """
        Initialize the crawler.
        :param platform: platform name (matches the YAML config entry)
        :param mode: crawl mode (e.g. "recommend")
        :param rule_dict: filtering rules applied by the pipeline
        :param user_list: candidate users to attribute downloaded videos to
        :param env: runtime environment
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.config_path = "/Users/zhangliang/Documents/piaoquan/AutoScraperX/configs/spiders_config.yaml"
        self.config = ConfigLoader().get_platform_config(self.platform)
        self.aliyun_log = AliyunLogger(platform=platform, mode=self.config["mode"])
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
        # self.mysql = MysqlHelper(mode=self.config["mode"], platform=platform)
        self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
        self.download_cnt = 0
        self.limit_flag = False
        self.base_api = base_url
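
    # The per-platform YAML entry is loaded via ConfigLoader above. Below is an
    # illustrative sketch of the shape this class expects, inferred only from the
    # keys read in this file; field names, URLs, and values are hypothetical and
    # not the real benshanzhufu config:
    #
    #   benshanzhufu:
    #     mode: recommend
    #     method: post
    #     url: https://example.com/api/recommend        # hypothetical endpoint
    #     headers: {Content-Type: application/json}
    #     request_body: {cursor: ""}
    #     response_parse:
    #       data_path: "$.data.list"                    # where the video array lives
    #       fields:
    #         video_title: "$.title"                    # "$." paths are extracted per video
    #         out_video_id: "$.vid"
    #         platform_source: 1                        # non-"$." values are used as-is
    #     download_limit: {min: 200}
    #     delay: {min: 3, max: 8}
    #     post_actions:
    #       - trigger: after_video_processed
    #         endpoint: https://example.com/api/report  # hypothetical
    #         payload: {}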

    @retry(
        stop=stop_after_attempt(3),  # retry at most 3 times
        wait=wait_fixed(2),          # wait 2 seconds between attempts
        retry=retry_if_exception_type((requests.RequestException, ValueError)),
        retry_error_callback=lambda retry_state: None,  # return None after the last failed attempt
    )
    def _send_request(self, method: str, url: str, headers: Dict, payload: Dict, timeout: int = 30) -> Optional[Dict]:
        """Send an API request; failed requests are retried automatically, up to 3 attempts."""
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                json=payload,
                timeout=timeout
            )
            response.raise_for_status()
            resp = response.json()
            if resp.get("code") == 0:
                return resp
            raise ValueError(f"non-zero response code: {resp}")
        except Exception as e:
            self.aliyun_log.logging(
                code="3000",
                message=f"request failed: {url}",
                data={"error": str(e)}
            )
            # Re-raise so tenacity can retry; without this the except block would
            # swallow the error and the retry decorator would never fire.
            raise
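
    # Illustrative response envelope that _send_request expects. The success check is
    # resp["code"] == 0 as coded above; everything else about the shape is hypothetical
    # and actually determined by response_parse.data_path in the YAML:
    #
    #   {"code": 0, "msg": "ok", "data": {"list": [{...video...}, {...video...}]}}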

    def _process_video(self, video_data: Dict) -> bool:
        """Process a single video object."""
        # Field mapping comes from the YAML config
        field_map = self.config["response_parse"]["fields"]

        # Build the video item
        item = VideoItem()
        for field_name, path in field_map.items():
            if isinstance(path, str) and path.startswith("$."):
                match = safe_extract(video_data, path)
                item.add_video_info(field_name, match)
            else:
                # Fixed values (int, str, ...) are used as-is
                item.add_video_info(field_name, path)

        # Add fixed fields
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.config["mode"])
        item.add_video_info("session", f"{self.platform}-{int(time.time())}")

        # Pick a random user
        our_user = random.choice(self.user_list)
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        print(item)

        # Run the item through the pipeline
        trace_id = f"{self.platform}-{uuid.uuid4()}"
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.config["mode"],
            rule_dict=self.rule_dict,
            env=self.env,
            item=item.produce_item(),
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(item.produce_item())
            self.aliyun_log.logging(
                code="1002",
                message="successfully sent to ETL",
                data=item.produce_item()
            )
            # Check the download limit
            min_limit = self.config.get("download_limit", {}).get("min", 200)
            if self.download_cnt >= min_limit:
                self.limit_flag = True
                self.aliyun_log.logging(
                    code="2000",
                    message=f"download limit reached: {min_limit}",
                )
            return True
        return False

    # --------------------- custom field handlers ---------------------
    def _func_current_timestamp(self, _) -> int:
        """Current Unix timestamp."""
        return int(time.time())

    def _func_formatted_time(self, _) -> str:
        """Current time as a formatted string."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def _func_random_delay(self, _) -> None:
        """Sleep for a random number of seconds between config delay.min and delay.max."""
        min_delay = self.config.get("delay", {}).get("min", 3)
        max_delay = self.config.get("delay", {}).get("max", 8)
        time.sleep(random.randint(min_delay, max_delay))
        return None

    def run(self):
        """Run the crawl loop."""
        self.logger.info(f"starting crawler for {self.platform}")
        while not self.limit_flag:
            # Fetch the list data
            initial_data = self._send_request(
                self.config["method"].upper(),
                self.config["url"],
                self.config.get("headers", {}),
                self.config.get("request_body", {})
            )
            print(initial_data)
            if not initial_data:
                return
            next_cursor = None  # pagination cursor; not used in this recommend-mode loop

            # Extract and process the video list
            video_list = safe_extract(
                initial_data,
                self.config["response_parse"]["data_path"]
            )
            self.logger.info(f"fetched video list: {json.dumps(video_list)}")
            for video_data in video_list:
                self.logger.info(f"video object: {video_data}")
                if self.limit_flag:
                    break
                self._process_video(video_data)

            # Run extra actions (e.g. impression reporting)
            for action in self.config.get("post_actions", []):
                if action["trigger"] == "after_video_processed":
                    # NOTE: assuming POST with the platform headers here; the original call
                    # passed only (endpoint, payload), which does not match _send_request's signature.
                    self._send_request(
                        "POST",
                        action["endpoint"],
                        self.config.get("headers", {}),
                        action.get("payload", {})
                    )

if __name__ == '__main__':
    cr = UniversalCrawler(
        "benshanzhufu",
        "recommend",
        rule_dict={'videos_cnt': {'min': 500, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
        user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}]
    )
    cr.run()