import json
import asyncio
import traceback

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import TopicMessage
from mq_http_sdk.mq_client import MQClient

from config import settings
from core.utils.log.logger_manager import LoggerManager


class AsyncMQProducer:
    """Asynchronous wrapper for publishing messages to an MQ topic.

    The underlying ``mq_http_sdk`` producer is called synchronously; this
    class runs each publish in the event loop's default executor so the
    coroutine does not block the loop while the call is in flight.
    """

    # Instance ID passed to ``MQClient.get_producer`` for every producer
    # created by this class.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"

    def __init__(self, topic_name: str, platform: str, mode: str) -> None:
        """Create the MQ client, the topic producer and both loggers.

        Args:
            topic_name: Topic the producer publishes to.
            platform: Platform label; used in the message key and passed to
                the logger factories.
            mode: Mode label; used in the message key and passed to the
                logger factories.
        """
        self.mq_client = MQClient(
            host=settings.ROCKETMQ_ENDPOINT,
            access_id=settings.ROCKETMQ_ACCESS_KEY_ID,
            access_key=settings.ROCKETMQ_ACCESS_KEY_SECRET,
        )
        self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
        self.platform = platform
        self.mode = mode
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        # NOTE: the attribute name ``loger`` (sic) is kept as-is because code
        # outside this class may reference it.
        self.loger = LoggerManager.get_logger(platform=platform, mode=mode)

    async def send_msg(self, video_dict: dict, max_retries: int = 3) -> bool:
        """Publish ``video_dict`` as a JSON message, retrying on MQ errors.

        The message key is ``"<platform>-<mode>-<out_video_id>"``. Failed
        attempts are retried with exponential backoff (1s, 2s, 4s, ...);
        there is no sleep after the final attempt.

        Args:
            video_dict: JSON-serialisable payload; must contain the key
                ``'out_video_id'``.
            max_retries: Total number of publish attempts. Values below 1
                mean nothing is sent.

        Returns:
            ``True`` once a publish succeeds, ``False`` if every attempt
            raised ``MQExceptionBase`` or no attempt was allowed.

        Raises:
            KeyError: If ``video_dict`` has no ``'out_video_id'`` entry.
            TypeError: If ``video_dict`` is not JSON-serialisable.
        """
        if max_retries < 1:
            # Previously this case returned False without any trace; make
            # the skipped publish visible in the logs.
            self.loger.error(
                f"Publish Message skipped: max_retries={max_retries} allows no attempt."
            )
            return False

        # The message, its key and the loop are identical for every attempt,
        # so build them once instead of once per retry.
        msg = TopicMessage(json.dumps(video_dict))
        msg.set_message_key(
            f"{self.platform}-{self.mode}-{video_dict['out_video_id']}"
        )
        loop = asyncio.get_running_loop()

        for attempt in range(max_retries):
            try:
                # Run the synchronous publish_message in the default executor
                # to avoid blocking the event loop.
                re_msg = await loop.run_in_executor(
                    None, self.producer.publish_message, msg
                )
            except MQExceptionBase as e:
                if attempt < max_retries - 1:
                    # Exponential backoff before the next attempt.
                    await asyncio.sleep(2 ** attempt)
                    continue
                # Final attempt failed: report to both log sinks and give up.
                tb = traceback.format_exc()
                self.loger.error(
                    f"Publish Message Fail after {max_retries} attempts. Exception: {e}\n{tb}"
                )
                self.aliyun_log.logging(
                    code="5005",
                    message=f"Publish Message Fail after {max_retries} attempts. Exception: {e}",
                    data=tb
                )
                return False

            self.loger.info(
                f"Publish Message Succeed. MessageID:{re_msg.message_id}, BodyMD5:{re_msg.message_body_md5}"
            )
            return True

        # Not reached when max_retries >= 1; kept so every path returns a bool.
        return False