mq.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import json
  2. from mq_http_sdk.mq_exception import MQExceptionBase
  3. from mq_http_sdk.mq_producer import TopicMessage
  4. from mq_http_sdk.mq_client import MQClient
  5. from application.common.log import Local
  6. class MQ(object):
  7. """
  8. MQ Class
  9. """
  10. instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
  11. def __init__(self, topic_name) -> None:
  12. self.mq_client = MQClient("http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
  13. "LTAI4G7puhXtLyHzHQpD6H7A",
  14. "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")
  15. topic_name = topic_name+"_v2"
  16. self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
  17. def send_msg(self, video_dict):
  18. """
  19. 发送 mq,并且记录 redis
  20. :param video_dict:
  21. """
  22. strategy = video_dict["strategy"]
  23. platform = video_dict["platform"]
  24. try:
  25. msg = TopicMessage(json.dumps(video_dict))
  26. message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
  27. msg.set_message_key(message_key)
  28. re_msg = self.producer.publish_message(msg)
  29. Local.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
  30. (re_msg.message_id, re_msg.message_body_md5))
  31. except MQExceptionBase as e:
  32. Local.logger(strategy, platform).error("Publish Message Fail. Exception:%s\n" % e)