# rocketmq_consumer.py
import asyncio
from typing import Awaitable, Callable, List, Optional

from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_consumer import Message
from mq_http_sdk.mq_exception import MQExceptionBase

from config import settings
  7. class AsyncRocketMQConsumer:
  8. """
  9. 阿里云 RocketMQ HTTP 协议异步消费者封装类
  10. - 支持自动读取环境变量
  11. - 基于 asyncio 实现原生异步消费模型
  12. - 手动确认消费
  13. """
  14. def __init__(
  15. self,
  16. topic_name: Optional[str],
  17. group_id: Optional[str],
  18. wait_seconds: Optional[int] = None,
  19. batch: Optional[int] = None,
  20. ):
  21. # 从环境变量读取配置
  22. self.endpoint = settings.ROCKETMQ_ENDPOINT
  23. self.access_key_id = settings.ROCKETMQ_ACCESS_KEY_ID
  24. self.access_key_secret = settings.ROCKETMQ_ACCESS_KEY_SECRET
  25. self.instance_id = settings.ROCKETMQ_INSTANCE_ID
  26. self.wait_seconds = settings.ROCKETMQ_WAIT_SECONDS
  27. self.batch = batch or settings.ROCKETMQ_BATCH
  28. self.topic_name = topic_name
  29. self.group_id = group_id
  30. # 初始化客户端
  31. self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
  32. self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)
  33. async def receive_messages(self) -> List[Message]:
  34. """异步封装消息拉取"""
  35. try:
  36. return await asyncio.to_thread(
  37. self.consumer.consume_message,
  38. self.batch,
  39. self.wait_seconds,
  40. )
  41. except MQExceptionBase as e:
  42. if getattr(e, "type", "") == "MessageNotExist":
  43. return []
  44. raise e
  45. async def ack_message(self, receipt_handle: str) -> None:
  46. """确认消费成功"""
  47. try:
  48. await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
  49. except Exception as e:
  50. raise RuntimeError(f"确认消息失败: {e}")
  51. async def run_forever(self, handler: Callable[[Message], asyncio.Future]):
  52. """
  53. 无限循环拉取消息并处理,适合开发调试或小批量任务
  54. :param handler: 异步消息处理函数 async def handler(msg: Message)
  55. """
  56. print(f"[AsyncRocketMQConsumer] 启动消费: Topic={self.topic_name}, Group={self.group_id}")
  57. while True:
  58. try:
  59. messages = await self.receive_messages()
  60. for msg in messages:
  61. try:
  62. await handler(msg)
  63. await self.ack_message(msg.receipt_handle)
  64. except Exception as e:
  65. print(f"[处理失败] {e}\n消息: {msg.message_body}")
  66. except Exception as e:
  67. print(f"[拉取失败] {e}")
  68. await asyncio.sleep(2)