# 文件: core/mq/rocketmq_consumer_wrapper.py (新文件) import asyncio import logging from typing import Callable, Awaitable from rocketmq.client import ClientConfiguration, Credentials, SimpleConsumer from rocketmq.model.message import Message from config import settings from core.utils.log.logger_manager import LoggerManager class RocketMQSimpleConsumerWrapper: """ rocketmq-client-python 的异步封装。 - 负责连接、订阅和优雅关闭。 - 提供一个异步运行的消费循环,该循环: 1. 以异步方式拉取消息。 2. 异步处理函数。 3. 在处理函数成功完成后,自动异步确认消息。 4. 如果处理函数失败,则不确认,等待消息重试。 """ def __init__(self, group_id: str, topic: str): self.logger = LoggerManager.get_logger() self.group_id = group_id self.topic = topic # 从配置中读取连接信息 credentials = Credentials(settings.ROCKETMQ_ACCESS_KEY_ID, settings.ROCKETMQ_ACCESS_KEY_SECRET) config = ClientConfiguration( settings.ROCKETMQ_ENDPOINT, credentials, settings.ROCKETMQ_INSTANCE_ID # 官方SDK的namespace参数用于实例ID ) # 阿里云商业版 topic 需要带上实例ID前缀 self.topic_with_instance = f"{settings.ROCKETMQ_INSTANCE_ID}%%{self.topic}" # wait_seconds 对应 receive 的 long-polling 时间 self.wait_seconds = settings.ROCKETMQ_WAIT_SECONDS # 消息不可见时长,先设置30分钟 self.invisible_duration_seconds = 1800 self._consumer = SimpleConsumer(config, self.group_id) self._is_running = False async def startup(self): """启动消费者""" if self._is_running: return self.logger.info(f"[{self.topic}] 消费者正在启动...") await asyncio.to_thread(self._consumer.startup) self._is_running = True self.logger.info(f"[{self.topic}] 消费者启动成功") async def shutdown(self): """关闭消费者""" if not self._is_running: return self._is_running = False self.logger.info(f"[{self.topic}] 消费者正在关闭...") await asyncio.to_thread(self._consumer.shutdown) self.logger.info(f"[{self.topic}] 消费者已关闭") async def subscribe(self): """订阅主题""" self.logger.info(f"[{self.topic}] 正在订阅主题: {self.topic_with_instance}") await asyncio.to_thread(self._consumer.subscribe, self.topic_with_instance) self.logger.info(f"[{self.topic}] 订阅成功") async def run(self, async_message_handler: Callable[[Message], Awaitable[None]], stop_event: asyncio.Event): """ 运行主消费循环。 :param async_message_handler: 用户定义的异步消息处理函数。 :param stop_event: 用于优雅停止的 asyncio 事件。 """ self.logger.info(f"[{self.topic}] 进入消费循环,消息不可见时间设置为 {self.invisible_duration_seconds} 秒") while not stop_event.is_set(): try: # 使用 to_thread 在异步环境中运行阻塞的 receive 方法 messages = await asyncio.to_thread(self._consumer.receive, 1, self.invisible_duration_seconds) if not messages: self.logger.debug(f"[{self.topic}] 长轮询超时,未拉取到消息,继续...") continue message = messages[0] self.logger.info(f"[{self.topic}] 拉取到消息 ID: {message.message_id}") try: # 调用业务逻辑处理器 await async_message_handler(message) # 业务逻辑成功后,异步确认消息 <--- 正确的ACK位置 self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} 处理成功,正在ACK...") await asyncio.to_thread(self._consumer.ack, message) self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} ACK成功") except Exception as e: # 业务逻辑失败,打印日志,不ACK,消息将超时后重投 self.logger.error( f"[{self.topic}] 处理消息 ID: {message.message_id} 失败,将不进行ACK,等待重投。错误: {e}", exc_info=True ) except Exception as e: # 拉取消息本身失败(如网络中断) self.logger.error(f"[{self.topic}] 消费循环发生严重错误: {e}", exc_info=True) # 等待一段时间避免频繁失败 await asyncio.sleep(5) self.logger.info(f"[{self.topic}] 消费循环已停止 (收到退出信号)")