import asyncio
import json
import os
from typing import Awaitable, Callable, List, Optional

from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.consumer import Message

from utils.env_loader import load_env, get_env, get_int_env  # shared env helpers

# Load the environment before any configuration value is read below.
load_env()


class AsyncRocketMQConsumer:
    """Asyncio wrapper around the Alibaba Cloud RocketMQ HTTP-protocol consumer.

    - Connection settings are read from environment variables.
    - The blocking SDK calls are run via ``asyncio.to_thread`` so they can be
      awaited from coroutines.
    - Messages are acknowledged explicitly, after the handler has succeeded.
    """

    def __init__(
        self,
        topic_name: Optional[str],
        group_id: Optional[str],
        wait_seconds: Optional[int] = None,
        batch: Optional[int] = None,
    ):
        """Build the SDK client and consumer for one topic / consumer group.

        :param topic_name: Topic passed to ``MQClient.get_consumer``.
        :param group_id: Consumer group passed to ``MQClient.get_consumer``.
        :param wait_seconds: Second argument of ``receive_message``. A falsy
            value (``None`` or ``0``) falls back to ``ROCKETMQ_WAIT_SECONDS``.
        :param batch: First argument of ``receive_message``. A falsy value
            falls back to ``ROCKETMQ_BATCH``.
        """
        # Kept on the instance because run_forever() reports them in its
        # start-up message.
        self.topic_name = topic_name
        self.group_id = group_id

        # Connection settings come from the environment.
        self.endpoint = get_env("ROCKETMQ_ENDPOINT")
        self.access_key_id = get_env("ROCKETMQ_AK")
        self.access_key_secret = get_env("ROCKETMQ_SK")
        self.instance_id = get_env("ROCKETMQ_INSTANCE_ID")
        # NOTE(review): the second argument of get_int_env is presumably the
        # default used when the variable is unset -- confirm in env_loader.
        self.wait_seconds = wait_seconds or get_int_env("ROCKETMQ_WAIT_SECONDS", 10)
        self.batch = batch or get_int_env("ROCKETMQ_BATCH", 1)

        # Initialise the SDK client and the consumer bound to topic/group.
        self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
        self.consumer = self.client.get_consumer(self.instance_id, topic_name, group_id)

    async def receive_messages(self) -> List[Message]:
        """Pull messages without blocking the event loop.

        :return: Whatever ``consumer.receive_message`` returns, or an empty
            list when the SDK raises an error whose ``type`` attribute is
            ``"MessageNotExist"``.
        :raises MQExceptionBase: Any other SDK error is re-raised unchanged.
        """
        try:
            return await asyncio.to_thread(
                self.consumer.receive_message,
                self.batch,
                self.wait_seconds,
            )
        except MQExceptionBase as e:
            # Not every exception object is guaranteed to carry ``type``,
            # hence getattr with a default.
            if getattr(e, "type", "") == "MessageNotExist":
                return []
            raise

    async def ack_message(self, receipt_handle: str) -> None:
        """Acknowledge one message as successfully consumed.

        :param receipt_handle: Receipt handle of the message to acknowledge.
        :raises RuntimeError: If the SDK call fails; the original exception
            is attached as ``__cause__``.
        """
        try:
            await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
        except Exception as e:
            raise RuntimeError(f"确认消息失败: {e}") from e

    async def run_forever(self, handler: Callable[[Message], Awaitable[None]]):
        """Pull and process messages in an endless loop.

        Intended for development, debugging or small workloads.

        :param handler: Async message handler, ``async def handler(msg: Message)``.
            Each message is acknowledged only after ``handler`` returns
            without raising.
        """
        print(f"[AsyncRocketMQConsumer] 启动消费: Topic={self.topic_name}, Group={self.group_id}")

        while True:
            try:
                messages = await self.receive_messages()
                for msg in messages:
                    try:
                        await handler(msg)
                        await self.ack_message(msg.receipt_handle)
                    except Exception as e:
                        # One bad message must not stop the rest of the
                        # batch; it is reported and left unacknowledged.
                        print(f"[处理失败] {e}\n消息: {msg.message_body}")
            except Exception as e:
                print(f"[拉取失败] {e}")
                # Back off briefly so a persistent pull failure does not
                # turn into a tight retry loop.
                await asyncio.sleep(2)