# mq_sender.py

import sys
import time
from argparse import ArgumentParser

from pqai_agent import configs
from pqai_agent.mq_message import MqMessage, MessageType, MessageChannel
from pqai_agent.message_queue_backend import AliyunRocketMQQueueBackend
  7. if __name__ == '__main__':
  8. parser = ArgumentParser(description="Send messages to Aliyun RocketMQ")
  9. parser.add_argument('--staff-id', type=str, help='Staff ID, as receiver')
  10. parser.add_argument('--user-id', type=str, help='User ID, as sender')
  11. args = parser.parse_args()
  12. config = configs.get()
  13. use_aliyun_mq = config['debug_flags']['use_aliyun_mq']
  14. receive_queue = AliyunRocketMQQueueBackend(
  15. config['mq']['endpoints'],
  16. config['mq']['instance_id'],
  17. config['mq']['receive_topic'],
  18. has_consumer=False, has_producer=True,
  19. topic_type='FIFO'
  20. )
  21. message_id = 0
  22. while True:
  23. print("Input next message: ")
  24. text = sys.stdin.readline().strip()
  25. if not text:
  26. continue
  27. message_id += 1
  28. sender = args.user_id
  29. receiver = args.staff_id
  30. if text in (MessageType.AGGREGATION_TRIGGER.name,
  31. MessageType.HUMAN_INTERVENTION_END.name):
  32. message = MqMessage.build(
  33. MessageType.__members__.get(text),
  34. MessageChannel.CORP_WECHAT,
  35. sender, receiver, None, int(time.time() * 1000))
  36. else:
  37. message = MqMessage.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  38. sender, receiver, text, int(time.time() * 1000)
  39. )
  40. message.msgId = message_id
  41. receive_queue.produce(message)