mq.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import json
  2. from mq_http_sdk.mq_exception import MQExceptionBase
  3. from mq_http_sdk.mq_producer import TopicMessage
  4. from mq_http_sdk.mq_client import MQClient
  5. import traceback
  6. from application.common.log import Local
  7. from application.common.log import AliyunLogger
  8. class MQ(object):
  9. """
  10. MQ Class
  11. """
  12. instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
  13. def __init__(self, topic_name) -> None:
  14. self.mq_client = MQClient("http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
  15. "LTAI4G7puhXtLyHzHQpD6H7A",
  16. "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")
  17. topic_name = topic_name+"_v2"
  18. self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
  19. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  20. def send_msg(self, video_dict, max_retries = 3):
  21. """
  22. 发送 mq,并且记录 redis
  23. :param video_dict:
  24. """
  25. strategy = video_dict["strategy"]
  26. platform = video_dict["platform"]
  27. self.aliyun_log = AliyunLogger(mode=strategy, platform=platform)
  28. for retry in range(max_retries):
  29. try:
  30. msg = TopicMessage(json.dumps(video_dict))
  31. message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
  32. msg.set_message_key(message_key)
  33. re_msg = self.producer.publish_message(msg)
  34. Local.logger(platform,strategy).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
  35. (re_msg.message_id, re_msg.message_body_md5))
  36. return
  37. except MQExceptionBase as e:
  38. tb = traceback.format_exc()
  39. # 如果是最后一次重试失败,记录日志
  40. if retry == max_retries - 1:
  41. Local.logger(platform, strategy).error(
  42. f"Publish Message Fail after {max_retries} attempts. Exception: {e}\n{tb}"
  43. )
  44. self.aliyun_log.logging(
  45. code="5005",
  46. message=f"Publish Message Fail after {max_retries} attempts. Exception: {e}",
  47. data= tb
  48. )