rocket_mq_pullconsumer.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # import asyncio
  2. # import logging
  3. # from typing import Callable, Awaitable
  4. #
  5. # from rocketmq.client import PushConsumer, Message
  6. # from core.utils.log.logger_manager import LoggerManager
  7. #
  8. # logger = LoggerManager.get_logger()
  9. # aliyun_logger = LoggerManager.get_aliyun_logger()
  10. #
  11. #
  12. # class RocketMQAsyncPullConsumer:
  13. # def __init__(self, topic: str, group_id: str, max_workers: int = 5):
  14. # """
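  15. # Asyncio wrapper around the RocketMQ TCP SDK PushConsumer (messages arrive through a listener callback, not a pull loop).
  16. # - Each message is handed to the handler; msg.ack() is called only after the handler returns without raising.
  17. # - An asyncio.Semaphore(max_workers) bounds how many handlers run concurrently.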
  18. # """
  19. # self.topic = topic
  20. # self.group_id = group_id
  21. # self.max_workers = max_workers
  22. # self.semaphore = asyncio.Semaphore(max_workers)
  23. # self.shutdown_event = asyncio.Event()
  24. #
  25. # # Create the RocketMQ PushConsumer and subscribe it to the topic (the name-server address is left empty here)
  26. # self.consumer = PushConsumer(group_id)
  27. # self.consumer.set_name_server_address('')
  28. # self.consumer.subscribe(topic)
  29. #
  30. # async def process_message(self, msg: Message, handler: Callable[[Message], Awaitable[None]]):
  31. # async with self.semaphore:
  32. # try:
  33. # logger.info(f"[{self.topic}] 收到消息: {msg.id}")
  34. # await handler(msg)
  35. # msg.ack()
  36. # logger.info(f"[{self.topic}] 消息 {msg.id} 执行完毕并已确认 ack")
  37. # except Exception as e:
  38. # logger.error(f"[{self.topic}] 消息处理失败: {e}", exc_info=True)
  39. # aliyun_logger.logging(
  40. # code="9001",
  41. # message=f"[{self.topic}] 消息处理失败: {e}",
  42. # data={"msg_id": msg.id, "body": msg.body.decode()},
  43. # )
  44. #
  45. # async def _consume_loop(self, handler: Callable[[Message], Awaitable[None]]):
  46. # loop = asyncio.get_running_loop()
  47. #
  48. # def callback(msg: Message):
  49. # asyncio.run_coroutine_threadsafe(self.process_message(msg, handler), loop)
  50. # return None
  51. #
  52. # self.consumer.register_message_listener(callback)
  53. # self.consumer.start()
  54. #
  55. # logger.info(f"[{self.topic}] RocketMQ PullConsumer 已启动,开始消费消息...")
  56. # aliyun_logger.logging(
  57. # code="1500",
  58. # message=f"[{self.topic}] RocketMQ PullConsumer 已启动",
  59. # )
  60. #
  61. # await self.shutdown_event.wait()
  62. #
  63. # self.consumer.shutdown()
  64. # logger.info(f"[{self.topic}] PullConsumer 已优雅退出")
  65. # aliyun_logger.logging(
  66. # code="1602",
  67. # message=f"[{self.topic}] PullConsumer 已优雅退出",
  68. # )
  69. #
  70. # def stop(self):
  71. # """外部调用以优雅退出"""
  72. # self.shutdown_event.set()
  73. #
  74. # async def run(self, handler: Callable[[Message], Awaitable[None]]):
  75. # await self._consume_loop(handler)
  76. #
  77. #
  78. # # === 使用示例 ===
  79. # if __name__ == '__main__':
  80. # import signal
  81. #
  82. # async def handle_message(msg: Message):
  83. # logger.info(f"处理消息内容: {msg.body.decode()}")
  84. # await asyncio.sleep(2) # 模拟执行耗时任务
  85. #
  86. # consumer = RocketMQAsyncPullConsumer(
  87. # topic="ynfqmm_recommend_prod",
  88. # group_id="ynfqmm_recommend_prod",
  89. # max_workers=5
  90. # )
  91. #
  92. # loop = asyncio.get_event_loop()
  93. #
  94. # def shutdown_handler():
  95. # logger.warning("收到退出信号,准备优雅关闭消费者...")
  96. # consumer.stop()
  97. #
  98. # for sig in [signal.SIGINT, signal.SIGTERM]:
  99. # loop.add_signal_handler(sig, shutdown_handler)
  100. #
  101. # loop.run_until_complete(consumer.run(handle_message))