# async_mq_producer.py (2.5 KB)
  1. import json
  2. import asyncio
  3. import traceback
  4. from mq_http_sdk.mq_exception import MQExceptionBase
  5. from mq_http_sdk.mq_producer import TopicMessage
  6. from mq_http_sdk.mq_client import MQClient
  7. from config import settings
  8. from core.utils.log.logger_manager import LoggerManager
  9. class AsyncMQProducer:
  10. """
  11. 异步 MQ 推送封装类
  12. """
  13. instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
  14. def __init__(self, topic_name: str, platform: str, mode: str) -> None:
  15. self.mq_client = MQClient(
  16. host=settings.ROCKETMQ_ENDPOINT,
  17. access_id=settings.ROCKETMQ_ACCESS_KEY_ID,
  18. access_key=settings.ROCKETMQ_ACCESS_KEY_SECRET,
  19. )
  20. self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
  21. self.platform = platform
  22. self.mode = mode
  23. self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform,mode=mode)
  24. self.loger = LoggerManager.get_logger(platform=platform,mode=mode)
  25. async def send_msg(self, video_dict: dict, max_retries: int = 3):
  26. """
  27. 异步发送 MQ 消息,自动重试并记录日志
  28. """
  29. for retry in range(max_retries):
  30. try:
  31. # 放入线程池中执行同步 publish_message,避免阻塞事件循环
  32. msg = TopicMessage(json.dumps(video_dict))
  33. message_key = "{}-{}-{}".format(self.platform, self.mode, video_dict['out_video_id'])
  34. msg.set_message_key(message_key)
  35. loop = asyncio.get_running_loop()
  36. re_msg = await loop.run_in_executor(
  37. None,
  38. lambda: self.producer.publish_message(msg)
  39. )
  40. self.loger.info(
  41. f"Publish Message Succeed. MessageID:{re_msg.message_id}, BodyMD5:{re_msg.message_body_md5}"
  42. )
  43. return True
  44. except MQExceptionBase as e:
  45. tb = traceback.format_exc()
  46. if retry == max_retries - 1:
  47. self.loger.error(
  48. f"Publish Message Fail after {max_retries} attempts. Exception: {e}\n{tb}"
  49. )
  50. self.aliyun_log.logging(
  51. code="5005",
  52. message=f"Publish Message Fail after {max_retries} attempts. Exception: {e}",
  53. data=tb
  54. )
  55. else:
  56. await asyncio.sleep(2 ** retry) # 退避重试
  57. return False