# base_spider.py
import asyncio
import random
import uuid
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from core.base.async_request_client import AsyncRequestClient
from core.models.video_item import VideoItem
from core.utils.extractors import extract_fields, safe_extract
from core.utils.helpers import generate_titles
from core.utils.log.logger_manager import LoggerManager
from core.utils.spider_config import SpiderConfig
from core.utils.template_resolver import resolve_request_body_template
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
  15. class BaseSpider:
  16. """
  17. 通用爬虫基类,支持:
  18. - 依赖请求参数动态替换(cursor 或其它参数)
  19. - 支持单请求和依赖请求的分页抓取
  20. - 统一日志、MQ推送、异常捕获、异步请求
  21. 子类只需根据业务重写少量方法,如 process_video/process_item。
  22. """
  23. def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
  24. self.trace_id = trace_id
  25. self.env = env
  26. self.user_list = user_list
  27. self.rule_dict = rule_dict
  28. self.class_name = self.__class__.__name__.lower()
  29. # 读取配置
  30. self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
  31. if not self.platform_config:
  32. raise ValueError(f"找不到对应配置: {self.class_name}")
  33. self.platform = self.platform_config.platform
  34. self.mode = self.platform_config.mode
  35. self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
  36. self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
  37. self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
  38. self.method = self.platform_config.method.upper()
  39. self.url = self.platform_config.url
  40. self.headers = self.platform_config.headers or {}
  41. self.request_body_template = self.platform_config.request_body or {}
  42. self.response_parse = self.platform_config.response_parse or {}
  43. self.next_cursor_path = self.response_parse.get("next_cursor")
  44. self.data_path = self.response_parse.get("data_path")
  45. self.field_map = self.response_parse.get("fields", {})
  46. self.loop_times = self.platform_config.loop_times or 100
  47. self.loop_interval = self.platform_config.loop_interval or 5
  48. self.feishu_sheetid = self.platform_config.feishu_sheetid
  49. self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
  50. self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)
  51. self.timeout = 30
  52. self.max_retries = 3
  53. # 当前分页游标,默认空字符串,支持动态替换request_body中任何字段(如cursor)
  54. self.dynamic_params = {key: "" for key in self.request_body_template.keys()}
  55. # 允许子类重写,支持多游标等复杂情况
  56. self.current_cursor = ""
  57. self.download_cnt = 0
  58. self.limit_flag = False
  59. async def run(self):
  60. """ 爬虫主流程 """
  61. await self.before_run()
  62. total_success, total_fail = 0, 0
  63. async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
  64. for loop_index in range(self.loop_times):
  65. if self.limit_flag:
  66. self.logger.info(f"{self.trace_id} 已达到抓取限制,停止爬虫")
  67. break
  68. if not await self.is_video_count_sufficient():
  69. self.logger.info(f"{self.trace_id} 视频抓取数量已达上限,提前结束")
  70. break
  71. succ, fail = await self.run_single_loop(session)
  72. total_success += succ
  73. total_fail += fail
  74. await self._wait_for_next_loop(loop_index + 1)
  75. await self.after_run()
  76. self.logger.info(f"{self.trace_id} 爬虫完成 成功:{total_success} 失败:{total_fail}")
  77. async def run_single_loop(self, session) -> (int, int):
  78. """
  79. 单轮请求与处理
  80. """
  81. success_count, fail_count = 0, 0
  82. try:
  83. videos = await self.crawl_data(session)
  84. if not videos:
  85. self.logger.info(f"{self.trace_id} 无数据返回,停止本轮")
  86. return success_count, fail_count
  87. for video in videos:
  88. # 依赖接口请求
  89. video_obj = await self.fetch_dependent_data(video)
  90. res = await self.process_and_push_video(video_obj)
  91. if res:
  92. success_count += 1
  93. else:
  94. fail_count += 1
  95. if self.limit_flag:
  96. break
  97. except Exception as e:
  98. self.logger.exception(f"{self.trace_id} 运行异常: {e}")
  99. return success_count, fail_count
  100. async def fetch_dependent_data(self, video: Dict) -> Dict:
  101. """
  102. 可在子类重写以实现依赖请求,用返回结果补充原有 video。
  103. 默认不做处理。
  104. """
  105. return video
  106. async def crawl_data(self, session) -> Optional[List[Dict]]:
  107. """
  108. 请求接口,自动渲染动态参数,自动更新游标
  109. 支持单请求和多请求(分页)逻辑。
  110. """
  111. # 动态渲染请求体
  112. # resolved_body = self._render_request_body()
  113. # 发送请求
  114. response = await self.request_client.request(
  115. session=session,
  116. method=self.method,
  117. url=self.url,
  118. headers=self.headers,
  119. json= self.dynamic_params
  120. )
  121. if not response:
  122. self.logger.error(f"{self.trace_id} 响应为空")
  123. return []
  124. # 更新游标(支持动态参数更新)
  125. if self.next_cursor_path:
  126. next_cursor = safe_extract(response, self.next_cursor_path) or ""
  127. self._update_cursor(next_cursor)
  128. # 解析数据列表
  129. data_list = safe_extract(response, self.data_path)
  130. if not data_list:
  131. self.logger.info(f"{self.trace_id} 未获取到有效数据")
  132. return []
  133. return data_list
  134. def _render_request_body(self) -> Dict:
  135. """
  136. 用当前动态参数渲染请求体模板,支持多参数动态替换
  137. """
  138. body = {}
  139. for k, v in self.request_body_template.items():
  140. if isinstance(v, str) and v.startswith("{{") and v.endswith("}}"):
  141. key = v.strip("{} ")
  142. body[k] = self.dynamic_params.get(key, "")
  143. else:
  144. body[k] = v
  145. return body
  146. def _update_cursor(self, cursor_value: str):
  147. """
  148. 更新分页游标并动态参数,方便下一次请求使用
  149. """
  150. self.current_cursor = cursor_value
  151. # 如果配置的游标字段在请求体中,更新动态参数
  152. if "cursor" in self.dynamic_params:
  153. self.dynamic_params["cursor"] = cursor_value
  154. async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
  155. """
  156. 数据处理完整流程(字段映射 -> 校验 -> 推送)
  157. 子类可重写 process_video 或 filter_data 来定制处理和校验逻辑
  158. """
  159. try:
  160. video_obj = await self.process_video(video)
  161. if not video_obj:
  162. return False
  163. if not await self.filter_data(video_obj):
  164. return False
  165. await self.integrated_video_handling(video_obj)
  166. pushed = await self.push_to_etl(video_obj)
  167. # 达到下载上限,停止继续抓取
  168. if self.rule_dict.get("videos_cnt", {}).get("min") and \
  169. self.download_cnt >= self.rule_dict["videos_cnt"]["min"]:
  170. self.limit_flag = True
  171. if pushed:
  172. self.download_cnt += 1
  173. return pushed
  174. except Exception as e:
  175. self.logger.exception(f"{self.trace_id} 视频处理异常: {e}")
  176. return False
  177. async def process_video(self, video: Dict) -> Optional[Dict]:
  178. """
  179. 统一字段抽取及 VideoItem 初始化
  180. 子类可重写或扩展以定制字段映射、过滤等
  181. """
  182. self.logger.debug(f"{self.trace_id} 处理视频数据: {video.get('title', '无标题')}")
  183. publish_user = None
  184. if self.user_list:
  185. import random
  186. publish_user = random.choice(self.user_list)
  187. else:
  188. publish_user = {"uid": "default", "nick_name": "default_user"}
  189. item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
  190. item_kwargs.update({
  191. "user_id": publish_user.get("uid"),
  192. "user_name": publish_user.get("nick_name"),
  193. "platform": self.platform,
  194. "strategy": self.mode
  195. })
  196. try:
  197. item = VideoItem(**item_kwargs)
  198. video_dict = await item.produce_item()
  199. if not video_dict:
  200. self.logger.warning(f"{self.trace_id} VideoItem 校验失败")
  201. return None
  202. return video_dict
  203. except Exception as e:
  204. self.logger.error(f"{self.trace_id} VideoItem 初始化失败: {e}")
  205. return None
  206. async def filter_data(self, video: Dict) -> bool:
  207. """
  208. 数据校验过滤,默认使用 PiaoQuanPipeline
  209. 子类可重写此方法实现自定义过滤
  210. """
  211. pipeline = PiaoQuanPipeline(
  212. platform=self.platform,
  213. mode=self.mode,
  214. rule_dict=self.rule_dict,
  215. env=self.env,
  216. item=video,
  217. trace_id=self.platform + str(uuid.uuid1())
  218. )
  219. return await pipeline.process_item()
  220. async def integrated_video_handling(self, video: Dict) -> None:
  221. """
  222. 钩子函数:可在此实现自动生成标题或其他业务逻辑
  223. """
  224. await generate_titles(self.feishu_sheetid, video)
  225. async def push_to_etl(self, video: Dict) -> bool:
  226. """
  227. 推送消息到消息队列ETL
  228. """
  229. try:
  230. await self.mq_producer.send_msg(video)
  231. self.logger.info(f"{self.trace_id} 成功推送视频至ETL")
  232. return True
  233. except Exception as e:
  234. self.logger.exception(f"{self.trace_id} 推送ETL失败: {e}")
  235. return False
  236. async def is_video_count_sufficient(self) -> bool:
  237. """
  238. 判断当天抓取的视频是否已达到上限,达到则停止继续抓取
  239. """
  240. max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
  241. if max_count <= 0:
  242. return True
  243. async with AsyncMysqlService(self.platform, self.mode) as mysql:
  244. current_count = await mysql.get_today_videos()
  245. if current_count >= max_count:
  246. self.logger.info(f"{self.trace_id} 今日视频已达上限: {current_count}")
  247. self.aliyun_logr.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
  248. return False
  249. return True
  250. async def _wait_for_next_loop(self, current_loop: int) -> None:
  251. """等待下次循环"""
  252. if current_loop < self.loop_times and self.loop_interval > 0:
  253. self.logger.info(f"{self.trace_id} 等待 {self.loop_interval} 秒后进行下一次请求")
  254. await asyncio.sleep(self.loop_interval)
  255. async def before_run(self):
  256. """运行前预处理钩子,子类可覆盖"""
  257. pass
  258. async def after_run(self):
  259. """运行后处理钩子,子类可覆盖"""
  260. pass