Parcourir la source

发送etl增加三次重试

zhangliang il y a 3 semaines
Parent
commit
0e00cd4113
1 fichiers modifiés avec 26 ajouts et 11 suppressions
  1. 26 11
      application/common/messageQueue/mq.py

+ 26 - 11
application/common/messageQueue/mq.py

@@ -2,8 +2,9 @@ import json
 from mq_http_sdk.mq_exception import MQExceptionBase
 from mq_http_sdk.mq_producer import TopicMessage
 from mq_http_sdk.mq_client import MQClient
-
+import traceback
 from application.common.log import Local
+from application.common.log import AliyunLogger
 
 
 class MQ(object):
@@ -18,20 +19,34 @@ class MQ(object):
                                   "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")
         topic_name = topic_name+"_v2"
         self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
+        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
 
-    def send_msg(self, video_dict):
+    def send_msg(self, video_dict, max_retries = 3):
         """
         发送 mq,并且记录 redis
         :param video_dict:
         """
         strategy = video_dict["strategy"]
         platform = video_dict["platform"]
-        try:
-            msg = TopicMessage(json.dumps(video_dict))
-            message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
-            msg.set_message_key(message_key)
-            re_msg = self.producer.publish_message(msg)
-            Local.logger(platform,strategy).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
-                                                  (re_msg.message_id, re_msg.message_body_md5))
-        except MQExceptionBase as e:
-            Local.logger(platform,strategy).error("Publish Message Fail. Exception:%s\n" % e)
+        self.aliyun_log = AliyunLogger(mode=strategy, platform=platform)
+        for retry in range(max_retries):
+            try:
+                msg = TopicMessage(json.dumps(video_dict))
+                message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
+                msg.set_message_key(message_key)
+                re_msg = self.producer.publish_message(msg)
+                Local.logger(platform,strategy).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
+                                                      (re_msg.message_id, re_msg.message_body_md5))
+                return
+            except MQExceptionBase as e:
+                tb = traceback.format_exc()
+                # 如果是最后一次重试失败,记录日志
+                if retry == max_retries - 1:
+                    Local.logger(platform, strategy).error(
+                        f"Publish Message Fail after {max_retries} attempts. Exception: {e}\n{tb}"
+                    )
+                    self.aliyun_log.logging(
+                        code="5005",
+                        message=f"Publish Message Fail after {max_retries} attempts. Exception: {e}",
+                        data= tb
+                    )