import asyncio import logging from typing import List, Optional, Callable, Any from mq_http_sdk.mq_client import MQClient from mq_http_sdk.mq_consumer import Message from mq_http_sdk.mq_exception import MQExceptionBase from config import settings from core.utils.log.logger_manager import LoggerManager class AsyncRocketMQConsumer: """ 阿里云 RocketMQ HTTP 协议异步消费者封装类 - 支持自动读取环境变量 - 基于 asyncio 实现原生异步消费模型 - 手动确认消费 - 提供单条消息处理模式 """ def __init__( self, topic_name: Optional[str], group_id: Optional[str], wait_seconds: Optional[int] = None, ): # 从环境变量读取配置 self.endpoint = settings.ROCKETMQ_ENDPOINT self.access_key_id = settings.ROCKETMQ_ACCESS_KEY_ID self.access_key_secret = settings.ROCKETMQ_ACCESS_KEY_SECRET self.instance_id = settings.ROCKETMQ_INSTANCE_ID self.wait_seconds = wait_seconds or settings.ROCKETMQ_WAIT_SECONDS self.topic_name = topic_name self.group_id = group_id # 初始化客户端 self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret) self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id) self.logger = LoggerManager.get_logger() self.aliyun_logger = LoggerManager.get_aliyun_logger() async def receive_message(self) -> Optional[Message]: """异步拉取单条消息""" try: self.logger.debug(f"[{self.topic_name}]开始拉取单条消息,等待时间: {self.wait_seconds}秒") messages = await asyncio.to_thread( self.consumer.consume_message, settings.ROCKETMQ_BATCH, self.wait_seconds, ) if messages: self.logger.debug(f"[{self.topic_name}]成功拉取到1条消息") return messages[0] return None except MQExceptionBase as e: if getattr(e, "type", "") == "MessageNotExist": # 更友好的日志输出,使用INFO级别而非ERROR self.logger.info(f"[{self.topic_name}]当前没有可消费的消息,继续等待...") return None # 其他类型的异常仍按错误处理 self.logger.error(f"[{self.topic_name}]拉取消息失败: {e}") raise e async def ack_message(self, receipt_handle: str) -> None: """确认消费成功""" try: await asyncio.to_thread(self.consumer.ack_message, [receipt_handle]) self.logger.debug(f"[{self.topic_name}]消息确认成功") except Exception as e: self.logger.error(f"[{self.topic_name}]确认消息失败: {e}") raise RuntimeError(f"[{self.topic_name}]确认消息失败: {e}") async def process_single_message(self, handler: Callable[[Message], Any]) -> bool: """ 处理单条消息的完整流程:拉取、处理、确认 返回值:是否成功处理并确认消息 """ message = await self.receive_message() if not message: return False try: self.logger.info(f"[{self.topic_name}]收到消息 ID: {message.message_id}") # 执行消息处理任务 await handler(message) # 任务成功后确认消息 await self.ack_message(message.receipt_handle) self.logger.info(f"[{self.topic_name}]消息 ID: {message.message_id} 处理并确认成功") return True except Exception as e: self.logger.error(f"[{self.topic_name}]处理消息失败: {e}", exc_info=True) # 消息处理失败,不会确认消息,RocketMQ会在可见时间后重新投递 return False async def run_single_threaded(self, handler: Callable[[Message], Any], max_retries: int = 3): """ 单线程模式处理消息:获取一条消息,处理完成后再获取下一条 :param handler: 异步消息处理函数 async def handler(msg: Message) :param max_retries: 消息处理失败后的最大重试次数 """ self.logger.info(f"[AsyncRocketMQConsumer] 启动单线程消费模式: Topic={self.topic_name}, Group={self.group_id}") while True: try: success = await self.process_single_message(handler) if not success: # 没有消息或处理失败,适当等待避免频繁请求 await asyncio.sleep(1) except Exception as e: self.logger.error(f"消费循环异常: {e}", exc_info=True) await asyncio.sleep(5) # 发生异常时等待较长时间 async def process_message(message: Message) -> None: """示例消息处理函数""" # self.logger.info(f"开始处理消息: {message.message_body}") # 模拟处理耗时操作 await asyncio.sleep(2) # self.logger.info(f"消息处理完成") if __name__ == '__main__': async def run_consumer(): consumer = AsyncRocketMQConsumer( topic_name="ynfqmm_recommend_prod", group_id="ynfqmm_recommend_prod", wait_seconds=30, # 长轮询等待时间 ) await consumer.run_single_threaded(process_message) asyncio.run(run_consumer())