async_mq_consumer.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import asyncio
  2. import logging
  3. from typing import List, Optional, Callable, Any
  4. from mq_http_sdk.mq_client import MQClient
  5. from mq_http_sdk.mq_consumer import Message
  6. from mq_http_sdk.mq_exception import MQExceptionBase
  7. from config import settings
  8. from core.utils.log.logger_manager import LoggerManager
  9. class AsyncRocketMQConsumer:
  10. """
  11. 阿里云 RocketMQ HTTP 协议异步消费者封装类
  12. - 支持自动读取环境变量
  13. - 基于 asyncio 实现原生异步消费模型
  14. - 手动确认消费
  15. - 提供单条消息处理模式
  16. """
  17. def __init__(
  18. self,
  19. topic_name: Optional[str],
  20. group_id: Optional[str],
  21. wait_seconds: Optional[int] = None,
  22. ):
  23. # 从环境变量读取配置
  24. self.endpoint = settings.ROCKETMQ_ENDPOINT
  25. self.access_key_id = settings.ROCKETMQ_ACCESS_KEY_ID
  26. self.access_key_secret = settings.ROCKETMQ_ACCESS_KEY_SECRET
  27. self.instance_id = settings.ROCKETMQ_INSTANCE_ID
  28. self.wait_seconds = wait_seconds or settings.ROCKETMQ_WAIT_SECONDS
  29. self.topic_name = topic_name
  30. self.group_id = group_id
  31. # 初始化客户端
  32. self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
  33. self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)
  34. self.logger = LoggerManager.get_logger()
  35. self.aliyun_logger = LoggerManager.get_aliyun_logger()
  36. async def receive_message(self) -> Optional[Message]:
  37. """异步拉取单条消息"""
  38. try:
  39. self.logger.debug(f"开始拉取单条消息,等待时间: {self.wait_seconds}秒")
  40. messages = await asyncio.to_thread(
  41. self.consumer.consume_message,
  42. settings.ROCKETMQ_BATCH,
  43. self.wait_seconds,
  44. )
  45. if messages:
  46. self.logger.debug(f"成功拉取到1条消息")
  47. return messages[0]
  48. return None
  49. except MQExceptionBase as e:
  50. if getattr(e, "type", "") == "MessageNotExist":
  51. # 更友好的日志输出,使用INFO级别而非ERROR
  52. self.logger.info("当前没有可消费的消息,继续等待...")
  53. return None
  54. # 其他类型的异常仍按错误处理
  55. self.logger.error(f"拉取消息失败: {e}")
  56. raise e
  57. async def ack_message(self, receipt_handle: str) -> None:
  58. """确认消费成功"""
  59. try:
  60. await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
  61. self.logger.debug(f"消息确认成功")
  62. except Exception as e:
  63. self.logger.error(f"确认消息失败: {e}")
  64. raise RuntimeError(f"确认消息失败: {e}")
  65. async def process_single_message(self, handler: Callable[[Message], Any]) -> bool:
  66. """
  67. 处理单条消息的完整流程:拉取、处理、确认
  68. 返回值:是否成功处理并确认消息
  69. """
  70. message = await self.receive_message()
  71. if not message:
  72. return False
  73. try:
  74. self.logger.info(f"收到消息 ID: {message.message_id}")
  75. # 执行消息处理任务
  76. await handler(message)
  77. # 任务成功后确认消息
  78. await self.ack_message(message.receipt_handle)
  79. self.logger.info(f"消息 ID: {message.message_id} 处理并确认成功")
  80. return True
  81. except Exception as e:
  82. self.logger.error(f"处理消息失败: {e}", exc_info=True)
  83. # 消息处理失败,不会确认消息,RocketMQ会在可见时间后重新投递
  84. return False
  85. async def run_single_threaded(self, handler: Callable[[Message], Any], max_retries: int = 3):
  86. """
  87. 单线程模式处理消息:获取一条消息,处理完成后再获取下一条
  88. :param handler: 异步消息处理函数 async def handler(msg: Message)
  89. :param max_retries: 消息处理失败后的最大重试次数
  90. """
  91. self.logger.info(f"[AsyncRocketMQConsumer] 启动单线程消费模式: Topic={self.topic_name}, Group={self.group_id}")
  92. while True:
  93. try:
  94. success = await self.process_single_message(handler)
  95. if not success:
  96. # 没有消息或处理失败,适当等待避免频繁请求
  97. await asyncio.sleep(1)
  98. except Exception as e:
  99. self.logger.error(f"消费循环异常: {e}", exc_info=True)
  100. await asyncio.sleep(5) # 发生异常时等待较长时间
  101. async def process_message(message: Message) -> None:
  102. """示例消息处理函数"""
  103. self.logger.info(f"开始处理消息: {message.message_body}")
  104. # 模拟处理耗时操作
  105. await asyncio.sleep(2)
  106. self.logger.info(f"消息处理完成")
  107. if __name__ == '__main__':
  108. async def run_consumer():
  109. consumer = AsyncRocketMQConsumer(
  110. topic_name="ynfqmm_recommend_prod",
  111. group_id="ynfqmm_recommend_prod",
  112. wait_seconds=10, # 长轮询等待时间
  113. )
  114. await consumer.run_single_threaded(process_message)
  115. asyncio.run(run_consumer())