# async_rocketmq_consumer.py
import asyncio
import json
from typing import Awaitable, Callable, List, Optional

from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_consumer import Message
from mq_http_sdk.mq_exception import MQExceptionBase
  7. class AsyncRocketMQConsumer:
  8. """
  9. 阿里云 RocketMQ HTTP 协议异步消费者封装类
  10. - 基于 asyncio 实现原生异步消费模型
  11. - 支持长轮询批量拉取消息
  12. - 手动确认消费
  13. """
  14. def __init__(
  15. self,
  16. endpoint: str,
  17. access_key_id: str,
  18. access_key_secret: str,
  19. instance_id: str,
  20. topic_name: str,
  21. group_id: str,
  22. wait_seconds: int = 3,
  23. batch: int = 1,
  24. ):
  25. self.endpoint = endpoint
  26. self.access_key_id = access_key_id
  27. self.access_key_secret = access_key_secret
  28. self.instance_id = instance_id
  29. self.topic_name = topic_name
  30. self.group_id = group_id
  31. self.wait_seconds = wait_seconds
  32. self.batch = batch
  33. # 初始化客户端
  34. self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
  35. self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)
  36. async def receive_messages(self) -> List[Message]:
  37. """
  38. 异步方式拉取消息(内部调用同步 SDK,用 asyncio.to_thread 包装)
  39. """
  40. try:
  41. return await asyncio.to_thread(
  42. self.consumer.receive_message,
  43. self.batch,
  44. self.wait_seconds,
  45. )
  46. except MQExceptionBase as e:
  47. if hasattr(e, "type") and e.type == "MessageNotExist":
  48. return []
  49. else:
  50. raise e
  51. async def ack_message(self, receipt_handle: str) -> None:
  52. """
  53. 确认消息已成功消费
  54. """
  55. try:
  56. await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
  57. except Exception as e:
  58. raise RuntimeError(f"确认消息失败: {e}")
  59. async def run_forever(self, handler: callable):
  60. """
  61. 启动消费循环,不断拉取消息并调用处理函数
  62. :param handler: async 函数,接收参数 message: Message
  63. """
  64. print(f"[AsyncRocketMQConsumer] 开始消费 Topic={self.topic_name} Group={self.group_id}")
  65. while True:
  66. try:
  67. messages = await self.receive_messages()
  68. for msg in messages:
  69. try:
  70. await handler(msg)
  71. await self.ack_message(msg.receipt_handle)
  72. except Exception as e:
  73. print(f"处理消息失败: {e}\n消息内容: {msg.message_body}")
  74. except Exception as e:
  75. print(f"拉取消息异常: {e}")
  76. await asyncio.sleep(2)