# base_spider.py

import asyncio
import random
import uuid
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from core.base.async_request_client import AsyncRequestClient
from core.models.video_item import VideoItem
from core.utils.extractors import safe_extract, extract_fields
from core.utils.helpers import generate_titles
from core.utils.log.logger_manager import LoggerManager
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline


class BaseSpider:
    """
    Generic spider base class. It supports:
    - dynamic substitution of dependent request parameters (cursor or other fields)
    - paginated crawling via single or dependent requests
    - unified logging, MQ publishing, exception handling, and async requests

    Subclasses only need to override a few methods for their business logic,
    such as process_video/process_item.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.class_name = self.__class__.__name__.lower()

        # --- 1. Initialize core components ---
        self._setup_configuration()
        self._setup_logging()
        self._setup_services()
        self._setup_state()

    # --- Initialization helpers ---
    def _setup_configuration(self):
        """Load and apply the spider's core configuration."""
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"Spider configuration not found: {self.class_name}")

        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}

        # Request- and parsing-related configuration
        self.request_body_template = self.platform_config.request_body or {}
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        # self.next_cursor_path = self.response_parse_config.get("next_cursor")
        self.field_map = self.response_parse_config.get("fields", {})

        # Crawl-behavior configuration
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or 5
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid
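
    # For reference, a minimal sketch of what a platform config consumed above
    # might look like. Only data_path/fields (and the commented-out next_cursor)
    # are read explicitly by this class; the concrete key names and the
    # "{{next_cursor}}" placeholder syntax below are assumptions for
    # illustration, not the confirmed SpiderConfig schema:
    #
    #   request_body: {"page_size": 20, "cursor": "{{next_cursor}}"}
    #   response_parse:
    #     data_path: "data.items"            # where the video list lives
    #     next_cursor: "data.next_cursor"    # pagination cursor in the response
    #     fields:                            # response field -> VideoItem field
    #       video_id: "id"
    #       title: "title"
    #       video_url: "play_url"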

    def _setup_logging(self):
        """Initialize the loggers."""
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"Spider '{self.platform}/{self.mode}' initializing...")
        self.logger.info(f"Max loops: {self.loop_times}, loop interval: {self.loop_interval}s")

    def _setup_services(self):
        """Initialize external service clients."""
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2",
                                           platform=self.platform,
                                           mode=self.mode)

    def _setup_state(self):
        """Initialize the spider's internal state."""
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

    async def run(self):
        """Main crawl flow."""
        await self.before_run()
        total_success, total_fail = 0, 0
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            for loop_index in range(self.loop_times):
                if not await self.is_video_count_sufficient():
                    self.logger.info("Daily video quota reached; stopping the crawl")
                    break
                succ, fail = await self.run_single_loop(session)
                total_success += succ
                total_fail += fail
                await self._wait_for_next_loop(loop_index + 1)
        await self.after_run()
        self.logger.info(f"Spider finished. Success: {total_success}, failure: {total_fail}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        Execute one round of requesting, parsing, and processing.
        Returns: (number processed successfully this round, number failed this round)
        """
        success_count, fail_count = 0, 0
        try:
            # Fetch data
            videos = await self.crawl_data(session)
            if not videos:
                self.logger.info("No data returned; ending this round")
                return success_count, fail_count
            for video in videos:
                # Dependent request, if any
                video_obj = await self.fetch_dependent_data(video)
                res = await self.process_and_push_video(video_obj)
                if res:
                    success_count += 1
                else:
                    fail_count += 1
            self.logger.info(f"API returned <{len(videos)}> videos: <{success_count}> processed successfully, <{fail_count}> failed")
        except Exception as e:
            self.logger.exception(f"Run error: {e}")
        return success_count, fail_count

    async def fetch_dependent_data(self, video: Dict) -> Dict:
        """
        Override in subclasses to make dependent requests and enrich the
        original video with the results. Default: no-op.
        """
        return video

    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """
        Call the API, rendering dynamic parameters and updating the cursor
        automatically. Supports both single-request and multi-request
        (paginated) flows.
        """
        request_body = self.request_preparer.prepare(self.request_body_template,
                                                     self.last_response_data)
        # Send the request
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=request_body
        )
        if not response:
            self.logger.error("Empty response")
            return None
        self.last_response_data = response

        # Extract the list of items
        data_list = safe_extract(response, self.data_path)
        if not data_list:
            self.logger.info(f"API returned an empty video list: {response}")
            self.aliyun_log.logging(
                code="9021",
                message="API returned an empty video list",
                data=response
            )
            return None
        return data_list
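
    # How the cursor round-trips, as a sketch: each response is cached in
    # self.last_response_data, and on the next call RequestPreparer renders the
    # request body template against it, so a paginated template can reference a
    # value from the previous response. The placeholder syntax shown here is an
    # assumption about RequestPreparer, for illustration only:
    #
    #   template:          {"cursor": "{{next_cursor}}"}   # hypothetical syntax
    #   previous response: {"data": {"next_cursor": "abc123", "items": [...]}}
    #   next request body: {"cursor": "abc123"}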

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Full processing flow (field mapping -> validation -> push).
        Subclasses may override process_video or filter_data to customize
        processing and validation.
        """
        try:
            # Field mapping
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            pushed = await self.push_to_etl(video_obj)
            return pushed
        except Exception as e:
            self.logger.exception(f"Video processing error: {e}")
            return False

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Unified field extraction and VideoItem initialization.
        Subclasses may override or extend this to customize field mapping,
        filtering, and so on.
        """
        self.logger.debug(f"Processing video: {video.get('title', 'untitled')}")
        if self.user_list:
            publish_user = random.choice(self.user_list)
        else:
            self.logger.error(f"No user list available: {self.user_list}")
            return None
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem initialization failed: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """
        Validate and filter the data; uses PiaoQuanPipeline by default.
        Subclasses may override this method for custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Hook: implement automatic title generation or other business logic here.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def push_to_etl(self, video: Dict) -> bool:
        """
        Push the message to the ETL message queue.
        """
        try:
            await self.mq_producer.send_msg(video)
            self.logger.info("Pushed video to ETL successfully")
            return True
        except Exception as e:
            self.logger.exception(f"Failed to push to ETL: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's crawled video count has reached the daily cap;
        return False to stop crawling. The cap is read from
        rule_dict["videos_cnt"]["min"].
        """
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            current_count = await mysql.get_today_videos()
        if current_count >= max_count:
            self.aliyun_log.logging(code="1011",
                                    message="Daily video quota reached",
                                    data=f"<today's video count>{current_count}")
            return False
        self.logger.info(f"{self.platform} videos stored today: {current_count}/{max_count}")
        self.aliyun_log.logging(code="1012",
                                message=f"Current stored count: {current_count}",
                                data=f"{current_count}/{max_count}")
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before the next loop iteration."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"Waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)

    async def before_run(self):
        """Pre-run hook; subclasses may override."""
        pass

    async def after_run(self):
        """Post-run hook; subclasses may override."""
        pass
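

# --- Usage sketch (illustrative, not part of the original class) ---
# A hypothetical subclass plus an entry point. The class name below must map
# (lowercased) to a real platform config resolvable by SpiderConfig; the
# rule_dict/user_list shapes follow what BaseSpider reads above, but the
# concrete values here are assumptions.
class ExampleSpider(BaseSpider):
    async def fetch_dependent_data(self, video: Dict) -> Dict:
        # e.g. call a detail API here and merge extra fields into `video`
        return video


async def _demo():
    rule_dict = {"videos_cnt": {"min": 100}}  # daily cap, as read by is_video_count_sufficient
    user_list = [{"uid": 1, "nick_name": "demo_user"}]  # shape as read by process_video
    spider = ExampleSpider(rule_dict=rule_dict, user_list=user_list, env="prod")
    await spider.run()


if __name__ == "__main__":
    asyncio.run(_demo())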