base_spider.py

import asyncio
import random
import time
import uuid
from abc import ABC
from typing import List, Dict, Optional

import aiohttp

from core.utils.log.logger_manager import LoggerManager
from core.utils.extractors import safe_extract
from core.utils.config_loader import ConfigLoader
from core.models.video_item import VideoItem
from services.pipeline import PiaoQuanPipeline
from services.async_mysql_service import AsyncMysqlService
from services.mq import MQ  # NOTE: exact import path for MQ is assumed


class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl flow in strict sequential order.
    """

    MAX_RETRIES = 3  # maximum retries per request
    TIMEOUT = 30  # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # subclass name

        # Look up the platform config automatically from the class name
        self.platform_config = ConfigLoader.get_platform_config(platform=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching platform config found: {self.class_name}")

        # Initialize loggers and MQ
        self.platform = self.platform_config.get("platform")
        self.mode = self.platform_config.get("mode")
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")

        # Request configuration
        self.method = self.platform_config.get("method", "GET").upper()
        self.url = self.platform_config.get("url")
        self.headers = self.platform_config.get("headers", {})
        self.body = self.platform_config.get("request_body", {})
        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
        self.video_fields_map = self.platform_config.get("video_fields_map", {})

        # Loop-control configuration
        self.loop_times = self.platform_config.get("loop_times", 1)  # number of loop iterations
        self.loop_interval = self.platform_config.get("loop_interval", 0)  # interval between loops (seconds)

        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.logger.info(
            f"{self.trace_id}--config: {self.loop_times} loops, {self.loop_interval}s interval")

        self.session = None
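
    # Illustrative sketch of the platform config shape the constructor above expects.
    # The key names mirror the reads in __init__; the concrete values are assumptions,
    # not an actual entry from the project's config files:
    #
    # {
    #     "platform": "example_platform",
    #     "mode": "recommend",
    #     "method": "POST",
    #     "url": "https://api.example.com/videos/list",
    #     "headers": {"Content-Type": "application/json"},
    #     "request_body": {"cursor": ""},
    #     "response_parse": {
    #         "data_path": "$.data.list",
    #         "fields": {"video_id": "$.id", "title": "$.title"}
    #     },
    #     "loop_times": 5,
    #     "loop_interval": 30
    # }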
    async def _send_async_request(self, session: aiohttp.ClientSession, method: str, url: str,
                                  **kwargs) -> Dict:
        """
        Send an async HTTP request with the provided session, retrying on failure,
        and return the parsed JSON body.
        """
        retries = 0
        self.logger.info(f"{self.trace_id}--preparing request: {method} {url}, params: {kwargs}")
        while retries < self.MAX_RETRIES:
            try:
                async with session.request(method, url, **kwargs) as response:
                    response.raise_for_status()
                    self.logger.info(f"{self.trace_id}--request succeeded: {response.status}")
                    return await response.json()
            except Exception as e:
                retries += 1
                remaining_attempts = self.MAX_RETRIES - retries
                if retries < self.MAX_RETRIES:
                    self.logger.warning(
                        f"{self.trace_id}--request failed (attempt {retries}/{self.MAX_RETRIES}): {e}. "
                        f"attempts remaining: {remaining_attempts}"
                    )
                    await asyncio.sleep(1)
                else:
                    self.aliyun_logr.logging(
                        code="5001",
                        message="request failed after max retries",
                        data={
                            "url": url,
                            "method": method,
                            "error": str(e),
                            "headers": kwargs.get("headers", {}),
                            "body": kwargs.get("json", {})
                        },
                        trace_id=self.trace_id
                    )
                    self.logger.error(f"{self.trace_id}--request failed after max retries: {e}")
                    raise

    async def crawl_data(self) -> Optional[List[Dict]]:
        """Fetch the raw video list asynchronously."""
        self.logger.info(f"{self.trace_id}--start fetching video data")
        try:
            response = await self._send_async_request(
                session=self.session,
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=self.body
            )
            self.logger.debug(f"{self.trace_id}--response: {response}")
            data = safe_extract(response, self.data_path)
            if not data:
                self.logger.warning(f"{self.trace_id}--no data extracted, path: {self.data_path}")
                return []
            self.logger.info(f"{self.trace_id}--fetched {len(data)} videos")
            return data
        except Exception as e:
            self.logger.exception(f"{self.trace_id}--failed to fetch video data: {e}")
            return []

    async def filter_data(self, video: Dict) -> bool:
        """Check whether a video passes the platform rules."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's crawl is still below the daily maximum.
        :return: True if more videos may be crawled, False otherwise
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 200):
                self.logger.info(f"{self.trace_id}--daily video limit reached: {video_count}")
                return False
        return True
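
    # For reference, the rule_dict read above is assumed to look roughly like
    # {"videos_cnt": {"min": 200}}: crawling stops for the day once the count
    # returned by get_today_videos() reaches that threshold.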
    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Process a single raw video record: apply the field mapping and validate required fields.
        :param video: raw record from the platform response
        :return: a normalized video dict, or None if the record is invalid
        """
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        try:
            # Build the VideoItem init kwargs dynamically from field_map
            item_kwargs = {}
            for field, path in self.field_map.items():
                if not isinstance(path, str) or not path.startswith("$"):
                    # Not an extraction path: pass the value through as a literal
                    item_kwargs[field] = path
                    continue
                value = safe_extract(video, path)
                if value is None:
                    self.logger.warning(f"{self.trace_id}--field extraction failed: {field}, path: {path}")
                    continue
                item_kwargs[field] = value

            item_kwargs["user_id"] = publish_user["uid"]
            item_kwargs["user_name"] = publish_user["nick_name"]
            # Inject platform and strategy manually
            item_kwargs["platform"] = self.platform
            item_kwargs["strategy"] = self.mode

            try:
                item = VideoItem(**item_kwargs)
            except Exception as e:
                self.logger.warning(f"{self.trace_id}--VideoItem init failed: {e}, data: {item_kwargs}")
                return None

            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id}--VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.exception(f"{self.trace_id}--video processing error: {e}")
            return None
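
    # Example of how field_map drives the mapping above (values are illustrative):
    # with field_map = {"video_id": "$.id", "title": "$.title", "duration": 0}
    # and a raw record {"id": "123", "title": "demo"}, the loop produces
    # item_kwargs = {"video_id": "123", "title": "demo", "duration": 0}.
    # "$"-prefixed values are treated as extraction paths for safe_extract;
    # anything else is passed through as a literal.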
    async def push_to_etl(self, item: Dict) -> bool:
        """Push one item to the ETL queue (the MQ send itself is synchronous)."""
        self.logger.info(f"{self.trace_id}--pushing to ETL: {item.get('video_id', item.get('title', 'untitled'))}")
        try:
            self.mq.send_msg(item)
            self.aliyun_logr.logging(
                code="1009",
                message="sent to ETL successfully",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--push succeeded")
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id}--push failed: {e}, item: {item}")
            return False

    async def get_today_videos(self):
        """
        Return the number of videos this spider has crawled today.
        """
        video_count = await self.db_service.get_today_videos()
        return video_count

    async def integrated_video_handling(self):
        """
        Per-video post-processing hook (e.g. title handling); no-op by default.
        """
        pass

    async def run(self):
        """
        Run the spider task asynchronously, strictly in order:
        1. Crawl
        2. Process each record and validate fields
        3. Filter (duplicates, platform rules, title, publish time)
        4. Title handling
        5. Push to ETL
        """
        try:
            self.logger.info(f"{self.trace_id}--[{self.platform}] spider task started")
            total_success = 0
            total_failed = 0
            async with aiohttp.ClientSession(
                    timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)
            ) as session:  # session is managed for the whole run
                self.session = session
                for loop_index in range(1, self.loop_times + 1):
                    if not await self.is_video_count_sufficient():
                        return
                    self.logger.info(f"{self.trace_id}--step 1: loop request {loop_index}/{self.loop_times}")
                    loop_start_time = time.time()
                    video_list = await self.crawl_data()
                    if not video_list:
                        self.logger.warning(f"{self.trace_id}--no video data, skipping this loop")
                        await self._wait_for_next_loop(loop_index)
                        continue
                    success_count = 0
                    fail_count = 0
                    for video in video_list:
                        video_obj = await self.process_video(video)
                        if not video_obj:
                            self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
                            fail_count += 1
                            continue
                        if not await self.filter_data(video_obj):
                            self.logger.debug(f"{self.trace_id}--video rejected by rules, skipped")
                            continue
                        await self.integrated_video_handling()
                        if await self.push_to_etl(video_obj):
                            success_count += 1
                        else:
                            fail_count += 1
                    total_success += success_count
                    total_failed += fail_count
                    loop_duration = time.time() - loop_start_time
                    self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} finished. "
                                     f"success: {success_count}, failed: {fail_count}, took: {loop_duration:.2f}s")
                    await self._wait_for_next_loop(loop_index)

            # Overall run metrics
            self.aliyun_logr.logging(
                code="1003",
                message="spider run metrics summary",
                data={
                    "trace_id": self.trace_id,
                    "platform": self.platform,
                    "success_count": total_success,
                    "fail_count": total_failed
                },
                trace_id=self.trace_id
            )
            self.logger.info(
                f"{self.trace_id}--[{self.platform}] spider task finished, total success: {total_success}, total failed: {total_failed}")
            return True
        except Exception as e:
            self.logger.error(f"spider fatal error: {e}")
            raise

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before issuing the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)
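
# Usage sketch (illustrative): a concrete spider only needs to subclass BaseSpider
# under a name that matches a platform config entry; the rule_dict/user_list shapes
# follow the ones used above. The class and values below are assumptions, not part
# of this module.
#
# class ExampleSpider(BaseSpider):
#     pass
#
# if __name__ == "__main__":
#     asyncio.run(
#         ExampleSpider(
#             rule_dict={"videos_cnt": {"min": 200}},
#             user_list=[{"uid": 1, "nick_name": "demo"}],
#             trace_id=str(uuid.uuid4()),
#         ).run()
#     )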