# basespider.py
# spiders/basespider.py
import asyncio
import random
import traceback
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

import aiohttp

from core.base.async_request_client import AsyncRequestClient
from core.utils.log.logger_manager import LoggerManager
from core.utils.request_preparer import RequestPreparer
from core.utils.spider_config import SpiderConfig
from core.video_processor import VideoProcessor
from services.async_mq_producer import AsyncMQProducer
from services.async_mysql_service import AsyncMysqlService
class BaseSpider(ABC):
    """Spider base class (simplified) — wires config, logging and services.

    Holds platform configuration, injected/default service instances and
    per-run statistics. Contains no crawl-loop logic itself; subclasses
    implement the loop in :meth:`execute`.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
                 request_client: AsyncRequestClient = None,
                 db_service: AsyncMysqlService = None,
                 mq_producer: AsyncMQProducer = None):
        """Initialise the spider.

        :param rule_dict: crawl rules (e.g. daily video-count limits).
        :param user_list: candidate publish users; one is chosen at random
            per item by :meth:`select_publish_user`.
        :param env: environment tag passed through to the video processor.
        :param request_client: optional injected HTTP client wrapper.
        :param db_service: optional injected MySQL service.
        :param mq_producer: optional injected MQ producer.
        """
        # Basic attributes
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Service dependencies (any left as None get defaults below)
        self.request_client = request_client
        self.db_service = db_service
        self.mq_producer = mq_producer
        # Platform config is looked up by the lower-cased subclass name
        class_name = self.__class__.__name__.lower()
        self.config = SpiderConfig.get_platform_config(class_name)
        self._setup_from_config()
        # Logging services (must exist before default services, which use them)
        self.logger = LoggerManager.get_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )
        self.aliyun_log = LoggerManager.get_aliyun_logger(
            platform=self.config.platform,
            mode=self.config.mode
        )
        # Request preparer (builds request bodies from response_parse config)
        self.request_preparer = RequestPreparer(
            response_parse_config=self.config.response_parse,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )
        # Per-run statistics, finalised by _log_final_stats()
        self.stats = {
            'success': 0,
            'fail': 0,
            'start_time': None,
            'end_time': None
        }
        # Create default instances for any service not injected
        self._setup_default_services()
        # Video processor performs per-item normalisation/validation
        self.video_processor = VideoProcessor(
            platform=self.platform,
            mode=self.mode,
            field_map=self.field_map,
            feishu_sheetid=self.feishu_sheetid,
            logger=self.logger,
            aliyun_log=self.aliyun_log
        )
    def _setup_default_services(self):
        """Create default instances for any service not injected via __init__.

        Keeps constructor injection optional: the request client, MySQL
        service and MQ producer are built from the platform config when
        the caller did not supply them.
        """
        if not self.request_client:
            self.request_client = AsyncRequestClient(
                logger=self.logger,
                aliyun_log=self.aliyun_log
            )
        if not self.db_service:
            self.db_service = AsyncMysqlService(
                platform=self.config.platform,
                mode=self.config.mode
            )
        if not self.mq_producer:
            # NOTE(review): topic name is hard-coded to the prod topic even
            # when env != "prod" — confirm this is intentional.
            self.mq_producer = AsyncMQProducer(
                topic_name="topic_crawler_etl_prod_v2",
                platform=self.config.platform,
                mode=self.config.mode
            )
  84. def _setup_from_config(self):
  85. """从配置中设置属性"""
  86. self.platform = self.config.platform
  87. self.mode = self.config.mode
  88. self.url = self.config.url
  89. self.method = self.config.method.upper()
  90. self.headers = self.config.headers or {}
  91. self.request_body_template = self.config.request_body or {}
  92. self.loop_times = self.config.loop_times or 100
  93. self.timeout = self.config.request_timeout or 30
  94. self.feishu_sheetid = self.config.feishu_sheetid
  95. # 响应解析配置
  96. response_parse = self.config.response_parse or {}
  97. self.data_path = response_parse.get("data_path")
  98. self.has_more = response_parse.get("has_more")
  99. self.next_cursor = response_parse.get("next_cursor")
  100. self.field_map = response_parse.get("fields", {})
  101. async def run(self):
  102. """主运行流程 - 子类实现完整循环逻辑"""
  103. self.stats['start_time'] = asyncio.get_event_loop().time()
  104. self.logger.info(f"开始运行爬虫: {self.platform}/{self.mode}")
  105. try:
  106. await self.before_run()
  107. await self.execute() # 子类实现完整的执行逻辑
  108. except Exception as e:
  109. self.logger.error(f"爬虫执行异常: {e}\n{traceback.format_exc()}")
  110. finally:
  111. await self.after_run()
  112. self._log_final_stats()
    @abstractmethod
    async def execute(self):
        """Core execution logic — subclasses must implement the full crawl loop."""
        pass
  117. async def process_data(self, data: List[Dict]):
  118. """处理数据"""
  119. success_count = 0
  120. for item in data:
  121. self.aliyun_log.logging(
  122. code="1001",
  123. message=f"获取到一条数据",
  124. data=item
  125. )
  126. try:
  127. if await self.process_single_item(item):
  128. success_count += 1
  129. self.stats['success'] += 1
  130. else:
  131. self.stats['fail'] += 1
  132. except Exception as e:
  133. self.logger.error(f"处理单条数据失败: {e}")
  134. self.stats['fail'] += 1
  135. self.logger.info(f"批次处理完成: 成功 {success_count}/{len(data)}")
  136. return success_count
  137. async def process_single_item(self, item: Dict) -> bool:
  138. """处理单条数据"""
  139. # 1. 补充详情
  140. detail_item = await self.fetch_detail(item)
  141. # 2. 使用视频处理器进行完整处理
  142. video_obj = await self.video_processor.process_single_video(
  143. raw_data=detail_item,
  144. user_info=await self.select_publish_user(),
  145. rule_dict=self.rule_dict,
  146. env=self.env
  147. )
  148. if not video_obj:
  149. return False
  150. # 3. 推送数据
  151. return await self.push_video(video_obj)
  152. async def select_publish_user(self) -> Dict:
  153. """选择发布用户"""
  154. if self.user_list:
  155. return random.choice(self.user_list)
  156. return {}
    async def fetch_detail(self, item: Dict) -> Dict:
        """Enrich a list item with detail data.

        Default implementation is a pass-through; subclasses override it
        when a separate detail request is needed.
        """
        return item
  160. async def push_video(self, video: Dict) -> bool:
  161. """推送视频数据"""
  162. try:
  163. await self.mq_producer.send_msg(video)
  164. self.aliyun_log.logging(
  165. code="1002",
  166. message="推送ETL成功",
  167. data=video
  168. )
  169. return True
  170. except Exception as e:
  171. self.logger.error(f"推送视频失败: {e}")
  172. return False
  173. async def is_video_count_sufficient(self) -> bool:
  174. """
  175. 检查视频数量是否足够
  176. 未达到 True 反之 False
  177. """
  178. max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
  179. if max_count <= 0:
  180. return True
  181. current_count = await self.db_service.get_today_videos()
  182. if current_count >= max_count:
  183. self.logger.info(f"视频数量达到当日最大值: {current_count}/{max_count}")
  184. self.aliyun_log.logging(
  185. code="1011",
  186. message="视频数量达到最大值",
  187. data={
  188. "current_count": current_count,
  189. "max_count": max_count
  190. }
  191. )
  192. return False
  193. return True
  194. async def wait_between_iterations(self, wait_time: int = None):
  195. """等待间隔"""
  196. if wait_time is None:
  197. interval_config = getattr(self.config, 'loop_interval', {})
  198. min_time = interval_config.get('min', 1)
  199. max_time = interval_config.get('max', 5)
  200. wait_time = random.randint(min_time, max_time)
  201. self.logger.info(f"等待 {wait_time} 秒")
  202. await asyncio.sleep(wait_time)
  203. async def make_request(self, request_body: Dict) -> Optional[Dict]:
  204. """发送请求"""
  205. async with aiohttp.ClientSession(
  206. timeout=aiohttp.ClientTimeout(total=self.timeout)
  207. ) as session:
  208. return await self.request_client.request(
  209. session=session,
  210. method=self.method,
  211. url=self.url,
  212. headers=self.headers,
  213. json=request_body
  214. )
    async def before_run(self):
        """Hook executed before the crawl starts — subclasses may override."""
        pass
    async def after_run(self):
        """Cleanup hook executed after the crawl (runs even on failure) —
        subclasses may override."""
        pass
  221. def _log_final_stats(self):
  222. """记录最终统计"""
  223. self.stats['end_time'] = asyncio.get_event_loop().time()
  224. duration = 0
  225. if self.stats['start_time'] is not None:
  226. duration = self.stats['end_time'] - self.stats['start_time']
  227. self.logger.info(
  228. f"爬虫执行完成: 成功 {self.stats['success']}, "
  229. f"失败 {self.stats['fail']}, 耗时 {duration:.2f}秒"
  230. )