# import asyncio
# import logging
# from typing import Callable, Awaitable
#
# from rocketmq.client import PushConsumer, Message
# from core.utils.log.logger_manager import LoggerManager
#
# logger = LoggerManager.get_logger()
# aliyun_logger = LoggerManager.get_aliyun_logger()
#
#
# class RocketMQAsyncPullConsumer:
#     def __init__(self, topic: str, group_id: str, max_workers: int = 5):
#         """
#         RocketMQ TCP SDK 异步拉取消费者封装
#         - 拉取消息后执行任务,执行完成再确认 ack
#         - 使用 asyncio + Semaphore 控制并发执行
#         """
#         self.topic = topic
#         self.group_id = group_id
#         self.max_workers = max_workers
#         self.semaphore = asyncio.Semaphore(max_workers)
#         self.shutdown_event = asyncio.Event()
#
#         # 初始化 RocketMQ Pull Consumer
#         self.consumer = PushConsumer(group_id)
#         self.consumer.set_name_server_address('')
#         self.consumer.subscribe(topic)
#
#     async def process_message(self, msg: Message, handler: Callable[[Message], Awaitable[None]]):
#         async with self.semaphore:
#             try:
#                 logger.info(f"[{self.topic}] 收到消息: {msg.id}")
#                 await handler(msg)
#                 msg.ack()
#                 logger.info(f"[{self.topic}] 消息 {msg.id} 执行完毕并已确认 ack")
#             except Exception as e:
#                 logger.error(f"[{self.topic}] 消息处理失败: {e}", exc_info=True)
#                 aliyun_logger.logging(
#                     code="9001",
#                     message=f"[{self.topic}] 消息处理失败: {e}",
#                     data={"msg_id": msg.id, "body": msg.body.decode()},
#                 )
#
#     async def _consume_loop(self, handler: Callable[[Message], Awaitable[None]]):
#         loop = asyncio.get_running_loop()
#
#         def callback(msg: Message):
#             asyncio.run_coroutine_threadsafe(self.process_message(msg, handler), loop)
#             return None
#
#         self.consumer.register_message_listener(callback)
#         self.consumer.start()
#
#         logger.info(f"[{self.topic}] RocketMQ PullConsumer 已启动,开始消费消息...")
#         aliyun_logger.logging(
#             code="1500",
#             message=f"[{self.topic}] RocketMQ PullConsumer 已启动",
#         )
#
#         await self.shutdown_event.wait()
#
#         self.consumer.shutdown()
#         logger.info(f"[{self.topic}] PullConsumer 已优雅退出")
#         aliyun_logger.logging(
#             code="1602",
#             message=f"[{self.topic}] PullConsumer 已优雅退出",
#         )
#
#     def stop(self):
#         """外部调用以优雅退出"""
#         self.shutdown_event.set()
#
#     async def run(self, handler: Callable[[Message], Awaitable[None]]):
#         await self._consume_loop(handler)
#
#
# # === 使用示例 ===
# if __name__ == '__main__':
#     import signal
#
#     async def handle_message(msg: Message):
#         logger.info(f"处理消息内容: {msg.body.decode()}")
#         await asyncio.sleep(2)  # 模拟执行耗时任务
#
#     consumer = RocketMQAsyncPullConsumer(
#         topic="ynfqmm_recommend_prod",
#         group_id="ynfqmm_recommend_prod",
#         max_workers=5
#     )
#
#     loop = asyncio.get_event_loop()
#
#     def shutdown_handler():
#         logger.warning("收到退出信号,准备优雅关闭消费者...")
#         consumer.stop()
#
#     for sig in [signal.SIGINT, signal.SIGTERM]:
#         loop.add_signal_handler(sig, shutdown_handler)
#
#     loop.run_until_complete(consumer.run(handle_message))