async_mq_consumer.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import asyncio
  2. import logging
  3. import traceback
  4. from typing import List, Optional, Callable, Any
  5. from mq_http_sdk.mq_client import MQClient
  6. from mq_http_sdk.mq_consumer import Message
  7. from mq_http_sdk.mq_exception import MQExceptionBase, MQClientNetworkException
  8. from config import settings
  9. from core.utils.log.logger_manager import LoggerManager
  10. class AsyncRocketMQConsumer:
  11. """
  12. 阿里云 RocketMQ HTTP 协议异步消费者封装类
  13. - 支持自动读取环境变量
  14. - 基于 asyncio 实现原生异步消费模型
  15. - 手动确认消费
  16. - 提供单条消息处理模式
  17. """
  18. def __init__(
  19. self,
  20. topic_name: Optional[str],
  21. group_id: Optional[str],
  22. wait_seconds: Optional[int] = None,
  23. ):
  24. # 从环境变量读取配置
  25. self.endpoint = settings.ROCKETMQ_ENDPOINT
  26. self.access_key_id = settings.ROCKETMQ_ACCESS_KEY_ID
  27. self.access_key_secret = settings.ROCKETMQ_ACCESS_KEY_SECRET
  28. self.instance_id = settings.ROCKETMQ_INSTANCE_ID
  29. self.wait_seconds = wait_seconds or settings.ROCKETMQ_WAIT_SECONDS
  30. self.topic_name = topic_name
  31. self.group_id = group_id
  32. self.logger = LoggerManager.get_logger()
  33. self.aliyun_logger = LoggerManager.get_aliyun_logger()
  34. self._create_consumer()
  35. def _create_consumer(self):
  36. self.client = MQClient(
  37. self.endpoint,
  38. self.access_key_id,
  39. self.access_key_secret
  40. )
  41. self.consumer = self.client.get_consumer(
  42. self.instance_id,
  43. self.topic_name,
  44. self.group_id
  45. )
  46. self.logger.info(f"[{self.topic_name}] MQ Consumer 初始化完成")
  47. async def receive_message(self) -> Optional[Message]:
  48. """异步拉取单条消息"""
  49. try:
  50. self.logger.info(f"[{self.topic_name}]开始拉取单条消息,等待时间: {self.wait_seconds}秒")
  51. messages = await asyncio.to_thread(
  52. self.consumer.consume_message,
  53. settings.ROCKETMQ_BATCH,
  54. self.wait_seconds,
  55. )
  56. if messages:
  57. self.logger.info(f"[{self.topic_name}]成功拉取到1条消息")
  58. return messages[0]
  59. return None
  60. except MQClientNetworkException as e:
  61. self.logger.error(f"[{self.topic_name}] MQClientNetworkException 发生,将自动重连: {e}")
  62. await self._handle_reconnect()
  63. return None
  64. except MQExceptionBase as e:
  65. if e.type == "MessageNotExist":
  66. # 没有新消息
  67. self.logger.warning(f"[{self.topic_name}] 消息为空")
  68. await asyncio.sleep(5)
  69. return None
  70. else:
  71. self.logger.error(f"[{self.topic_name}] MQExceptionBase 错误: {e} \n {traceback.extract_tb()}")
  72. return None
  73. except Exception as e:
  74. self.logger.error(f"[{self.topic_name}] 未知异常: {e} \n {traceback.extract_tb()}")
  75. return None
  76. async def _handle_reconnect(self):
  77. """
  78. 在网络断开时自动重建 MQConsumer 实例
  79. """
  80. try:
  81. self.logger.warning(f"[{self.topic_name}] 正在重新初始化 MQ Consumer 实例以重连...")
  82. self._create_consumer()
  83. self.logger.warning(f"[{self.topic_name}] MQ Consumer 重连成功")
  84. except Exception as e:
  85. self.logger.error(f"[{self.topic_name}] MQ Consumer 重连失败: {e}", exc_info=True)
  86. await asyncio.sleep(5)
  87. async def ack_message(self, receipt_handle: str) -> None:
  88. """确认消费成功"""
  89. try:
  90. await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
  91. self.logger.debug(f"[{self.topic_name}]消息确认成功")
  92. except Exception as e:
  93. self.logger.error(f"[{self.topic_name}]确认消息失败: {e}")
  94. raise RuntimeError(f"[{self.topic_name}]确认消息失败: {e}")
  95. async def process_single_message(self, handler: Callable[[Message], Any]) -> bool:
  96. """
  97. 处理单条消息的完整流程:拉取、处理、确认
  98. 返回值:是否成功处理并确认消息
  99. """
  100. message = await self.receive_message()
  101. if not message:
  102. return False
  103. try:
  104. self.logger.info(f"[{self.topic_name}]收到消息 ID: {message.message_id}")
  105. # 执行消息处理任务
  106. await handler(message)
  107. # 任务成功后确认消息
  108. await self.ack_message(message.receipt_handle)
  109. self.logger.info(f"[{self.topic_name}]消息 ID: {message.message_id} 处理并确认成功")
  110. return True
  111. except Exception as e:
  112. self.logger.error(f"[{self.topic_name}]处理消息失败: {e}", exc_info=True)
  113. # 消息处理失败,不会确认消息,RocketMQ会在可见时间后重新投递
  114. return False
  115. async def run_single_threaded(self, handler: Callable[[Message], Any], max_retries: int = 3):
  116. """
  117. 单线程模式处理消息:获取一条消息,处理完成后再获取下一条
  118. :param handler: 异步消息处理函数 async def handler(msg: Message)
  119. :param max_retries: 消息处理失败后的最大重试次数
  120. """
  121. self.logger.info(f"[AsyncRocketMQConsumer] 启动单线程消费模式: Topic={self.topic_name}, Group={self.group_id}")
  122. while True:
  123. try:
  124. success = await self.process_single_message(handler)
  125. if not success:
  126. # 没有消息或处理失败,适当等待避免频繁请求
  127. await asyncio.sleep(1)
  128. except Exception as e:
  129. self.logger.error(f"消费循环异常: {e}", exc_info=True)
  130. await asyncio.sleep(5) # 发生异常时等待较长时间
  131. async def process_message(message: Message) -> None:
  132. """示例消息处理函数"""
  133. # self.logger.info(f"开始处理消息: {message.message_body}")
  134. # 模拟处理耗时操作
  135. await asyncio.sleep(2)
  136. # self.logger.info(f"消息处理完成")
  137. if __name__ == '__main__':
  138. async def run_consumer():
  139. consumer = AsyncRocketMQConsumer(
  140. topic_name="ynfqmm_recommend_prod",
  141. group_id="ynfqmm_recommend_prod",
  142. wait_seconds=30, # 长轮询等待时间
  143. )
  144. await consumer.run_single_threaded(process_message)
  145. asyncio.run(run_consumer())