# base_spider.py

import asyncio
import random
import time
import uuid
from abc import ABC
from typing import List, Dict, Optional

import aiohttp

from core.models.video_item import VideoItem
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
# NOTE: MQ was referenced below but never imported; this import path is an
# assumption -- adjust it to the module that actually defines MQ in this project.
from services.mq import MQ


class BaseSpider(ABC):
  17. """
  18. 通用爬虫基类:支持严格顺序执行流程
  19. """
  20. MAX_RETRIES = 3 # 单个请求最大重试次数
  21. TIMEOUT = 30 # 请求超时时间(秒)

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # name of the concrete subclass

        # Resolve the platform config automatically from the class name
        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching config found for: {self.class_name}")

        # Initialize logging and MQ
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")

        # Request configuration
        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers
        self.body = self.platform_config.request_body
        self.response = self.platform_config.response_parse
        self.field_map = self.response.get("fields", {})
        self.data_path = self.response.get("data_path")

        # Flow-control configuration
        self.loop_times = self.platform_config.loop_times  # number of loop iterations
        self.loop_interval = self.platform_config.loop_interval  # loop interval (seconds)

        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger)

        self.logger.info(
            f"{self.trace_id}--config: {self.loop_times} loops, {self.loop_interval}s interval")
        self.session = None

    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """Issue the configured request and extract the list of raw video dicts."""
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=self.body
        )
        data = safe_extract(response, self.data_path)
        return data if data else []
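
    # Illustrative note (assumed semantics): with a response shaped like
    # {"data": {"list": [...]}}, a data_path such as "data.list" would make
    # safe_extract return the inner list; the actual path syntax is whatever
    # core.utils.extractors.safe_extract implements.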

    async def filter_data(self, video: Dict) -> bool:
        """Check whether the video matches the configured rules."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()
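
    # filter_data delegates all rule checks to PiaoQuanPipeline; per the run()
    # docstring these cover duplicates, platform rules, title, and publish time.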

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's crawled-video count has hit the daily maximum.
        :return: True if crawling may continue, False if the cap is reached
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
            if video_count >= rule_videos_cnt.get("min", 200):
                self.logger.info(f"{self.trace_id}--today's video count has reached the maximum: {video_count}")
                return False
        return True
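
    # rule_dict shape inferred from the lookups above (illustrative):
    # {"videos_cnt": {"min": 200}} -- "min" acts as the daily cap checked here.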

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Process a single raw video dict: apply the field mapping and validate
        required fields.
        :param video: raw video dict from crawl_data
        :return: validated video dict, or None on failure
        """
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id)
        item_kwargs["user_id"] = publish_user["uid"]
        item_kwargs["user_name"] = publish_user["nick_name"]
        item_kwargs["platform"] = self.platform
        item_kwargs["strategy"] = self.mode
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem initialization failed: {e}")
            return None
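
    # Illustrative field_map (assumed shape): a mapping consumed by
    # extract_fields from VideoItem field names to locations in the raw video
    # dict, e.g. {"video_id": "id", "title": "title"}; the real mapping comes
    # from the platform's response_parse["fields"] config.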

    async def push_to_etl(self, item: Dict) -> bool:
        """Push an item to ETL (the MQ send itself is synchronous)."""
        self.logger.info(f"{self.trace_id}--pushing item to ETL: {item.get('video_id', item.get('title', 'untitled'))}")
        try:
            self.mq.send_msg(item)
            self.aliyun_logr.logging(
                code="1009",
                message="sent to ETL successfully",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--push succeeded")
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id}--push failed: {e}, item: {item}")
            return False

    async def get_today_videos(self):
        """
        Query the number of videos the spider has crawled today.
        :return: today's video count
        """
        video_count = await self.db_service.get_today_videos()
        return video_count

    async def integrated_video_handling(self):
        """
        Extra per-video handling; no-op by default.
        :return: None
        """
        pass
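
    # Subclasses override this hook; run() invokes it between rule filtering
    # and the ETL push, which is where step 4 ("title handling") of the run()
    # docstring belongs.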

    async def run(self):
        """
        Run the spider task asynchronously, strictly in order:
        1. Crawl
        2. Process each item, validating fields
        3. Filter (duplicates, platform rules, title, publish time)
        4. Title handling
        5. Push to ETL
        """
        try:
            total_success, total_failed = 0, 0
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session:
                for loop_index in range(1, self.loop_times + 1):
                    loop_start_time = time.time()  # per-loop timer (the original started it once, outside the loop)
                    self.logger.info(f"{self.trace_id}--step 1: starting loop request {loop_index}/{self.loop_times}")
                    video_list = await self.crawl_data(session)
                    if not video_list:
                        self.logger.warning(f"{self.trace_id}--no video data fetched, skipping this loop")
                        await self._wait_for_next_loop(loop_index)
                        continue
                    success_count = 0
                    fail_count = 0
                    for video in video_list:
                        video_obj = await self.process_video(video)
                        if not video_obj:
                            self.logger.warning(f"{self.trace_id}--video processing failed, skipped")
                            fail_count += 1
                            continue
                        if not await self.filter_data(video_obj):
                            self.logger.debug(f"{self.trace_id}--video did not match rules, skipped")
                            continue
                        await self.integrated_video_handling()
                        if await self.push_to_etl(video_obj):
                            success_count += 1
                        else:
                            fail_count += 1
                    total_success += success_count
                    total_failed += fail_count
                    loop_duration = time.time() - loop_start_time
                    self.logger.info(f"{self.trace_id}--loop {loop_index}/{self.loop_times} finished. "
                                     f"success: {success_count}, failed: {fail_count}, duration: {loop_duration:.2f}s")
                    await self._wait_for_next_loop(loop_index)

            # Aggregate metrics log
            self.aliyun_logr.logging(
                code="1003",
                message="spider execution metrics summary",
                data={
                    "trace_id": self.trace_id,
                    "classname": self.platform,
                    "success_count": total_success,
                    "fail_count": total_failed
                },
                trace_id=self.trace_id
            )
            self.logger.info(
                f"{self.trace_id}--[{self.platform}] spider task complete, total success: {total_success}, total failed: {total_failed}")
            return True
        except Exception as e:
            self.logger.error(f"fatal spider error: {e}")
            raise

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)
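

if __name__ == "__main__":
    # Usage sketch (illustrative, not part of the original module). BaseSpider
    # resolves its platform config from the lowercased subclass name, so this
    # only runs if a SpiderConfig entry is registered under "demospider".
    # DemoSpider and the rule_dict / user_list shapes are assumptions based on
    # how __init__, is_video_count_sufficient, and process_video read them.
    class DemoSpider(BaseSpider):
        """Minimal subclass relying entirely on the config-driven workflow."""
        pass

    demo = DemoSpider(
        rule_dict={"videos_cnt": {"min": 200}},  # daily cap read by is_video_count_sufficient
        user_list=[{"uid": 1, "nick_name": "demo"}],  # keys consumed by process_video
        trace_id="demo-trace-id",
    )
    asyncio.run(demo.run())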