12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import asyncio
- import json
- import os
- from typing import List, Optional, Callable
- from mq_http_sdk.mq_client import MQClient
- from mq_http_sdk.mq_exception import MQExceptionBase
- from mq_http_sdk.consumer import Message
- from utils.env_loader import load_env, get_env, get_int_env # 如果你有统一封装
- # Ensure the environment is loaded; this runs once at import time, before any consumer is constructed.
- load_env()
class AsyncRocketMQConsumer:
    """Asynchronous consumer wrapper for Alibaba Cloud RocketMQ (HTTP protocol).

    - Connection settings are read from environment variables.
    - The blocking SDK calls are off-loaded with ``asyncio.to_thread`` so the
      event loop is never blocked while polling or acknowledging.
    - Messages are acknowledged explicitly, only after the handler succeeds.
    """

    def __init__(
        self,
        topic_name: Optional[str],
        group_id: Optional[str],
        wait_seconds: Optional[int] = None,
        batch: Optional[int] = None,
    ):
        """Create the MQ client and the consumer bound to a topic/group.

        :param topic_name: Topic to consume from.
        :param group_id: Consumer group id.
        :param wait_seconds: Long-polling wait passed to ``receive_message``.
            A falsy value (``None`` or ``0``) falls back to
            ``get_int_env("ROCKETMQ_WAIT_SECONDS", 10)``.
        :param batch: Batch size passed to ``receive_message``. A falsy value
            (``None`` or ``0``) falls back to
            ``get_int_env("ROCKETMQ_BATCH", 1)``.
        """
        # Keep the identifiers on the instance: run_forever() reports them at
        # start-up and would otherwise fail with AttributeError.
        self.topic_name = topic_name
        self.group_id = group_id

        # Connection settings come from the environment.
        self.endpoint = get_env("ROCKETMQ_ENDPOINT")
        self.access_key_id = get_env("ROCKETMQ_AK")
        self.access_key_secret = get_env("ROCKETMQ_SK")
        self.instance_id = get_env("ROCKETMQ_INSTANCE_ID")
        self.wait_seconds = wait_seconds or get_int_env("ROCKETMQ_WAIT_SECONDS", 10)
        self.batch = batch or get_int_env("ROCKETMQ_BATCH", 1)

        # Build the SDK client and the consumer for this topic/group.
        self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
        self.consumer = self.client.get_consumer(self.instance_id, topic_name, group_id)

    async def receive_messages(self) -> List[Message]:
        """Pull one batch of messages without blocking the event loop.

        :return: The messages returned by the SDK, or an empty list when the
            SDK reports ``MessageNotExist``.
        :raises MQExceptionBase: For any other SDK error.
        """
        try:
            return await asyncio.to_thread(
                self.consumer.receive_message,
                self.batch,
                self.wait_seconds,
            )
        except MQExceptionBase as e:
            # "No message available" is treated as an empty batch, not an error.
            if getattr(e, "type", "") == "MessageNotExist":
                return []
            raise

    async def ack_message(self, receipt_handle: str) -> None:
        """Acknowledge a single message as successfully consumed.

        :param receipt_handle: Receipt handle of the message to acknowledge.
        :raises RuntimeError: If the acknowledgement fails; the underlying
            error is attached as ``__cause__``.
        """
        try:
            await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
        except Exception as e:
            raise RuntimeError(f"确认消息失败: {e}") from e

    async def run_forever(self, handler: Callable[[Message], asyncio.Future]):
        """Poll and process messages in an endless loop.

        Intended for development/debugging or small workloads.

        :param handler: Async message handler, ``async def handler(msg: Message)``.
            A message is acknowledged only when the handler returns without
            raising.
        """
        print(f"[AsyncRocketMQConsumer] 启动消费: Topic={self.topic_name}, Group={self.group_id}")
        while True:
            try:
                messages = await self.receive_messages()
                for msg in messages:
                    try:
                        await handler(msg)
                        await self.ack_message(msg.receipt_handle)
                    except Exception as e:
                        # One bad message must not stop the batch; it is left
                        # un-acknowledged and the loop moves on.
                        print(f"[处理失败] {e}\n消息: {msg.message_body}")
            except Exception as e:
                print(f"[拉取失败] {e}")
                # Back off briefly so a persistent pull failure does not spin.
                await asyncio.sleep(2)
|