import json from mq_http_sdk.mq_exception import MQExceptionBase from mq_http_sdk.mq_producer import TopicMessage from mq_http_sdk.mq_client import MQClient import traceback from application.common.log import Local from application.common.log import AliyunLogger class MQ(object): """ MQ Class """ instance_id = "MQ_INST_1894469520484605_BXhXuzkZ" def __init__(self, topic_name) -> None: self.mq_client = MQClient("http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com", "LTAI4G7puhXtLyHzHQpD6H7A", "nEbq3xWNQd1qLpdy2u71qFweHkZjSG") topic_name = topic_name+"_v2" self.producer = self.mq_client.get_producer(self.instance_id, topic_name) def send_msg(self, video_dict, max_retries = 3): """ 发送 mq,并且记录 redis :param video_dict: """ strategy = video_dict["strategy"] platform = video_dict["platform"] self.aliyun_log = AliyunLogger(mode=strategy, platform=platform) for retry in range(max_retries): try: msg = TopicMessage(json.dumps(video_dict)) message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id']) msg.set_message_key(message_key) re_msg = self.producer.publish_message(msg) Local.logger(platform,strategy).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" % (re_msg.message_id, re_msg.message_body_md5)) return except MQExceptionBase as e: tb = traceback.format_exc() # 如果是最后一次重试失败,记录日志 if retry == max_retries - 1: Local.logger(platform, strategy).error( f"Publish Message Fail after {max_retries} attempts. Exception: {e}\n{tb}" ) self.aliyun_log.logging( code="5005", message=f"Publish Message Fail after {max_retries} attempts. Exception: {e}", data= tb )