1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- import json
- import asyncio
- import traceback
- from mq_http_sdk.mq_exception import MQExceptionBase
- from mq_http_sdk.mq_producer import TopicMessage
- from mq_http_sdk.mq_client import MQClient
- from config import settings
- from core.utils.log.logger_manager import LoggerManager
class AsyncMQProducer:
    """Asynchronous wrapper for publishing messages to an MQ topic.

    The producer obtained from ``MQClient`` is called synchronously, so each
    publish is handed to the event loop's default executor to keep the loop
    responsive.
    """

    # MQ instance that every producer created by this class publishes to.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"

    def __init__(self, topic_name: str, platform: str, mode: str) -> None:
        """Create the MQ client, the topic producer and both loggers.

        Args:
            topic_name: Topic the producer publishes to.
            platform: Platform label; passed to the loggers and used as the
                first component of every message key.
            mode: Mode label; passed to the loggers and used as the second
                component of every message key.
        """
        self.mq_client = MQClient(
            host=settings.ROCKETMQ_ENDPOINT,
            access_id=settings.ROCKETMQ_ACCESS_KEY_ID,
            access_key=settings.ROCKETMQ_ACCESS_KEY_SECRET,
        )
        self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
        self.platform = platform
        self.mode = mode
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        # NOTE: the attribute name "loger" (sic) is kept because code outside
        # this class may already reference it.
        self.loger = LoggerManager.get_logger(platform=platform, mode=mode)

    async def send_msg(self, video_dict: dict, max_retries: int = 3) -> bool:
        """Publish ``video_dict`` as a JSON message, retrying on MQ errors.

        The message key is ``"<platform>-<mode>-<out_video_id>"``. Between
        failed attempts the coroutine sleeps ``2 ** attempt`` seconds
        (1, 2, 4, ...). Only the final failure is logged.

        Args:
            video_dict: JSON-serializable payload; must contain the key
                ``'out_video_id'``.
            max_retries: Total number of publish attempts. Values below 1 are
                treated as 1 so the message is always tried at least once.

        Returns:
            True once the message is published, False if every attempt
            raised ``MQExceptionBase``.

        Raises:
            KeyError: If ``video_dict`` has no ``'out_video_id'`` entry.
            TypeError: If ``video_dict`` cannot be serialized by ``json.dumps``.
        """
        # Always make at least one attempt; otherwise a non-positive
        # max_retries would return False without trying or logging anything.
        attempts = max(1, max_retries)

        # The payload and key are identical for every attempt, so build the
        # message once instead of once per retry.
        msg = TopicMessage(json.dumps(video_dict))
        message_key = "{}-{}-{}".format(self.platform, self.mode, video_dict['out_video_id'])
        msg.set_message_key(message_key)

        loop = asyncio.get_running_loop()
        for attempt in range(attempts):
            try:
                # Run the synchronous publish_message in the default executor
                # so it does not block the event loop.
                re_msg = await loop.run_in_executor(
                    None, self.producer.publish_message, msg
                )
            except MQExceptionBase as e:
                if attempt < attempts - 1:
                    # Exponential backoff before the next attempt.
                    await asyncio.sleep(2 ** attempt)
                    continue
                # Final attempt failed: format the traceback only now, when
                # it is actually reported.
                tb = traceback.format_exc()
                self.loger.error(
                    f"Publish Message Fail after {attempts} attempts. Exception: {e}\n{tb}"
                )
                self.aliyun_log.logging(
                    code="5005",
                    message=f"Publish Message Fail after {attempts} attempts. Exception: {e}",
                    data=tb
                )
            else:
                self.loger.info(
                    f"Publish Message Succeed. MessageID:{re_msg.message_id}, BodyMD5:{re_msg.message_body_md5}"
                )
                return True
        return False
|