# base_spider.py

import asyncio
import random
import uuid
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from core.base.async_request_client import AsyncRequestClient
from core.models.video_item import VideoItem
from core.utils.extractors import safe_extract, extract_fields
from core.utils.helpers import generate_titles
from core.utils.log.logger_manager import LoggerManager
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline


class BaseSpider:
    """
    Generic spider base class that provides:
    - Dynamic substitution of dependent request parameters (a cursor or other
      values taken from the previous response)
    - Paginated crawling with both single and dependent requests
    - Unified logging, MQ pushing, exception handling, and async requests

    Subclasses only need to override a few methods, such as process_video /
    process_item, to adapt to their business logic.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.class_name = self.__class__.__name__.lower()

        # --- 1. Initialize core components ---
        self._setup_configuration()
        self._setup_logging()
        self._setup_services()
        self._setup_state()

    # --- Initialization helpers ---
    def _setup_configuration(self):
        """Load and apply the spider's core configuration."""
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"Spider configuration not found: {self.class_name}")
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}

        # Request and response-parsing configuration
        self.request_body_template = self.platform_config.request_body or {}
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        # self.next_cursor_path = self.response_parse_config.get("next_cursor")
        self.field_map = self.response_parse_config.get("fields", {})

        # Crawling-behaviour configuration
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid

    def _setup_logging(self):
        """Initialize the loggers."""
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"Spider '{self.platform}/{self.mode}' initializing...")
        self.logger.info(f"Max loop count: {self.loop_times}, loop interval: {self.loop_interval}")

    def _setup_services(self):
        """Initialize external service clients."""
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode,
        )

    def _setup_state(self):
        """Initialize the spider's internal state."""
        # Holds the most recent response so the next request body can reference
        # values from it (e.g. a pagination cursor).
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log,
        )

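    # For reference, a platform config consumed by _setup_configuration might
    # look like the sketch below. Only the keys read in this file (url, method,
    # headers, request_body, response_parse.data_path, response_parse.fields,
    # loop_times, loop_interval, request_timeout, max_retries, feishu_sheetid)
    # are grounded in this code; the concrete values, the data_path syntax, and
    # the "{{next_cursor}}" placeholder syntax are hypothetical and depend on
    # how SpiderConfig, safe_extract, and RequestPreparer are implemented:
    #
    #     {
    #         "platform": "example",                         # hypothetical
    #         "mode": "recommend",
    #         "url": "https://api.example.com/feed",         # hypothetical
    #         "method": "post",
    #         "headers": {"Content-Type": "application/json"},
    #         "request_body": {"cursor": "{{next_cursor}}"},  # placeholder syntax assumed
    #         "response_parse": {
    #             "data_path": "data.items",                  # path syntax assumed
    #             "fields": {"video_title": "title", "video_url": "play_url"},
    #         },
    #         "loop_times": 100,
    #         "loop_interval": {"min": 5, "max": 20},
    #     }
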
    async def run(self):
        """Main spider flow."""
        await self.before_run()
        total_success, total_fail = 0, 0
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            for loop_index in range(self.loop_times):
                if not await self.is_video_count_sufficient():
                    self.logger.info("Daily video quota reached; stopping crawl")
                    break
                succ, fail = await self.run_single_loop(session)
                total_success += succ
                total_fail += fail
                await self._wait_for_next_loop(loop_index + 1)
        await self.after_run()
        self.logger.info(f"Spider finished. Success: {total_success}, failed: {total_fail}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        Execute a single round of requesting, parsing, and processing.
        Returns: (number processed successfully, number failed) for this round.
        """
        success_count, fail_count = 0, 0
        try:
            # Fetch a batch of raw items
            videos = await self.crawl_data(session)
            if not videos:
                self.logger.info("No data returned; ending this round")
                return success_count, fail_count
            for video in videos:
                # Optional dependent request to enrich the item
                video_obj = await self.fetch_dependent_data(video)
                if await self.process_and_push_video(video_obj):
                    success_count += 1
                else:
                    fail_count += 1
            self.logger.info(
                f"API returned <{len(videos)}> videos; processed <{success_count}>, failed <{fail_count}>"
            )
        except Exception as e:
            self.logger.exception(f"Error during run: {e}")
        return success_count, fail_count

    async def fetch_dependent_data(self, video: Dict) -> Dict:
        """
        Override in subclasses to issue dependent requests and merge the
        results into the original video dict. The default is a no-op.
        """
        return video

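    # A minimal override sketch, reusing the request_client call shape from
    # crawl_data below (illustrative only; the detail URL and the "id"/"video_id"
    # fields are hypothetical, not part of this project):
    #
    #     async def fetch_dependent_data(self, video: Dict) -> Dict:
    #         async with aiohttp.ClientSession() as session:
    #             detail = await self.request_client.request(
    #                 session=session,
    #                 method="POST",
    #                 url="https://api.example.com/video/detail",  # hypothetical
    #                 headers=self.headers,
    #                 json={"video_id": video.get("id")},          # hypothetical fields
    #             )
    #         # Merge the detail payload over the original item
    #         return {**video, **(detail or {})}
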
    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """
        Call the API, rendering dynamic parameters from the previous response
        (which also advances the cursor). Supports both single-request and
        multi-request (paginated) flows.
        """
        request_body = self.request_preparer.prepare(
            self.request_body_template, self.last_response_data
        )
        # Send the request
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=request_body,
        )
        if not response:
            self.logger.error("Empty response")
            return None
        self.last_response_data = response

        # Extract the list of items
        data_list = safe_extract(response, self.data_path)
        if not data_list:
            self.logger.info(f"API returned an empty video list: {response}")
            self.aliyun_log.logging(
                code="9021",
                message="API returned an empty video list",
                data=response,
            )
            return None
        return data_list

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Full processing pipeline for one item (field mapping -> validation -> push).
        Subclasses can override process_video or filter_data to customize
        processing and validation.
        """
        try:
            # Field mapping and item construction
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"Error while processing video: {e}")
            return False

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Unified field extraction and VideoItem construction.
        Subclasses can override or extend this to customize field mapping,
        filtering, etc.
        """
        self.logger.debug(f"Processing video: {video.get('title', 'untitled')}")
        if self.user_list:
            publish_user = random.choice(self.user_list)
        else:
            self.logger.error(f"No publish users available: {self.user_list}")
            return None
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem initialization failed: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """
        Validate and filter the item; defaults to PiaoQuanPipeline.
        Subclasses can override this method for custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1()),
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Hook: implement automatic title generation or other business logic here.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def push_to_etl(self, video: Dict) -> bool:
        """
        Push the item to the ETL message queue.
        """
        try:
            await self.mq_producer.send_msg(video)
            self.logger.info("Video pushed to ETL")
            return True
        except Exception as e:
            self.logger.exception(f"Failed to push to ETL: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's crawl quota has been reached.
        Returns True when crawling may continue, False once the cap is hit.
        """
        # Note: per the existing rule_dict schema, the daily cap is read from
        # the "min" key of videos_cnt.
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            current_count = await mysql.get_today_videos()
        if current_count >= max_count:
            self.aliyun_log.logging(
                code="1011",
                message="Daily video quota reached",
                data=f"<today's video count>{current_count}",
            )
            return False
        self.logger.info(f"{self.platform} videos stored today: {current_count}/{max_count}")
        self.aliyun_log.logging(
            code="1012",
            message=f"Current stored count: {current_count}",
            data=f"{current_count}/{max_count}",
        )
        return True

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Wait before the next loop iteration."""
        if current_loop < self.loop_times:
            wait_time = random.randint(self.loop_interval["min"], self.loop_interval["max"])
            self.logger.info(f"Waiting {wait_time} seconds before the next request")
            await asyncio.sleep(wait_time)

    async def before_run(self):
        """Pre-run hook; subclasses may override."""
        pass

    async def after_run(self):
        """Post-run hook; subclasses may override."""
        pass
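

# A minimal usage sketch, assuming a matching SpiderConfig entry exists for the
# lowercase class name ("examplespider" here). The subclass, rule_dict, and
# user_list values are hypothetical:
#
#     class ExampleSpider(BaseSpider):
#         async def before_run(self):
#             self.logger.info("warming up")  # e.g. login or token refresh
#
#     if __name__ == "__main__":
#         spider = ExampleSpider(
#             rule_dict={"videos_cnt": {"min": 100}},
#             user_list=[{"uid": 1, "nick_name": "demo"}],
#             env="prod",
#         )
#         asyncio.run(spider.run())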