# async_rocketmq_consumer.py
import asyncio
from typing import Awaitable, Callable, List

from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_consumer import Message
from mq_http_sdk.mq_exception import MQExceptionBase
  6. class AsyncRocketMQConsumer:
  7. """
  8. 阿里云 RocketMQ HTTP 协议异步消费者封装类
  9. - 基于 asyncio 实现原生异步消费模型
  10. - 支持长轮询批量拉取消息
  11. - 手动确认消费
  12. """
  13. def __init__(
  14. self,
  15. endpoint: str,
  16. access_key_id: str,
  17. access_key_secret: str,
  18. instance_id: str,
  19. topic_name: str,
  20. group_id: str,
  21. wait_seconds: int = 3,
  22. batch: int = 1,
  23. ):
  24. self.endpoint = endpoint
  25. self.access_key_id = access_key_id
  26. self.access_key_secret = access_key_secret
  27. self.instance_id = instance_id
  28. self.topic_name = topic_name
  29. self.group_id = group_id
  30. self.wait_seconds = wait_seconds
  31. self.batch = batch
  32. # 初始化客户端
  33. self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
  34. self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)
  35. async def receive_messages(self) -> List[Message]:
  36. """
  37. 异步方式拉取消息(内部调用同步 SDK,用 asyncio.to_thread 包装)
  38. """
  39. try:
  40. return await asyncio.to_thread(
  41. self.consumer.receive_message,
  42. self.batch,
  43. self.wait_seconds,
  44. )
  45. except MQExceptionBase as e:
  46. if hasattr(e, "type") and e.type == "MessageNotExist":
  47. return []
  48. else:
  49. raise e
  50. async def ack_message(self, receipt_handle: str) -> None:
  51. """
  52. 确认消息已成功消费
  53. """
  54. try:
  55. await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
  56. except Exception as e:
  57. raise RuntimeError(f"确认消息失败: {e}")
  58. async def run_forever(self, handler: callable):
  59. """
  60. 启动消费循环,不断拉取消息并调用处理函数
  61. :param handler: async 函数,接收参数 message: Message
  62. """
  63. print(f"[AsyncRocketMQConsumer] 开始消费 Topic={self.topic_name} Group={self.group_id}")
  64. while True:
  65. try:
  66. messages = await self.receive_messages()
  67. for msg in messages:
  68. try:
  69. await handler(msg)
  70. await self.ack_message(msg.receipt_handle)
  71. except Exception as e:
  72. print(f"处理消息失败: {e}\n消息内容: {msg.message_body}")
  73. except Exception as e:
  74. print(f"拉取消息异常: {e}")
  75. await asyncio.sleep(2)