basespider.py

import asyncio
import random
import traceback
import uuid
from typing import List, Dict, Optional, Any
from abc import ABC, abstractmethod

from core.models.video_item import VideoItem
from core.utils.helpers import generate_titles
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from core.utils.extractors import safe_extract, extract_fields
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService
from services.pipeline import PiaoQuanPipeline
from core.base.async_request_client import AsyncRequestClient
from services.async_mq_producer import AsyncMQProducer

class BaseSpider(ABC):
    """Generic spider base class."""

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.class_name = self.__class__.__name__.lower()
        print(self.class_name)

        # Initialize core components
        self._setup_configuration()
        self._setup_logging()
        self._setup_services()
        self._setup_state()

        # Shared counters
        self.total_success = 0
        self.total_fail = 0

    def _setup_configuration(self):
        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
        if not self.platform_config:
            raise ValueError(f"Spider config not found: {self.class_name}")
        self.platform = self.platform_config.platform
        self.mode = self.platform_config.mode
        self.url = self.platform_config.url
        self.method = self.platform_config.method.upper()
        self.headers = self.platform_config.headers or {}
        self.request_body_template = self.platform_config.request_body or {}
        self.response_parse_config = self.platform_config.response_parse or {}
        self.data_path = self.response_parse_config.get("data_path")
        self.has_more = self.response_parse_config.get("has_more")
        self.field_map = self.response_parse_config.get("fields", {})
        self.loop_times = self.platform_config.loop_times or 100
        self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
        self.timeout = self.platform_config.request_timeout or 30
        self.max_retries = self.platform_config.max_retries or 3
        self.feishu_sheetid = self.platform_config.feishu_sheetid
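
    # Sketch of the platform config shape consumed above. This is an
    # assumption inferred from the attribute reads; the real SpiderConfig
    # schema may differ, and the example values are hypothetical:
    #
    #   platform: "example_platform"
    #   mode: "recommend"
    #   url: "https://example.com/api/feed"
    #   method: "post"
    #   request_body: {"cursor": ""}
    #   response_parse:
    #     data_path: "$.data.list"
    #     has_more: "$.data.has_more"
    #     fields: {"video_id": "$.id", "video_title": "$.title"}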

    def _setup_logging(self):
        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
        self.logger.info(f"Spider '{self.platform}/{self.mode}' initializing...")

    def _setup_services(self):
        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        self.mq_producer = AsyncMQProducer(
            topic_name="topic_crawler_etl_prod_v2",
            platform=self.platform,
            mode=self.mode,
        )

    def _setup_state(self):
        self.last_response_data = {}
        self.request_preparer = RequestPreparer(
            response_parse_config=self.response_parse_config,
            logger=self.logger,
            aliyun_log=self.aliyun_log,
        )

    # Core entry point (unified flow)
    async def run(self):
        """Main flow: setup → core loop → teardown."""
        self.logger.info(f"Starting spider: {self.platform}/{self.mode}")
        await self.before_run()
        try:
            await self.core_loop()  # Mode-specific logic implemented by subclasses
        except Exception as e:
            tb = traceback.format_exc()
            self.logger.exception(f"Run failed: {e}, traceback: {tb}")
        finally:
            await self.after_run()
            self.logger.info(f"Totals: {self.total_success} succeeded, {self.total_fail} failed")

    @abstractmethod
    async def core_loop(self):
        """Subclasses must implement the mode-specific core loop (recommend / account)."""
        pass

    async def fetch_detail(self, item: Dict) -> Dict:
        """Optional override: enrich an item with detail data (fully controlled by the subclass)."""
        return item
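
    # Example override (hypothetical, for illustration only): a subclass that
    # needs a second request per item could implement fetch_detail roughly as
    #
    #     async def fetch_detail(self, item: Dict) -> Dict:
    #         detail = await self._request_detail(item["id"])  # hypothetical helper
    #         return {**item, **detail}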

    # Generic data-processing flow
    async def process_raw_data(self, raw_data: List[Dict]):
        """Process a list of raw items (clean → filter → push)."""
        for item in raw_data:
            try:
                # Enrich with detail data (implemented entirely by the subclass)
                detail_data = await self.fetch_detail(item)
                # Process and push
                result = await self.process_and_push_video(detail_data)
                if result:
                    self.total_success += 1
                else:
                    self.total_fail += 1
            except Exception as e:
                self.logger.exception(f"Failed to process item: {e}")
                self.total_fail += 1

    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
        try:
            video_obj = await self.process_video(video)
            if not video_obj:
                return False
            if not await self.filter_data(video_obj):
                return False
            await self.integrated_video_handling(video_obj)
            return await self.push_to_etl(video_obj)
        except Exception as e:
            self.logger.exception(f"Video processing error: {e}")
            return False

    async def process_video(self, video: Dict) -> Optional[Dict]:
        """
        Field mapping: unified field extraction and VideoItem initialization.
        """
        self.logger.info(f"Processing video data: {video}")
        if self.user_list:
            publish_user = random.choice(self.user_list)
        else:
            self.logger.error(f"No publish user list available: {self.user_list}")
            return None
        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
        item_kwargs.update({
            "user_id": publish_user.get("uid"),
            "user_name": publish_user.get("nick_name"),
            "platform": self.platform,
            "strategy": self.mode,
        })
        try:
            item = VideoItem(**item_kwargs)
            video_dict = await item.produce_item()
            if not video_dict:
                self.logger.warning("VideoItem validation failed")
                return None
            return video_dict
        except Exception as e:
            self.logger.error(f"VideoItem initialization failed: {e}")
            return None

    async def filter_data(self, video: Dict) -> bool:
        """
        Data validation and filtering; uses PiaoQuanPipeline by default.
        Subclasses may override this method for custom filtering.
        """
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video,
            trace_id=self.platform + str(uuid.uuid1()),
        )
        return await pipeline.process_item()

    async def integrated_video_handling(self, video: Dict) -> None:
        """
        Hook: implement automatic title generation or other business logic here.
        """
        await generate_titles(self.feishu_sheetid, video)

    async def push_to_etl(self, video: Dict) -> bool:
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(
                code="1009",
                message="Pushed to ETL successfully",
                data=video,
            )
            self.logger.info(f"Pushed video to ETL: {video}")
            return True
        except Exception as e:
            self.logger.exception(f"Failed to push to ETL: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check whether today's video count has reached the daily crawl limit.
        Returns True if the limit has not been reached, False if it has.
        """
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            self.logger.info(f"{self.platform} has no daily video limit, skipping the check")
            return True
        current_count = await self.db_service.get_today_videos()
        if current_count >= max_count:
            self.logger.info(f"{self.platform} reached today's maximum video count: {current_count}/{max_count}")
            self.aliyun_log.logging(code="1011", message="Daily maximum video count reached", data=f"{current_count}")
            return False
        self.logger.info(f"{self.platform} videos stored today: {current_count}/{max_count}")
        self.aliyun_log.logging(
            code="1012",
            message=f"Current stored count: {current_count}",
            data=f"{current_count}/{max_count}",
        )
        return True
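
    # Shape of rule_dict assumed by the check above (inferred from the code;
    # other rule keys may exist): {"videos_cnt": {"min": <daily_cap>}, ...}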

    async def wait(self):
        wait_time = random.randint(self.loop_interval["min"], self.loop_interval["max"])
        self.logger.info(f"Waiting {wait_time} seconds before continuing")
        await asyncio.sleep(wait_time)

    async def before_run(self):
        """Pre-run hook (subclasses may override)."""
        pass

    async def after_run(self):
        """Post-run hook (subclasses may override)."""
        pass
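

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original class): a minimal subclass
# showing how core_loop() can drive the generic helpers above. The
# _fetch_page() coroutine is a hypothetical stand-in; a real spider would
# build the payload from self.request_body_template (e.g. via
# self.request_preparer) and fetch it with self.request_client instead of
# returning a hard-coded empty list.
# ---------------------------------------------------------------------------
class ExampleRecommendSpider(BaseSpider):
    """Sketch of a recommend-mode spider built on BaseSpider."""

    async def _fetch_page(self) -> List[Dict]:
        # Hypothetical fetch; replace with a real request using
        # self.url / self.method / self.headers / self.request_body_template.
        return []

    async def core_loop(self):
        for _ in range(self.loop_times):
            # Stop once today's quota from rule_dict has been reached.
            if not await self.is_video_count_sufficient():
                return
            raw_items = await self._fetch_page()
            await self.process_raw_data(raw_items)
            await self.wait()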