# base_spider.py

import asyncio
import time
from abc import ABC
from typing import List, Dict, Optional

import aiohttp

from application.config.common import LoggerManager, MQ
from utils.extractors import safe_extract
from utils.config_loader import ConfigLoader  # newly added import


class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl pipeline in strict sequential order.
    """

    MAX_RETRIES = 3   # maximum retries per request
    TIMEOUT = 30      # request timeout (seconds)

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # name of the concrete subclass

        # Resolve the platform configuration from the subclass name
        self.platform_config = ConfigLoader.get_config_by_class_name(self.class_name)
        if not self.platform_config:
            raise ValueError(f"No configuration found for: {self.class_name}")

        # Initialize logging and MQ
        self.platform = self.platform_config.get("platform")
        self.mode = self.platform_config.get("mode")
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")

        # Request configuration
        self.method = self.platform_config.get("method", "GET").upper()
        self.url = self.platform_config.get("url")
        self.headers = self.platform_config.get("headers", {})
        self.body = self.platform_config.get("request_body", {})
        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
        self.video_fields_map = self.platform_config.get("video_fields_map", {})

        # Loop control configuration
        self.loop_times = self.platform_config.get("loop_times", 1)        # number of loops
        self.loop_interval = self.platform_config.get("loop_interval", 0)  # interval between loops (seconds)
        self.logger.info(
            f"{self.trace_id}--config: {self.loop_times} loops, {self.loop_interval}s interval")

    async def _send_async_request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
        """
        Send an async HTTP request with retry support.
        """
        retries = 0
        timeout = aiohttp.ClientTimeout(total=self.TIMEOUT)
        while retries < self.MAX_RETRIES:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.request(method, url, **kwargs) as response:
                        response.raise_for_status()
                        # Buffer the body before the session closes, so callers can
                        # still call response.json() on the returned object.
                        await response.read()
                        return response
            except Exception as e:
                retries += 1
                remaining_attempts = self.MAX_RETRIES - retries
                if retries < self.MAX_RETRIES:
                    self.logger.warning(
                        f"{self.trace_id}--request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
                        f"attempts remaining: {remaining_attempts}"
                    )
                    await asyncio.sleep(1)  # asynchronous wait before retrying
                else:
                    self.aliyun_logr.logging(
                        code="5001",
                        message="request failed, maximum retries reached",
                        data={
                            "url": url,
                            "method": method,
                            "error": str(e),
                            "headers": kwargs.get("headers", {}),
                            "body": kwargs.get("json", {})
                        },
                        trace_id=self.trace_id
                    )
                    self.logger.error(f"{self.trace_id}--request failed, maximum retries reached: {e}")
                    raise

    async def crawl_data(self) -> Optional[List[Dict]]:
        """Fetch video data asynchronously."""
        self.logger.info(f"{self.trace_id}--start fetching video data")
        try:
            response = await self._send_async_request(
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=self.body
            )
            result = await response.json()
            data = safe_extract(result, self.data_path)
            if not data:
                self.logger.warning(f"{self.trace_id}--no data found at path: {self.data_path}")
                return []
            self.logger.info(f"{self.trace_id}--fetched {len(data)} videos")
            return data
        except Exception as e:
            self.logger.error(f"{self.trace_id}--failed to fetch video data: {e}")
            return []

    def filter_data(self, video: Dict) -> bool:
        """Check whether a video satisfies the crawl rules."""
        if not self.rule_dict:
            return True
        rule_duration = self.rule_dict.get("duration")
        if rule_duration:
            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
            # get_video_duration is not defined on BaseSpider; it is expected to be
            # provided by the concrete subclass (or a mixin) before this branch runs.
            duration = self.get_video_duration(video_url)
            if not (rule_duration['min'] <= duration <= rule_duration['max']):
                return False
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if rule_videos_cnt:
            # MysqlService is referenced but not imported in this module; the
            # project-specific import must be added before this branch is exercised.
            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 0):
                return False
        return True
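
    # Shape of rule_dict assumed by filter_data (inferred from the checks above;
    # the keys below are the only ones this base class consults):
    #
    # {
    #     "duration": {"min": <seconds>, "max": <seconds>},
    #     "videos_cnt": {"min": <daily cap>},
    # }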

    def process_video(self, video: Dict) -> Optional[Dict]:
        """Process a single video record."""
        self.logger.debug(f"{self.trace_id}--start processing video: {video.get('title', 'untitled')}")
        try:
            item = {}
            for field, path in self.field_map.items():
                value = safe_extract(video, path)
                if value is None:
                    self.logger.warning(f"{self.trace_id}--failed to extract field: {field}")
                    continue
                item[field] = value
            if not item:
                self.logger.warning(f"{self.trace_id}--processed video is empty")
                return None
            item.update({
                "platform": self.platform,
                "strategy": self.mode,
                "session": f"{self.platform}-{int(time.time())}"
            })
            self.logger.debug(f"{self.trace_id}--video processed successfully")
            return item
        except Exception as e:
            self.logger.error(f"{self.trace_id}--error while processing video: {e}")
            return None
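
    # The item produced above (and later pushed to ETL) carries every field mapped
    # via response_parse.fields, plus the bookkeeping keys added here, e.g. (sketch):
    #
    # {"title": "...", ..., "platform": "<platform>", "strategy": "<mode>",
    #  "session": "<platform>-<unix timestamp>"}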

    def push_to_etl(self, item: Dict) -> bool:
        """Push an item to ETL (synchronous)."""
        self.logger.info(f"{self.trace_id}--start pushing item to ETL: {item.get('title', 'untitled')}")
        try:
            self.mq.send_msg(item)
            self.aliyun_logr.logging(
                code="1002",
                message="sent to ETL successfully",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--item pushed successfully")
            return True
        except Exception as e:
            self.logger.error(f"{self.trace_id}--failed to push item: {e}")
            return False

    async def run(self):
        """
        Run the spider task asynchronously, in strict sequential order:
        1. crawl
        2. filter
        3. process each record
        4. push to ETL
        """
        self.logger.info(f"{self.trace_id}--[{self.platform}] spider task started")
        for loop_index in range(1, self.loop_times + 1):
            self.logger.info(f"{self.trace_id}--step 1: starting loop request {loop_index}/{self.loop_times}")
            loop_start_time = time.time()

            # Step 1: fetch video data (skip this loop on failure)
            video_list = await self.crawl_data()
            if not video_list:
                self.logger.warning(f"{self.trace_id}--no video data fetched, skipping this loop")
                await self._wait_for_next_loop(loop_index)
                continue

            # Step 2: process each video and push it to ETL
            success_count = 0
            fail_count = 0
            for video in video_list:
                # Step 2.1: validate the video (skip on failure)
                if not self.filter_data(video):
                    self.logger.debug(f"{self.trace_id}--video does not match the rules, skipped")
                    continue

                # Step 2.2: process the video (log and continue on failure)
                item = self.process_video(video)
                if not item:
                    self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
                    fail_count += 1
                    continue

                # Step 2.3: push to ETL (log and continue on failure)
                if self.push_to_etl(item):
                    success_count += 1
                else:
                    fail_count += 1

            loop_duration = time.time() - loop_start_time
            self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} finished. "
                             f"success: {success_count}, failed: {fail_count}, took: {loop_duration:.2f}s")

            # Wait before the next loop
            await self._wait_for_next_loop(loop_index)

        self.logger.info(f"{self.trace_id}--[{self.platform}] spider task completed")
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before issuing the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)
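

if __name__ == "__main__":
    # Hedged usage sketch: "ExampleSpider", the rule values, and the trace id below
    # are hypothetical and assume a matching entry exists in ConfigLoader's
    # configuration for the class name "ExampleSpider".
    class ExampleSpider(BaseSpider):
        """A concrete spider that relies entirely on BaseSpider's generic flow."""
        pass

    demo_rules = {
        "duration": {"min": 30, "max": 1200},
        "videos_cnt": {"min": 100},
    }
    spider = ExampleSpider(rule_dict=demo_rules, user_list=[], trace_id="demo-trace", env="dev")
    asyncio.run(spider.run())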