# base_spider.py

import asyncio
import time
from abc import ABC
from typing import List, Dict, Optional

import aiohttp

from application.config.common import MQ
from application.config.common.log.logger_manager import LoggerManager
from utils.config_loader import ConfigLoader  # new import
from utils.extractors import safe_extract

# filter_data() below relies on MysqlService; the import path here is an
# assumption -- adjust it to the module where MysqlService actually lives.
from application.config.common import MysqlService


class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl flow in strict sequential order.
    """

    MAX_RETRIES = 3  # maximum retries per request
    TIMEOUT = 30     # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # name of the concrete subclass

        # Resolve the platform config automatically from the class name
        self.platform_config = ConfigLoader.get_config_by_class_name(self.class_name)
        if not self.platform_config:
            raise ValueError(f"No config found for class: {self.class_name}")

        # Logging and MQ
        self.platform = self.platform_config.get("platform")
        self.mode = self.platform_config.get("mode")
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")

        # Request configuration
        self.method = self.platform_config.get("method", "GET").upper()
        self.url = self.platform_config.get("url")
        self.headers = self.platform_config.get("headers", {})
        self.body = self.platform_config.get("request_body", {})
        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
        self.video_fields_map = self.platform_config.get("video_fields_map", {})

        # Loop control configuration
        self.loop_times = self.platform_config.get("loop_times", 1)        # number of loops
        self.loop_interval = self.platform_config.get("loop_interval", 0)  # interval between loops (seconds)
        self.logger.info(
            f"{self.trace_id}--config: {self.loop_times} loop(s), {self.loop_interval}s interval")

    async def _send_async_request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
        """
        Send an async HTTP request with retry support.
        """
        retries = 0
        timeout = aiohttp.ClientTimeout(total=self.TIMEOUT)
        while retries < self.MAX_RETRIES:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.request(method, url, **kwargs) as response:
                        response.raise_for_status()
                        # Read the body before the session closes so the caller
                        # can still call response.json() afterwards.
                        await response.read()
                        return response
            except Exception as e:
                retries += 1
                remaining_attempts = self.MAX_RETRIES - retries
                if retries < self.MAX_RETRIES:
                    self.logger.warning(
                        f"{self.trace_id}--request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
                        f"attempts remaining: {remaining_attempts}"
                    )
                    await asyncio.sleep(1)  # async wait before retrying
                else:
                    self.aliyun_logr.logging(
                        code="5001",
                        message="request failed, max retries reached",
                        data={
                            "url": url,
                            "method": method,
                            "error": str(e),
                            "headers": kwargs.get("headers", {}),
                            "body": kwargs.get("json", {})
                        },
                        trace_id=self.trace_id
                    )
                    self.logger.error(f"{self.trace_id}--request failed, max retries reached: {e}")
                    raise

    async def crawl_data(self) -> Optional[List[Dict]]:
        """Fetch video data asynchronously."""
        self.logger.info(f"{self.trace_id}--starting to fetch video data")
        try:
            response = await self._send_async_request(
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=self.body
            )
            result = await response.json()
            data = safe_extract(result, self.data_path)
            if not data:
                self.logger.warning(f"{self.trace_id}--no data found at path: {self.data_path}")
                return []
            self.logger.info(f"{self.trace_id}--fetched {len(data)} video record(s)")
            return data
        except Exception as e:
            self.logger.error(f"{self.trace_id}--failed to fetch video data: {e}")
            return []

    def filter_data(self, video: Dict) -> bool:
        """Check whether a video satisfies the configured rules."""
        if not self.rule_dict:
            return True
        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            # get_video_duration() is not defined on BaseSpider; subclasses are
            # expected to provide it when a duration rule is configured.
            duration = self.get_video_duration(video_url)
            if not (rule_duration['min'] <= duration <= rule_duration['max']):
                return False
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 0):
                return False
        return True
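
    # Shape of the rule_dict that filter_data() checks (illustrative sketch;
    # the concrete numbers are assumptions, not project defaults):
    #
    # {
    #     "duration": {"min": 30, "max": 1200},   # keep videos between 30s and 20min
    #     "videos_cnt": {"min": 100}              # stop once 100 videos were crawled today
    # }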

    def process_video(self, video: Dict) -> Optional[Dict]:
        """Process a single video record."""
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        try:
            item = {}
            for field, path in self.field_map.items():
                value = safe_extract(video, path)
                if value is None:
                    self.logger.warning(f"{self.trace_id}--field extraction failed: {field}")
                    continue
                item[field] = value
            if not item:
                self.logger.warning(f"{self.trace_id}--processed video is empty")
                return None
            item.update({
                "platform": self.platform,
                "strategy": self.mode,
                "session": f"{self.platform}-{int(time.time())}"
            })
            self.logger.debug(f"{self.trace_id}--video processed successfully")
            return item
        except Exception as e:
            self.logger.error(f"{self.trace_id}--error while processing video: {e}")
            return None

    def push_to_etl(self, item: Dict) -> bool:
        """Push an item to ETL (synchronous)."""
        self.logger.info(f"{self.trace_id}--pushing item to ETL: {item.get('title', 'untitled')}")
        try:
            self.mq.send_msg(item)
            self.aliyun_logr.logging(
                code="1002",
                message="successfully sent to ETL",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--item pushed successfully")
            return True
        except Exception as e:
            self.logger.error(f"{self.trace_id}--failed to push item: {e}")
            return False

    async def run(self):
        """
        Run the spider task asynchronously, in strict order:
        1. crawl
        2. filter
        3. process each record
        4. push to ETL
        """
        self.logger.info(f"{self.trace_id}--[{self.platform}] starting spider task")
        for loop_index in range(1, self.loop_times + 1):
            self.logger.info(f"{self.trace_id}--step 1: starting loop request {loop_index}/{self.loop_times}")
            loop_start_time = time.time()

            # Step 1: fetch video data (skip this loop on failure)
            video_list = await self.crawl_data()
            if not video_list:
                self.logger.warning(f"{self.trace_id}--no video data fetched, skipping this loop")
                await self._wait_for_next_loop(loop_index)
                continue

            # Step 2: process each video and push it to ETL
            success_count = 0
            fail_count = 0
            for video in video_list:
                # Step 2.1: validate the video (skip if it fails the rules)
                if not self.filter_data(video):
                    self.logger.debug(f"{self.trace_id}--video does not match the rules, skipped")
                    continue
                # Step 2.2: process the video (log and continue on failure)
                item = self.process_video(video)
                if not item:
                    self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
                    fail_count += 1
                    continue
                # Step 2.3: push to ETL (log and continue on failure)
                if self.push_to_etl(item):
                    success_count += 1
                else:
                    fail_count += 1

            loop_duration = time.time() - loop_start_time
            self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} finished. "
                             f"success: {success_count}, failed: {fail_count}, took: {loop_duration:.2f}s")

            # Wait before the next loop
            await self._wait_for_next_loop(loop_index)

        self.logger.info(f"{self.trace_id}--[{self.platform}] spider task completed")
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before issuing the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval} seconds before the next request")
            await asyncio.sleep(self.loop_interval)
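

# Minimal usage sketch (illustrative only): the subclass name, the
# get_video_duration() stub and the argument values below are assumptions,
# not part of this module. It only shows how BaseSpider is expected to be
# driven: ConfigLoader resolves the platform config from the subclass name,
# then run() executes crawl -> filter -> process -> push in order.
#
# class DemoSpider(BaseSpider):
#     def get_video_duration(self, video_url: str) -> int:
#         # filter_data() calls this when a "duration" rule is configured;
#         # a real subclass would probe the media file here.
#         return 60
#
# if __name__ == "__main__":
#     spider = DemoSpider(
#         rule_dict={"duration": {"min": 30, "max": 1200}},
#         user_list=[],
#         trace_id="demo-trace-001",
#     )
#     asyncio.run(spider.run())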