base_spider.py

import asyncio
import random
import re
import time
import uuid
from abc import ABC
from typing import List, Dict, Optional

import aiohttp

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from core.utils.template_resolver import resolve_request_body_template
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer

PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)(\|[^}]+)?\s*\}\}")
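
# Illustrative only: a hypothetical request body template whose placeholder this
# pattern would pick up (real templates come from the platform config):
#
#     {"category": "hot", "cursor": "{{next_cursor}}"}
#
# PLACEHOLDER_PATTERN.search() on that body matches "{{next_cursor}}";
# group(1) yields the variable name "next_cursor", and an optional default
# such as "{{next_cursor|0}}" is captured, pipe included, by group(2).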

class BaseSpider(ABC):
    """
    Generic spider base class: runs the crawl pipeline in strict sequential order.
    """

    MAX_RETRIES = 3  # Maximum retries for a single request
    TIMEOUT = 30  # Request timeout (seconds)

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # Name of the concrete subclass

        # Look up the platform config automatically from the class name
        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching config found: {self.class_name}")

        # Initialize logging and MQ
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"{trace_id}--Initializing spider class: {self.class_name}")
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)

        # Request configuration
        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers
        self.body = self.platform_config.request_body
        self.response = self.platform_config.response_parse
        self.field_map = self.response.get("fields", {})
        self.data_path = self.response.get("data_path")
        self.next_cursor_path = self.response.get("next_cursor")

        # Loop control configuration
        self.loop_times = self.platform_config.loop_times  # Number of loop iterations
        self.loop_interval = self.platform_config.loop_interval  # Interval between loops (seconds)

        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)

        self.logger.info(
            f"{self.trace_id}--Config: {self.loop_times} loops at {self.loop_interval}s intervals")
        self.session = None
        self.feishu_sheetid = self.platform_config.feishu_sheetid

        # Detect the pagination variable name dynamically from the body template
        self.cursor_variable_name = None
        body_str = str(self.body)
        match = PLACEHOLDER_PATTERN.search(body_str)
        if match:
            self.cursor_variable_name = match.group(1)
            self.logger.info(f"{self.trace_id}--Detected pagination variable: {self.cursor_variable_name}")
        else:
            self.logger.info(f"{self.trace_id}--No pagination variable detected; pagination disabled")
        self.current_cursor = None

    async def crawl_data(self, session, dynamic_variables=None) -> Optional[List[Dict]]:
        dynamic_variables = dynamic_variables or {}
        if self.cursor_variable_name:
            dynamic_variables[self.cursor_variable_name] = self.current_cursor or ""

        resolved_body = resolve_request_body_template(self.body, dynamic_variables)

        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=resolved_body
        )

        if self.next_cursor_path:
            extracted_cursor = safe_extract(response, self.next_cursor_path)
            self.current_cursor = extracted_cursor if extracted_cursor else None
            self.logger.info(f"{self.trace_id}--Extracted next-page {self.cursor_variable_name}: {self.current_cursor}")

        data = safe_extract(response, self.data_path)
        return data if data else []
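
    # A sketch of one pagination round trip, assuming a dotted-path syntax for
    # safe_extract and a hypothetical response_parse config such as:
    #
    #     {"data_path": "data.items", "next_cursor": "data.next_cursor", "fields": {...}}
    #
    # A response like {"data": {"items": [...], "next_cursor": "abc123"}} would
    # then return the list under data.items, store "abc123" in self.current_cursor,
    # and the next crawl_data() call substitutes it back into the body template.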

    async def filter_data(self, video: Dict) -> bool:
        """Check whether a video complies with the platform rules."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's crawl is still below the daily maximum.
        :return: True if more videos may be crawled, False once the limit is hit
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
        if video_count >= rule_videos_cnt.get("min", 200):
            self.logger.info(f"{self.trace_id}--Daily video limit reached: {video_count}")
            self.aliyun_logr.logging(
                code="1011",
                message="Daily video limit reached",
                data=f"<today's video count>{video_count}"
            )
            return False
        self.logger.info(f"{self.trace_id}--Videos stored today: {video_count}")
        return True
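
    # Hypothetical rule_dict shape this check reads; only videos_cnt["min"] is
    # consulted, with 200 as the fallback:
    #
    #     rule_dict = {"videos_cnt": {"min": 500}}
    #
    # With that rule, crawling stops once 500 videos have been stored today.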

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Process a single video record: apply the field mapping and validate required fields.
        :param video: raw video record
        :return: the mapped video dict, or None on failure
        """
        self.logger.debug(f"{self.trace_id}--Processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
        item_kwargs["user_id"] = publish_user["uid"]
        item_kwargs["user_name"] = publish_user["nick_name"]
        item_kwargs["platform"] = self.platform
        item_kwargs["strategy"] = self.mode

        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} Validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem initialization failed: {e}")
            return None
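
    # Illustrative only: assuming extract_fields maps config paths to VideoItem
    # kwargs, a "fields" entry in response_parse might look like
    #
    #     {"video_id": "id", "title": "title", "video_url": "video_info.play_url"}
    #
    # user_id / user_name / platform / strategy are then injected on top before
    # VideoItem validates the record via produce_item().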

    async def push_to_etl(self, item: Dict) -> bool:
        """Push a validated item to ETL over MQ (async)."""
        self.logger.info(f"{self.trace_id}--Pushing to ETL: {item.get('video_id', item.get('title', 'untitled'))}")
        try:
            await self.mq_producer.send_msg(item)
            self.aliyun_logr.logging(
                code="1009",
                message="Sent to ETL",
                data=item,
                trace_id=self.trace_id
            )
            self.logger.info(f"{self.trace_id}--Push succeeded")
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id}--Push failed: {e}, item: {item}")
            return False

    async def get_today_videos(self):
        """
        Return the number of videos this spider has crawled today.
        """
        video_count = await self.db_service.get_today_videos()
        return video_count

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Integrated post-processing of a video (currently title generation).
        """
        await generate_titles(self.feishu_sheetid, video)

    async def run(self):
        """
        Run the spider task asynchronously, strictly in order:
        1. Crawl
        2. Process each record and validate its fields
        3. Filter (duplicates, platform rules, title, publish time)
        4. Title handling
        5. Push to ETL
        """
        try:
            total_success, total_failed = 0, 0
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session:
                for loop_index in range(1, self.loop_times + 1):
                    loop_start_time = time.time()
                    # Stop once today's video count has reached the daily maximum
                    if not await self.is_video_count_sufficient():
                        return

                    self.logger.info(f"{self.trace_id}--Step 1: starting loop request {loop_index}/{self.loop_times}")
                    # Fetch the video list
                    video_list = await self.crawl_data(session)
                    if not video_list:
                        self.logger.warning(f"{self.trace_id}--No video data returned; skipping this loop")
                        await self._wait_for_next_loop(loop_index)
                        continue

                    success_count = 0
                    fail_count = 0
                    for video in video_list:
                        # Apply the field mapping
                        video_obj = await self.process_video(video)
                        if not video_obj:
                            self.logger.warning(f"{self.trace_id}--Video processing failed; skipped")
                            fail_count += 1
                            continue

                        # Apply the filter rules
                        if not await self.filter_data(video_obj):
                            self.logger.debug(f"{self.trace_id}--Video does not match the rules; skipped")
                            continue

                        # Integrated post-processing
                        await self.integrated_video_handling(video_obj)

                        if await self.push_to_etl(video_obj):
                            success_count += 1
                        else:
                            fail_count += 1

                    total_success += success_count
                    total_failed += fail_count
                    loop_duration = time.time() - loop_start_time
                    self.logger.info(f"{self.trace_id}--Loop {loop_index}/{self.loop_times} finished. "
                                     f"Success: {success_count}, failed: {fail_count}, took: {loop_duration:.2f}s")

                    await self._wait_for_next_loop(loop_index)

            # Aggregate metrics log
            self.aliyun_logr.logging(
                code="1003",
                message="Spider run metrics summary",
                data={
                    "trace_id": self.trace_id,
                    "classname": self.platform,
                    "success_count": total_success,
                    "fail_count": total_failed
                },
                trace_id=self.trace_id
            )
            self.logger.info(
                f"{self.trace_id}--[{self.platform}] Spider task complete. Total success: {total_success}, total failed: {total_failed}")
            return True
        except Exception as e:
            self.logger.error(f"Fatal spider error: {e}")
            raise

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Sleep before the next loop request."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--Waiting {self.loop_interval}s before the next request")
            await asyncio.sleep(self.loop_interval)
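

# A minimal usage sketch, not part of the base class. A concrete spider only
# needs a class whose lowercase name matches a platform config entry; the
# DemoSpider name, config entry, and rule/user values below are hypothetical.
class DemoSpider(BaseSpider):
    """Driven entirely by the (hypothetical) 'demospider' platform config."""
    pass


if __name__ == "__main__":
    spider = DemoSpider(
        rule_dict={"videos_cnt": {"min": 200}},
        user_list=[{"uid": 1001, "nick_name": "demo_user"}],
        trace_id="demo-trace-001",
    )
    asyncio.run(spider.run())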