import asyncio
import inspect
from typing import Awaitable, Callable, List, Union

from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_consumer import Message
from mq_http_sdk.mq_exception import MQExceptionBase


class AsyncRocketMQConsumer:
    """Asyncio wrapper around the Alibaba Cloud RocketMQ HTTP consumer.

    - Exposes the synchronous SDK calls as coroutines by running them in
      worker threads via ``asyncio.to_thread``.
    - Pulls messages in batches, passing ``wait_seconds`` to the SDK as the
      polling wait time.
    - Acknowledges each message explicitly after its handler succeeds.
    """

    def __init__(
        self,
        endpoint: str,
        access_key_id: str,
        access_key_secret: str,
        instance_id: str,
        topic_name: str,
        group_id: str,
        wait_seconds: int = 3,
        batch: int = 1,
    ):
        """Store the connection settings and create the SDK consumer.

        :param endpoint: endpoint passed to ``MQClient``.
        :param access_key_id: access key id passed to ``MQClient``.
        :param access_key_secret: access key secret passed to ``MQClient``.
        :param instance_id: instance id passed to ``get_consumer``.
        :param topic_name: topic to consume from.
        :param group_id: consumer group id.
        :param wait_seconds: wait time forwarded to every ``receive_message``
            call.
        :param batch: batch size forwarded to every ``receive_message`` call.
        """
        self.endpoint = endpoint
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.instance_id = instance_id
        self.topic_name = topic_name
        self.group_id = group_id
        self.wait_seconds = wait_seconds
        self.batch = batch

        # Build the SDK client and the consumer bound to this topic/group.
        self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
        self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)

    async def receive_messages(self) -> List[Message]:
        """Pull one batch of messages without blocking the event loop.

        The synchronous ``receive_message`` SDK call runs in a worker thread.

        :return: the messages returned by the SDK, or an empty list when the
            SDK raises an error whose ``type`` is ``"MessageNotExist"``.
        :raises MQExceptionBase: for any other SDK error.
        """
        try:
            return await asyncio.to_thread(
                self.consumer.receive_message,
                self.batch,
                self.wait_seconds,
            )
        except MQExceptionBase as e:
            # Not every SDK exception carries a ``type`` attribute, hence getattr.
            if getattr(e, "type", None) == "MessageNotExist":
                return []
            # Bare ``raise`` keeps the original traceback intact.
            raise

    async def ack_message(self, receipt_handle: str) -> None:
        """Acknowledge a single message as successfully consumed.

        :param receipt_handle: receipt handle of the message to acknowledge.
        :raises RuntimeError: if the SDK call fails; the original exception is
            attached as ``__cause__``.
        """
        try:
            await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
        except Exception as e:
            raise RuntimeError(f"确认消息失败: {e}") from e

    async def run_forever(
        self,
        handler: Callable[[Message], Union[Awaitable[None], None]],
        retry_delay: float = 2,
    ) -> None:
        """Run the consume loop: pull messages, handle them, acknowledge them.

        The loop never returns on its own; cancel the enclosing task to stop
        it.

        :param handler: callable receiving one ``Message``. Normally an
            ``async`` function; a plain function is also accepted, but it runs
            on the event loop thread and blocks it while executing.
        :param retry_delay: seconds to sleep after a failed pull before
            retrying.
        """
        print(f"[AsyncRocketMQConsumer] 开始消费 Topic={self.topic_name} Group={self.group_id}")

        while True:
            try:
                messages = await self.receive_messages()
                for msg in messages:
                    try:
                        outcome = handler(msg)
                        # Await only when the handler actually returned an
                        # awaitable, so synchronous handlers are acked too
                        # instead of failing on ``await None``.
                        if inspect.isawaitable(outcome):
                            await outcome
                        await self.ack_message(msg.receipt_handle)
                    except Exception as e:
                        # The message is left unacknowledged; the loop moves
                        # on to the next message in the batch.
                        print(f"处理消息失败: {e}\n消息内容: {msg.message_body}")
            except Exception as e:
                print(f"拉取消息异常: {e}")
                # Back off before the next pull to avoid a tight failure loop.
                await asyncio.sleep(retry_delay)