# basespider.py
  1. import asyncio
  2. import random
  3. import traceback
  4. import uuid
  5. from typing import List, Dict, Optional, Any
  6. from abc import ABC, abstractmethod
  7. from core.models.video_item import VideoItem
  8. from core.utils.helpers import generate_titles
  9. from core.utils.request_preparer import RequestPreparer
  10. from core.utils.spider_config import SpiderConfig
  11. from core.utils.extractors import safe_extract, extract_fields
  12. from core.utils.log.logger_manager import LoggerManager
  13. from services.async_mysql_service import AsyncMysqlService
  14. from services.pipeline import PiaoQuanPipeline
  15. from core.base.async_request_client import AsyncRequestClient
  16. from services.async_mq_producer import AsyncMQProducer
class BaseSpider(ABC):
    """Generic crawler base class.

    Owns configuration loading, logging, shared service clients, and the
    common fetch -> clean -> filter -> push pipeline. Subclasses implement
    the mode-specific loop in :meth:`core_loop`.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
                 request_client: AsyncRequestClient = None,
                 db_service: AsyncMysqlService = None,
                 mq_producer: AsyncMQProducer = None):
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Config lookup key: lowercased concrete subclass name.
        self.class_name = self.__class__.__name__.lower()
        # Initialize core components. Order matters: configuration first
        # (sets self.platform/self.mode), which logging and services read.
        self._setup_configuration()
        self._setup_logging()
        self._setup_services(request_client, db_service, mq_producer)
        self._setup_state()
        # Shared run counters and the item currently being processed.
        self.total_success = 0
        self.total_fail = 0
        self.video = None
    def _setup_configuration(self):
        """Load and unpack the per-platform config block for this spider.

        Raises:
            ValueError: when no config entry exists for ``self.class_name``.
        """
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"找不到爬虫配置: {self.class_name}")
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        # NOTE(review): .upper() raises AttributeError if `method` is absent
        # in config — presumably the schema guarantees it; confirm.
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}
        self.request_body_template = self.platform_config.request_body or {}
        # Response-parsing sub-config: where the item list lives, pagination
        # markers, and the raw-field -> VideoItem-field mapping.
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        self.has_more = self.response_parse_config.get("has_more")
        self.field_map = self.response_parse_config.get("fields", {})
        self.next_cursor = self.response_parse_config.get("next_cursor") or ""
        # Fallback defaults kick in when config leaves these unset (or falsy).
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid
    def _setup_logging(self):
        """Create the local and Aliyun loggers bound to platform/mode."""
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")
  60. def _setup_services(self, request_client: AsyncRequestClient = None,
  61. db_service: AsyncMysqlService = None,
  62. mq_producer: AsyncMQProducer = None):
  63. """初始化服务组件"""
  64. self.request_client = request_client or AsyncRequestClient(
  65. logger=self.logger,
  66. aliyun_log=self.aliyun_log
  67. )
  68. self.db_service = db_service or AsyncMysqlService(
  69. platform=self.platform,
  70. mode=self.mode
  71. )
  72. self.mq_producer = mq_producer or AsyncMQProducer(
  73. topic_name="topic_crawler_etl_prod_v2",
  74. platform=self.platform,
  75. mode=self.mode
  76. )
    def _setup_state(self):
        """Initialize per-run mutable state and the request preparer."""
        # Last parsed response body; the preparer reads pagination cursors from it.
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )
    # ---- Core entry point (unified flow) ----
    async def run(self):
        """Main flow: before-hook -> core loop -> after-hook.

        Exceptions from the core loop are logged (with traceback) rather than
        propagated, so the after-hook and final stats always run.
        """
        self.logger.info(f"开始运行爬虫: {self.platform}/{self.mode}")
        await self.before_run()
        try:
            await self.core_loop()  # subclass implements the mode-specific loop
        except Exception as e:
            tb = traceback.format_exc()
            self.logger.exception(f"运行异常: {e},堆栈信息:{tb}")
        finally:
            await self.after_run()
            self.logger.info(f"总统计:成功{self.total_success},失败{self.total_fail}")
    @abstractmethod
    async def core_loop(self):
        """Subclasses must implement the mode-specific core loop
        (e.g. recommend-feed or per-account crawling)."""
        pass
    async def fetch_detail(self, item: Dict) -> Dict:
        """Optional subclass hook: enrich a list item with detail data.

        Default is a no-op pass-through; subclasses fully control it.
        """
        return item
  104. # 通用数据处理流程
  105. async def process_data(self, video_data: List[Dict]):
  106. """处理原始数据列表(清洗→过滤→推送)"""
  107. for item in video_data:
  108. try:
  109. # 补充详情(完全由子类实现)
  110. detail_data = await self.fetch_detail(item)
  111. # 处理并推送
  112. result = await self.process_and_push_video(detail_data)
  113. if result:
  114. self.total_success += 1
  115. else:
  116. self.total_fail += 1
  117. except Exception as e:
  118. self.logger.exception(f"处理单条数据失败: {e}")
  119. self.total_fail += 1
  120. async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
  121. try:
  122. self.video = video
  123. video_obj = await self.process_video(video)
  124. if not video_obj:
  125. return False
  126. if not await self.filter_data(video_obj):
  127. return False
  128. await self.integrated_video_handling(video_obj)
  129. return await self.push_to_etl(video_obj)
  130. except Exception as e:
  131. self.logger.exception(f"视频处理异常: {e}")
  132. return False
  133. async def publish_video_user(self) -> Dict[str, Any]:
  134. """获取随机发布用户"""
  135. if self.user_list:
  136. return random.choice(self.user_list)
  137. else:
  138. self.logger.error("未获取到用户列表数据")
  139. return None
    async def process_video(self, video: Dict) -> Optional[Dict]:
        """Field mapping: extract configured fields and build a VideoItem.

        Returns the validated item dict, or ``None`` when no publish user is
        available, VideoItem validation fails, or construction raises.
        """
        self.logger.info(f"处理视频数据: {video}")
        publish_user = await self.publish_video_user()
        # Abort early if no publish user could be obtained.
        if not publish_user:
            self.logger.error("无法获取发布用户信息")
            return None
        # Map raw response fields to item fields per the configured field map.
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem 校验失败")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem 初始化失败: {e}")
            return None
    async def filter_data(self, video: Dict) -> bool:
        """Validate/filter an item, by default via PiaoQuanPipeline.

        Subclasses may override for custom filtering. Returns True when the
        item passes all pipeline rules.
        """
        # NOTE(review): uuid1 embeds host MAC/time; if the trace id should be
        # unpredictable, uuid4 would be preferable — confirm before changing.
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1())
        )
        return await pipeline.process_item()
    async def integrated_video_handling(self, video: Dict) -> None:
        """Hook for extra business logic on a filtered item, e.g. automatic
        title generation; subclasses may extend."""
        # Title generation/processing for the video item.
        await generate_titles(self.feishu_sheetid, video, self.logger, self.aliyun_log)
    async def push_to_etl(self, video: Dict) -> bool:
        """Publish the finished item to the ETL MQ topic.

        Returns True on successful send; failures are logged and reported
        as False instead of raising.
        """
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(code="1009",
                                    message="推送ETL成功",
                                    data=video)
            self.logger.info(f"成功推送视频至ETL: {video}")
            return True
        except Exception as e:
            self.logger.exception(f"推送ETL失败: {e}")
            return False
  199. async def is_video_count_sufficient(self) -> bool:
  200. """
  201. 校验当日视频是否达到最大爬取量
  202. True未达到
  203. False达到最大量
  204. :return:True/False
  205. """
  206. max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
  207. if max_count <= 0:
  208. self.logger.info(f"{self.platform} 未限制视频入库量,跳过检测")
  209. return True
  210. current_count = await self.db_service.get_today_videos()
  211. if current_count >= max_count:
  212. self.logger.info(f"{self.platform} 视频数量达到当日最大值: {current_count}/{max_count}")
  213. self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"{current_count}")
  214. return False
  215. self.logger.info(f"{self.platform} 今日入库视频数: {current_count}/{max_count}")
  216. self.aliyun_log.logging(code="1012",
  217. message=f"目前入库量{current_count}",
  218. data=f"{current_count}/{max_count}"
  219. )
  220. return True
  221. async def wait(self):
  222. """等待随机时间间隔"""
  223. # 确保loop_interval包含min和max键
  224. min_time = self.loop_interval.get("min", 1)
  225. max_time = self.loop_interval.get("max", 5)
  226. wait_time = random.randint(min_time, max_time)
  227. self.logger.info(f"等待 {wait_time} 秒后继续")
  228. await asyncio.sleep(wait_time)
    async def before_run(self):
        """Pre-run hook (subclasses may override); default no-op."""
        pass
    async def after_run(self):
        """Post-run hook (subclasses may override); default no-op."""
        pass