123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- # 文件: core/mq/rocketmq_consumer_wrapper.py (新文件)
- import asyncio
- import logging
- from typing import Callable, Awaitable
- from rocketmq.client import ClientConfiguration, Credentials, SimpleConsumer
- from rocketmq.model.message import Message
- from config import settings
- from core.utils.log.logger_manager import LoggerManager
- class RocketMQSimpleConsumerWrapper:
- """
- rocketmq-client-python 的异步封装。
- - 负责连接、订阅和优雅关闭。
- - 提供一个异步运行的消费循环,该循环:
- 1. 以异步方式拉取消息。
- 2. 异步处理函数。
- 3. 在处理函数成功完成后,自动异步确认消息。
- 4. 如果处理函数失败,则不确认,等待消息重试。
- """
- def __init__(self, group_id: str, topic: str):
- self.logger = LoggerManager.get_logger()
- self.group_id = group_id
- self.topic = topic
- # 从配置中读取连接信息
- credentials = Credentials(settings.ROCKETMQ_ACCESS_KEY_ID, settings.ROCKETMQ_ACCESS_KEY_SECRET)
- config = ClientConfiguration(
- settings.ROCKETMQ_ENDPOINT,
- credentials,
- settings.ROCKETMQ_INSTANCE_ID # 官方SDK的namespace参数用于实例ID
- )
- # 阿里云商业版 topic 需要带上实例ID前缀
- self.topic_with_instance = f"{settings.ROCKETMQ_INSTANCE_ID}%%{self.topic}"
- # wait_seconds 对应 receive 的 long-polling 时间
- self.wait_seconds = settings.ROCKETMQ_WAIT_SECONDS
- # 消息不可见时长,先设置30分钟
- self.invisible_duration_seconds = 1800
- self._consumer = SimpleConsumer(config, self.group_id)
- self._is_running = False
- async def startup(self):
- """启动消费者"""
- if self._is_running:
- return
- self.logger.info(f"[{self.topic}] 消费者正在启动...")
- await asyncio.to_thread(self._consumer.startup)
- self._is_running = True
- self.logger.info(f"[{self.topic}] 消费者启动成功")
- async def shutdown(self):
- """关闭消费者"""
- if not self._is_running:
- return
- self._is_running = False
- self.logger.info(f"[{self.topic}] 消费者正在关闭...")
- await asyncio.to_thread(self._consumer.shutdown)
- self.logger.info(f"[{self.topic}] 消费者已关闭")
- async def subscribe(self):
- """订阅主题"""
- self.logger.info(f"[{self.topic}] 正在订阅主题: {self.topic_with_instance}")
- await asyncio.to_thread(self._consumer.subscribe, self.topic_with_instance)
- self.logger.info(f"[{self.topic}] 订阅成功")
- async def run(self, async_message_handler: Callable[[Message], Awaitable[None]], stop_event: asyncio.Event):
- """
- 运行主消费循环。
- :param async_message_handler: 用户定义的异步消息处理函数。
- :param stop_event: 用于优雅停止的 asyncio 事件。
- """
- self.logger.info(f"[{self.topic}] 进入消费循环,消息不可见时间设置为 {self.invisible_duration_seconds} 秒")
- while not stop_event.is_set():
- try:
- # 使用 to_thread 在异步环境中运行阻塞的 receive 方法
- messages = await asyncio.to_thread(self._consumer.receive, 1, self.invisible_duration_seconds)
- if not messages:
- self.logger.debug(f"[{self.topic}] 长轮询超时,未拉取到消息,继续...")
- continue
- message = messages[0]
- self.logger.info(f"[{self.topic}] 拉取到消息 ID: {message.message_id}")
- try:
- # 调用业务逻辑处理器
- await async_message_handler(message)
- # 业务逻辑成功后,异步确认消息 <--- 正确的ACK位置
- self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} 处理成功,正在ACK...")
- await asyncio.to_thread(self._consumer.ack, message)
- self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} ACK成功")
- except Exception as e:
- # 业务逻辑失败,打印日志,不ACK,消息将超时后重投
- self.logger.error(
- f"[{self.topic}] 处理消息 ID: {message.message_id} 失败,将不进行ACK,等待重投。错误: {e}",
- exc_info=True
- )
- except Exception as e:
- # 拉取消息本身失败(如网络中断)
- self.logger.error(f"[{self.topic}] 消费循环发生严重错误: {e}", exc_info=True)
- # 等待一段时间避免频繁失败
- await asyncio.sleep(5)
- self.logger.info(f"[{self.topic}] 消费循环已停止 (收到退出信号)")
|