# base_spider.py

import asyncio
import json
import random
import time
import traceback
import uuid
from abc import ABC
from typing import List, Dict, Optional, Any, Tuple

import aiohttp

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from core.utils.template_resolver import resolve_request_body_template
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer


class BaseSpider(ABC):
    """
    Generic spider base class: runs the full crawl flow in strict sequential order.
    """

    MAX_RETRIES = 3  # maximum number of retries for a single request
    TIMEOUT = 30     # request timeout in seconds

    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
        self.trace_id = trace_id
        self.env = env
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.class_name = self.__class__.__name__  # name of the concrete subclass

        # Look up the platform config automatically from the class name
        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
        if not self.platform_config:
            raise ValueError(f"No matching platform config found for: {self.class_name}")

        # Platform info and logger initialisation
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)

        # MQ producer used to push processed items to the ETL pipeline
        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)

        # Request configuration
        self.method = self.platform_config.method.upper()
        self.url = self.platform_config.url
        self.headers = self.platform_config.headers
        self.request_body = self.platform_config.request_body

        # Response parsing configuration
        self.response = self.platform_config.response_parse
        self.field_map = self.response.get("fields", {})
        self.data_path = self.response.get("data_path")
        self.next_cursor_path = self.response.get("next_cursor")
        self.response_data = self.response.get("data")

        # Flow-control configuration
        self.loop_times = self.platform_config.loop_times        # number of crawl loops
        self.loop_interval = self.platform_config.loop_interval  # interval between loops (seconds)

        # Database and HTTP request clients
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)
        self.feishu_sheetid = self.platform_config.feishu_sheetid

        self.timeout = self.TIMEOUT
        self.max_retries = self.MAX_RETRIES
        self.resolved_body = resolve_request_body_template(self.request_body)
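
    # Sketch of the platform config this constructor expects (hypothetical values;
    # the real shape is defined by SpiderConfig and the per-platform config files,
    # which are not part of this module). The "{{...}}" placeholder syntax is an
    # assumption about resolve_request_body_template.
    #
    #   platform: "example_platform"
    #   mode: "recommend"
    #   method: "POST"
    #   url: "https://api.example.com/video/list"
    #   headers: {"Content-Type": "application/json"}
    #   request_body: {"cursor": "{{next_cursor}}", "page_size": 20}
    #   response_parse:
    #     data: "$.data"
    #     data_path: "$.data.list"
    #     next_cursor: "$.data.next_cursor"
    #     fields: {"video_title": "$.title", "video_url": "$.play_url"}
    #   loop_times: 10
    #   loop_interval: 5
    #   feishu_sheetid: "<sheet id>"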

    async def run(self):
        """
        Spider entry point: runs the full loop (crawl, process, push).
        """
        await self.before_run()
        total_success, total_failed = 0, 0
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            for loop_index in range(1, self.loop_times + 1):
                # Stop early once today's crawl quota has been reached
                if not await self.is_video_count_sufficient():
                    return
                success_count, fail_count = await self.run_single_loop(session)
                total_success += success_count
                total_failed += fail_count
                if loop_index < self.loop_times:
                    await asyncio.sleep(self.loop_interval)
        await self.after_run()
        self.logger.info(f"{self.trace_id} crawl finished, success: {total_success}, failed: {total_failed}")

    async def run_single_loop(self, session) -> Tuple[int, int]:
        """
        Runs a single crawl loop: fetches the video list, then processes and pushes each item.
        """
        success_count, fail_count = 0, 0
        video_list = await self.crawl_data(session)
        if not video_list:
            self.logger.info(f"{self.trace_id} no videos fetched")
            return success_count, fail_count
        for video in video_list:
            result = await self.process_and_push_video(video)
            if result:
                success_count += 1
            else:
                fail_count += 1
        return success_count, fail_count

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        """
        Processing flow for one video: field extraction -> rule filtering -> title handling -> push to ETL.
        """
        try:
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"{self.trace_id} video processing failed: {e}")
            return False

    async def crawl_data(self, session) -> Optional[List[Dict]]:
        """
        Fetches one page of data; retries are handled by the request client and
        pagination by re-resolving the request-body template with the latest cursor.
        :param session: shared aiohttp session
        :return: list of raw video dicts, or an empty list if nothing was returned
        """
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=self.resolved_body
        )
        # Feed the cursor section of this response back into the body template so
        # the next request fetches the following page.
        cursor_data = safe_extract(response, self.response_data)
        self.logger.debug(f"{self.trace_id} pagination payload: {cursor_data}")
        self.resolved_body = resolve_request_body_template(self.request_body, cursor_data)
        data = safe_extract(response, self.data_path)
        return data if data else []
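
    # Illustrative sketch (assumption about the extractor path syntax): with
    # response_parse paths such as
    #   data: "$.data", data_path: "$.data.list", next_cursor: "$.data.next_cursor"
    # a response like
    #   {"data": {"list": [{"id": 1, "title": "..."}], "next_cursor": "abc"}}
    # yields data == [{"id": 1, "title": "..."}], while the "$.data" payload is
    # handed back to resolve_request_body_template so the next request carries
    # "abc" as its cursor.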

    async def filter_data(self, video: Dict) -> bool:
        """Checks whether a video satisfies the crawl rules."""
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()

    async def is_video_count_sufficient(self) -> bool:
        """
        Checks whether today's crawl is still below the daily maximum.
        :return: True if more videos may be crawled today, False otherwise
        """
        rule_videos_cnt = self.rule_dict.get("videos_cnt")
        if not rule_videos_cnt:
            return True
        async with AsyncMysqlService(self.platform, self.mode) as mysql:
            video_count = await mysql.get_today_videos()
        if video_count >= rule_videos_cnt.get("min", 200):
            self.logger.info(f"{self.trace_id}--daily video limit reached: {video_count}")
            self.aliyun_logr.logging(
                code="1011",
                message="daily video limit reached",
                data=f"<today's video count>{video_count}"
            )
            return False
        self.logger.info(f"{self.trace_id}--videos stored today: {video_count}")
        return True
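
    # Sketch of the rule_dict shape read above (hypothetical values): only
    # "videos_cnt.min" is consumed here for the daily cap; any other rule keys
    # are left to PiaoQuanPipeline in filter_data.
    #
    #   rule_dict = {
    #       "videos_cnt": {"min": 200},
    #       "duration": {"min": 30},   # assumed example of a pipeline rule
    #   }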

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Processes a single raw video dict: applies the field mapping and validates required fields.
        :param video: raw video dict returned by crawl_data
        :return: validated video dict, or None if validation fails
        """
        self.logger.debug(f"{self.trace_id}--processing video: {video.get('title', 'untitled')}")
        publish_user = random.choice(self.user_list)
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
        item_kwargs["user_id"] = publish_user["uid"]
        item_kwargs["user_name"] = publish_user["nick_name"]
        item_kwargs["platform"] = self.platform
        item_kwargs["strategy"] = self.mode
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning(f"{self.trace_id} video validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"{self.trace_id} VideoItem initialisation failed: {e}")
            return None

    async def push_to_etl(self, video: Dict[str, Any]) -> bool:
        """
        Pushes a fully processed video to the ETL queue.
        """
        try:
            await self.mq_producer.send_msg(video)
            return True
        except Exception as e:
            self.logger.exception(f"{self.trace_id} failed to push to ETL: {e}")
            return False

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Integrated per-video handling, currently title generation via the Feishu sheet.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def _wait_for_next_loop(self, current_loop: int) -> None:
        """Waits before issuing the next loop of requests."""
        if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--waiting {self.loop_interval} seconds before the next request")
            await asyncio.sleep(self.loop_interval)

    async def before_run(self):
        """
        Overridable hook: runs before the crawl starts, e.g. to fetch a token.
        """
        pass

    async def after_run(self):
        """
        Overridable hook: runs after the crawl finishes, e.g. to report statistics.
        """
        pass
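

# --- Usage sketch (illustrative only) ---
# A minimal, hypothetical subclass showing how BaseSpider is meant to be driven.
# The class name, rule_dict values and user_list entries below are assumptions;
# a real subclass only works if SpiderConfig has a platform entry matching the
# lowercased class name ("examplespider" here).
class ExampleSpider(BaseSpider):
    async def before_run(self):
        # e.g. fetch an auth token and merge it into self.headers
        pass


async def _demo():
    spider = ExampleSpider(
        rule_dict={"videos_cnt": {"min": 200}},
        user_list=[{"uid": 1001, "nick_name": "demo_user"}],
        trace_id="trace-demo-001",
        env="prod",
    )
    await spider.run()


if __name__ == "__main__":
    asyncio.run(_demo())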