rocketmq_consumer.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import asyncio
  2. import json
  3. import os
  4. from typing import List, Optional, Callable
  5. from mq_http_sdk.mq_client import MQClient
  6. from mq_http_sdk.mq_exception import MQExceptionBase
  7. from mq_http_sdk.consumer import Message
  8. from utils.env_loader import load_env, get_env, get_int_env # 如果你有统一封装
  9. # 确保环境加载
  10. load_env()
  11. class AsyncRocketMQConsumer:
  12. """
  13. 阿里云 RocketMQ HTTP 协议异步消费者封装类
  14. - 支持自动读取环境变量
  15. - 基于 asyncio 实现原生异步消费模型
  16. - 手动确认消费
  17. """
  18. def __init__(
  19. self,
  20. topic_name: Optional[str],
  21. group_id: Optional[str],
  22. wait_seconds: Optional[int] = None,
  23. batch: Optional[int] = None,
  24. ):
  25. # 从环境变量读取配置
  26. self.endpoint = get_env("ROCKETMQ_ENDPOINT")
  27. self.access_key_id = get_env("ROCKETMQ_AK")
  28. self.access_key_secret = get_env("ROCKETMQ_SK")
  29. self.instance_id = get_env("ROCKETMQ_INSTANCE_ID")
  30. self.wait_seconds = wait_seconds or get_int_env("ROCKETMQ_WAIT_SECONDS", 10)
  31. self.batch = batch or get_int_env("ROCKETMQ_BATCH", 1)
  32. # 初始化客户端
  33. self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
  34. self.consumer = self.client.get_consumer(self.instance_id, topic_name, group_id)
  35. async def receive_messages(self) -> List[Message]:
  36. """异步封装消息拉取"""
  37. try:
  38. return await asyncio.to_thread(
  39. self.consumer.receive_message,
  40. self.batch,
  41. self.wait_seconds,
  42. )
  43. except MQExceptionBase as e:
  44. if getattr(e, "type", "") == "MessageNotExist":
  45. return []
  46. raise e
  47. async def ack_message(self, receipt_handle: str) -> None:
  48. """确认消费成功"""
  49. try:
  50. await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
  51. except Exception as e:
  52. raise RuntimeError(f"确认消息失败: {e}")
  53. async def run_forever(self, handler: Callable[[Message], asyncio.Future]):
  54. """
  55. 无限循环拉取消息并处理,适合开发调试或小批量任务
  56. :param handler: 异步消息处理函数 async def handler(msg: Message)
  57. """
  58. print(f"[AsyncRocketMQConsumer] 启动消费: Topic={self.topic_name}, Group={self.group_id}")
  59. while True:
  60. try:
  61. messages = await self.receive_messages()
  62. for msg in messages:
  63. try:
  64. await handler(msg)
  65. await self.ack_message(msg.receipt_handle)
  66. except Exception as e:
  67. print(f"[处理失败] {e}\n消息: {msg.message_body}")
  68. except Exception as e:
  69. print(f"[拉取失败] {e}")
  70. await asyncio.sleep(2)