123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
import asyncio
from typing import Awaitable, Callable, List

from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_consumer import Message
from mq_http_sdk.mq_exception import MQExceptionBase


class AsyncRocketMQConsumer:
    """Asynchronous consumer wrapper for the Alibaba Cloud RocketMQ HTTP SDK.

    - Exposes an asyncio-based consumption model on top of the synchronous
      SDK by running its blocking calls through ``asyncio.to_thread``.
    - Pulls messages in batches using long polling.
    - Messages are acknowledged manually, after the handler has succeeded.
    """

    # Seconds to pause after a failed pull before polling again.
    RETRY_DELAY_SECONDS = 2

    def __init__(
        self,
        endpoint: str,
        access_key_id: str,
        access_key_secret: str,
        instance_id: str,
        topic_name: str,
        group_id: str,
        wait_seconds: int = 3,
        batch: int = 1,
    ):
        """Store the connection settings and build the SDK client and consumer.

        :param endpoint: endpoint passed to ``MQClient``.
        :param access_key_id: access key id passed to ``MQClient``.
        :param access_key_secret: access key secret passed to ``MQClient``.
        :param instance_id: instance id passed to ``get_consumer``.
        :param topic_name: topic passed to ``get_consumer``.
        :param group_id: consumer group passed to ``get_consumer``.
        :param wait_seconds: second argument forwarded to the SDK's
            ``receive_message`` on every pull.
        :param batch: first argument forwarded to the SDK's
            ``receive_message`` on every pull.
        """
        self.endpoint = endpoint
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.instance_id = instance_id
        self.topic_name = topic_name
        self.group_id = group_id
        self.wait_seconds = wait_seconds
        self.batch = batch

        # Initialise the SDK client and the consumer bound to topic/group.
        self.client = MQClient(
            self.endpoint, self.access_key_id, self.access_key_secret
        )
        self.consumer = self.client.get_consumer(
            self.instance_id, self.topic_name, self.group_id
        )

    async def receive_messages(self) -> List[Message]:
        """Pull one batch of messages without blocking the event loop.

        The synchronous SDK call is executed via ``asyncio.to_thread``.

        :return: whatever the SDK's ``receive_message`` returns, or an empty
            list when the SDK raises an ``MQExceptionBase`` whose ``type`` is
            ``"MessageNotExist"``.
        :raises MQExceptionBase: any other SDK error, re-raised unchanged.
        """
        try:
            return await asyncio.to_thread(
                self.consumer.receive_message,
                self.batch,
                self.wait_seconds,
            )
        except MQExceptionBase as e:
            # "MessageNotExist" is treated as an empty poll, not a failure.
            if getattr(e, "type", None) == "MessageNotExist":
                return []
            # Bare raise keeps the original exception and traceback intact.
            raise

    async def ack_message(self, receipt_handle: str) -> None:
        """Acknowledge that a message has been consumed successfully.

        :param receipt_handle: handle of the message to acknowledge; it is
            passed to the SDK wrapped in a single-element list.
        :raises RuntimeError: if the SDK call fails; the underlying exception
            is attached as ``__cause__``.
        """
        try:
            await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
        except Exception as e:
            # Chain explicitly so the SDK error is preserved for debugging.
            raise RuntimeError(f"确认消息失败: {e}") from e

    async def run_forever(self, handler: "Callable[[Message], Awaitable[None]]"):
        """Run the consume loop: pull messages and dispatch them to *handler*.

        The loop never returns on its own. A message is acknowledged only
        after ``handler`` completes without raising; handler or ack failures
        are printed and the loop moves on to the next message. Failures while
        pulling are printed and followed by a ``RETRY_DELAY_SECONDS`` pause.

        :param handler: async function that receives one ``Message``.
        """
        print(f"[AsyncRocketMQConsumer] 开始消费 Topic={self.topic_name} Group={self.group_id}")
        while True:
            try:
                messages = await self.receive_messages()
                for msg in messages:
                    try:
                        await handler(msg)
                        await self.ack_message(msg.receipt_handle)
                    except Exception as e:
                        # One bad message must not stop the rest of the batch.
                        print(f"处理消息失败: {e}\n消息内容: {msg.message_body}")
            except Exception as e:
                print(f"拉取消息异常: {e}")
                # Back off briefly so a persistent pull failure does not spin.
                await asyncio.sleep(self.RETRY_DELAY_SECONDS)
|