rocketmq_consumer_wrapper.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # 文件: core/mq/rocketmq_consumer_wrapper.py (新文件)
  2. import asyncio
  3. import logging
  4. from typing import Callable, Awaitable
  5. from rocketmq.client import ClientConfiguration, Credentials, SimpleConsumer
  6. from rocketmq.model.message import Message
  7. from config import settings
  8. from core.utils.log.logger_manager import LoggerManager
  9. class RocketMQSimpleConsumerWrapper:
  10. """
  11. rocketmq-client-python 的异步封装。
  12. - 负责连接、订阅和优雅关闭。
  13. - 提供一个异步运行的消费循环,该循环:
  14. 1. 以异步方式拉取消息。
  15. 2. 异步处理函数。
  16. 3. 在处理函数成功完成后,自动异步确认消息。
  17. 4. 如果处理函数失败,则不确认,等待消息重试。
  18. """
  19. def __init__(self, group_id: str, topic: str):
  20. self.logger = LoggerManager.get_logger()
  21. self.group_id = group_id
  22. self.topic = topic
  23. # 从配置中读取连接信息
  24. credentials = Credentials(settings.ROCKETMQ_ACCESS_KEY_ID, settings.ROCKETMQ_ACCESS_KEY_SECRET)
  25. config = ClientConfiguration(
  26. settings.ROCKETMQ_ENDPOINT,
  27. credentials,
  28. settings.ROCKETMQ_INSTANCE_ID # 官方SDK的namespace参数用于实例ID
  29. )
  30. # 阿里云商业版 topic 需要带上实例ID前缀
  31. self.topic_with_instance = f"{settings.ROCKETMQ_INSTANCE_ID}%%{self.topic}"
  32. # wait_seconds 对应 receive 的 long-polling 时间
  33. self.wait_seconds = settings.ROCKETMQ_WAIT_SECONDS
  34. # 消息不可见时长,先设置30分钟
  35. self.invisible_duration_seconds = 1800
  36. self._consumer = SimpleConsumer(config, self.group_id)
  37. self._is_running = False
  38. async def startup(self):
  39. """启动消费者"""
  40. if self._is_running:
  41. return
  42. self.logger.info(f"[{self.topic}] 消费者正在启动...")
  43. await asyncio.to_thread(self._consumer.startup)
  44. self._is_running = True
  45. self.logger.info(f"[{self.topic}] 消费者启动成功")
  46. async def shutdown(self):
  47. """关闭消费者"""
  48. if not self._is_running:
  49. return
  50. self._is_running = False
  51. self.logger.info(f"[{self.topic}] 消费者正在关闭...")
  52. await asyncio.to_thread(self._consumer.shutdown)
  53. self.logger.info(f"[{self.topic}] 消费者已关闭")
  54. async def subscribe(self):
  55. """订阅主题"""
  56. self.logger.info(f"[{self.topic}] 正在订阅主题: {self.topic_with_instance}")
  57. await asyncio.to_thread(self._consumer.subscribe, self.topic_with_instance)
  58. self.logger.info(f"[{self.topic}] 订阅成功")
  59. async def run(self, async_message_handler: Callable[[Message], Awaitable[None]], stop_event: asyncio.Event):
  60. """
  61. 运行主消费循环。
  62. :param async_message_handler: 用户定义的异步消息处理函数。
  63. :param stop_event: 用于优雅停止的 asyncio 事件。
  64. """
  65. self.logger.info(f"[{self.topic}] 进入消费循环,消息不可见时间设置为 {self.invisible_duration_seconds} 秒")
  66. while not stop_event.is_set():
  67. try:
  68. # 使用 to_thread 在异步环境中运行阻塞的 receive 方法
  69. messages = await asyncio.to_thread(self._consumer.receive, 1, self.invisible_duration_seconds)
  70. if not messages:
  71. self.logger.debug(f"[{self.topic}] 长轮询超时,未拉取到消息,继续...")
  72. continue
  73. message = messages[0]
  74. self.logger.info(f"[{self.topic}] 拉取到消息 ID: {message.message_id}")
  75. try:
  76. # 调用业务逻辑处理器
  77. await async_message_handler(message)
  78. # 业务逻辑成功后,异步确认消息 <--- 正确的ACK位置
  79. self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} 处理成功,正在ACK...")
  80. await asyncio.to_thread(self._consumer.ack, message)
  81. self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} ACK成功")
  82. except Exception as e:
  83. # 业务逻辑失败,打印日志,不ACK,消息将超时后重投
  84. self.logger.error(
  85. f"[{self.topic}] 处理消息 ID: {message.message_id} 失败,将不进行ACK,等待重投。错误: {e}",
  86. exc_info=True
  87. )
  88. except Exception as e:
  89. # 拉取消息本身失败(如网络中断)
  90. self.logger.error(f"[{self.topic}] 消费循环发生严重错误: {e}", exc_info=True)
  91. # 等待一段时间避免频繁失败
  92. await asyncio.sleep(5)
  93. self.logger.info(f"[{self.topic}] 消费循环已停止 (收到退出信号)")