# spiders/basespider.py
import asyncio
import random
import traceback
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

import aiohttp

from core.base.async_request_client import AsyncRequestClient
from core.utils.log.logger_manager import LoggerManager
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from core.video_processor import VideoProcessor
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
from services.async_redis_service import AsyncRedisService


class BaseSpider(ABC):
    """Spider base class - simplified version that contains no loop logic itself."""

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
                 request_client: Optional[AsyncRequestClient] = None,
                 db_service: Optional[AsyncMysqlService] = None,
                 mq_producer: Optional[AsyncMQProducer] = None,
                 redis_service: Optional[AsyncRedisService] = None):
        # Basic attributes
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env

        # Injected service dependencies
        self.request_client = request_client
        self.db_service = db_service
        self.mq_producer = mq_producer
        self.redis_service = redis_service

        # Resolve the platform config from the lowercased class name
        class_name = self.__class__.__name__.lower()
        self.config = SpiderConfig.get_platform_config(class_name)
        self._setup_from_config()

        # Logging services
        self.logger = LoggerManager.get_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )
        self.aliyun_log = LoggerManager.get_aliyun_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )

        # Request preparer
        self.request_preparer = RequestPreparer(
            response_parse_config=self.config.response_parse,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )

        # Run statistics
        self.stats = {
            'success': 0,
            'fail': 0,
            'start_time': None,
            'end_time': None
        }

        # Create default instances for any services that were not injected
        self._setup_default_services()

        # Initialize the video processor
        self.video_processor = VideoProcessor(
            platform=self.platform,
            mode=self.mode,
            field_map=self.field_map,
            feishu_sheetid=self.feishu_sheetid,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )
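
    # Note: config resolution in __init__ keys off the lowercased subclass name,
    # so a hypothetical subclass `class FoobarSpider(BaseSpider)` (an illustrative
    # name, not a spider in this repo) would load its settings via
    # `SpiderConfig.get_platform_config("foobarspider")`.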

    def _setup_default_services(self):
        """Create default instances for any services that were not injected."""
        if not self.request_client:
            self.request_client = AsyncRequestClient(
                logger=self.logger,
                aliyun_log=self.aliyun_log
            )
        if not self.db_service:
            self.db_service = AsyncMysqlService(
                platform=self.config.platform,
                mode=self.config.mode
            )
        if not self.mq_producer:
            self.mq_producer = AsyncMQProducer(
                topic_name="topic_crawler_etl_prod_v2",
                platform=self.config.platform,
                mode=self.config.mode
            )
        if not self.redis_service:
            # RedisManager.init(redis_url=settings.redis_url)
            self.redis_service = AsyncRedisService()
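
    # Because every service is an injectable constructor argument, tests can
    # pass doubles instead of relying on the defaults above, e.g. (hypothetical
    # names, not classes that exist in this repo):
    #
    #   spider = FoobarSpider(rule_dict={}, user_list=[], env="dev",
    #                         db_service=FakeMysqlService())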

    def _setup_from_config(self):
        """Populate attributes from the platform config."""
        self.platform = self.config.platform
        self.mode = self.config.mode
        self.url = self.config.url
        self.method = self.config.method.upper()
        self.headers = self.config.headers or {}
        self.request_body_template = self.config.request_body or {}
        self.loop_times = self.config.loop_times or 100
        self.timeout = self.config.request_timeout or 30
        self.feishu_sheetid = self.config.feishu_sheetid

        # Response-parsing config
        response_parse = self.config.response_parse or {}
        self.data_path = response_parse.get("data_path")
        self.has_more = response_parse.get("has_more")
        self.next_cursor = response_parse.get("next_cursor")
        self.field_map = response_parse.get("fields", {})
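
    # Assumed shape of the response_parse config consumed above. The keys match
    # the .get() calls; the path values and format are illustrative guesses:
    #
    #   response_parse:
    #     data_path: "$.data.list"
    #     has_more: "$.data.has_more"
    #     next_cursor: "$.data.next_cursor"
    #     fields:
    #       video_id: "$.id"
    #       video_title: "$.title"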

    async def run(self):
        """Main run flow - subclasses implement the full loop logic."""
        self.stats['start_time'] = asyncio.get_event_loop().time()
        self.logger.info(f"Starting spider: {self.platform}/{self.mode}")
        try:
            await self.before_run()
            await self.execute()  # subclasses implement the full execution logic
        except Exception as e:
            self.logger.error(f"Spider execution failed: {e}\n{traceback.format_exc()}")
        finally:
            await self.after_run()
            self._log_final_stats()

    @abstractmethod
    async def execute(self):
        """Core execution logic - subclasses must implement the full loop."""
        pass
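
    # A minimal sketch (an assumption, not repo code) of what a subclass
    # implementation might look like, using only helpers defined on this class;
    # the response layout would actually come from the data_path config:
    #
    #   async def execute(self):
    #       for _ in range(self.loop_times):
    #           if not await self.is_video_count_sufficient():
    #               break
    #           response = await self.make_request(self.request_body_template)
    #           if not response:
    #               break
    #           await self.process_data(response.get("data") or [])
    #           await self.wait_between_iterations()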

    async def process_data(self, data: List[Dict]):
        """Process a batch of items; returns the number of successes."""
        # Guard against data being None
        data_length = 0 if data is None else len(data)
        self.aliyun_log.logging(
            code="1001",
            message=f"Fetched list of length: {data_length}",
            data=data_length,
        )
        if not data:
            return 0
        success_count = 0
        for item in data:
            self.aliyun_log.logging(
                code="1001",
                message="Fetched one item",
                data=item
            )
            try:
                if await self.process_single_item(item):
                    success_count += 1
                    self.stats['success'] += 1
                else:
                    self.stats['fail'] += 1
            except Exception as e:
                self.logger.error(f"Failed to process item: {e}")
                self.stats['fail'] += 1
        self.logger.info(f"Batch complete: {success_count}/{data_length} succeeded")
        return success_count

    async def process_single_item(self, item: Dict) -> bool:
        """Process a single item."""
        # 1. Enrich with detail data
        detail_item = await self.fetch_detail(item)
        # 2. Run the full pipeline through the video processor
        video_obj = await self.video_processor.process_single_video(
            raw_data=detail_item,
            user_info=await self.select_publish_user(),
            rule_dict=self.rule_dict,
            env=self.env
        )
        if not video_obj:
            return False
        # 3. Push the result downstream
        return await self.push_video(video_obj)

    async def select_publish_user(self) -> Dict:
        """Pick a publish user at random."""
        if self.user_list:
            return random.choice(self.user_list)
        return {}

    async def fetch_detail(self, item: Dict) -> Dict:
        """Enrich an item with detail data - subclasses may override."""
        return item

    async def push_video(self, video: Dict) -> bool:
        """Push video data to the message queue."""
        try:
            await self.mq_producer.send_msg(video)
            self.aliyun_log.logging(
                code="1002",
                message="Pushed to ETL successfully",
                data=video
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to push video: {e}")
            return False

    async def is_video_count_sufficient(self) -> bool:
        """
        Check today's video count against the configured target.
        Returns True if the target has not been reached yet, False otherwise.
        """
        # The rule's videos_cnt "min" value is used as today's target count
        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
        if max_count <= 0:
            return True
        current_count = await self.db_service.get_today_videos()
        self.logger.info(f"Fetched so far: {current_count}/{max_count}")
        if current_count >= max_count:
            self.aliyun_log.logging(
                code="1011",
                message="Video count reached the maximum",
                data={
                    "current_count": current_count,
                    "max_count": max_count
                }
            )
            return False
        return True

    async def wait_between_iterations(self, wait_time: Optional[int] = None):
        """Sleep between loop iterations."""
        if wait_time is None:
            interval_config = getattr(self.config, 'loop_interval', {}) or {}
            min_time = interval_config.get('min', 1)
            max_time = interval_config.get('max', 5)
            wait_time = random.randint(min_time, max_time)
        self.logger.info(f"Waiting {wait_time} seconds")
        await asyncio.sleep(wait_time)

    async def make_request(self, request_body: Dict) -> Optional[Dict]:
        """Send a request. A fresh session is created per call, trading
        connection reuse for statelessness."""
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            return await self.request_client.request(
                session=session,
                method=self.method,
                url=self.url,
                headers=self.headers,
                json=request_body
            )

    async def before_run(self):
        """Pre-run setup - subclasses may override."""
        pass

    async def after_run(self):
        """Post-run cleanup - subclasses may override."""
        pass

    def _log_final_stats(self):
        """Log the final run statistics."""
        self.stats['end_time'] = asyncio.get_event_loop().time()
        duration = 0
        if self.stats['start_time'] is not None:
            duration = self.stats['end_time'] - self.stats['start_time']
        self.logger.info(
            f"Spider finished: {self.stats['success']} succeeded, "
            f"{self.stats['fail']} failed, {duration:.2f}s elapsed"
        )
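

# ---------------------------------------------------------------------------
# Entry-point sketch (illustrative assumptions, not repo code): `DemoSpider`
# is a hypothetical subclass implementing execute() along the lines of the
# sketch above; running this would also require a matching "demospider"
# entry in SpiderConfig.
# ---------------------------------------------------------------------------
#
# if __name__ == "__main__":
#     spider = DemoSpider(rule_dict={"videos_cnt": {"min": 100}},
#                         user_list=[{"uid": "example_user"}])
#     asyncio.run(spider.run())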