123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- # import asyncio
- # import logging
- # from typing import Callable, Awaitable
- #
- # from rocketmq.client import PushConsumer, Message
- # from core.utils.log.logger_manager import LoggerManager
- #
- # logger = LoggerManager.get_logger()
- # aliyun_logger = LoggerManager.get_aliyun_logger()
- #
- #
- # class RocketMQAsyncPullConsumer:
- # def __init__(self, topic: str, group_id: str, max_workers: int = 5):
- # """
- # RocketMQ TCP SDK 异步拉取消费者封装
- # - 拉取消息后执行任务,执行完成再确认 ack
- # - 使用 asyncio + Semaphore 控制并发执行
- # """
- # self.topic = topic
- # self.group_id = group_id
- # self.max_workers = max_workers
- # self.semaphore = asyncio.Semaphore(max_workers)
- # self.shutdown_event = asyncio.Event()
- #
- # # 初始化 RocketMQ Pull Consumer
- # self.consumer = PushConsumer(group_id)
- # self.consumer.set_name_server_address('')
- # self.consumer.subscribe(topic)
- #
- # async def process_message(self, msg: Message, handler: Callable[[Message], Awaitable[None]]):
- # async with self.semaphore:
- # try:
- # logger.info(f"[{self.topic}] 收到消息: {msg.id}")
- # await handler(msg)
- # msg.ack()
- # logger.info(f"[{self.topic}] 消息 {msg.id} 执行完毕并已确认 ack")
- # except Exception as e:
- # logger.error(f"[{self.topic}] 消息处理失败: {e}", exc_info=True)
- # aliyun_logger.logging(
- # code="9001",
- # message=f"[{self.topic}] 消息处理失败: {e}",
- # data={"msg_id": msg.id, "body": msg.body.decode()},
- # )
- #
- # async def _consume_loop(self, handler: Callable[[Message], Awaitable[None]]):
- # loop = asyncio.get_running_loop()
- #
- # def callback(msg: Message):
- # asyncio.run_coroutine_threadsafe(self.process_message(msg, handler), loop)
- # return None
- #
- # self.consumer.register_message_listener(callback)
- # self.consumer.start()
- #
- # logger.info(f"[{self.topic}] RocketMQ PullConsumer 已启动,开始消费消息...")
- # aliyun_logger.logging(
- # code="1500",
- # message=f"[{self.topic}] RocketMQ PullConsumer 已启动",
- # )
- #
- # await self.shutdown_event.wait()
- #
- # self.consumer.shutdown()
- # logger.info(f"[{self.topic}] PullConsumer 已优雅退出")
- # aliyun_logger.logging(
- # code="1602",
- # message=f"[{self.topic}] PullConsumer 已优雅退出",
- # )
- #
- # def stop(self):
- # """外部调用以优雅退出"""
- # self.shutdown_event.set()
- #
- # async def run(self, handler: Callable[[Message], Awaitable[None]]):
- # await self._consume_loop(handler)
- #
- #
- # # === 使用示例 ===
- # if __name__ == '__main__':
- # import signal
- #
- # async def handle_message(msg: Message):
- # logger.info(f"处理消息内容: {msg.body.decode()}")
- # await asyncio.sleep(2) # 模拟执行耗时任务
- #
- # consumer = RocketMQAsyncPullConsumer(
- # topic="ynfqmm_recommend_prod",
- # group_id="ynfqmm_recommend_prod",
- # max_workers=5
- # )
- #
- # loop = asyncio.get_event_loop()
- #
- # def shutdown_handler():
- # logger.warning("收到退出信号,准备优雅关闭消费者...")
- # consumer.stop()
- #
- # for sig in [signal.SIGINT, signal.SIGTERM]:
- # loop.add_signal_handler(sig, shutdown_handler)
- #
- # loop.run_until_complete(consumer.run(handle_message))
|