# mq_sender.py

  1. import os
  2. import sys
  3. sys.path.append(os.curdir)
  4. from pqai_agent import configs
  5. from pqai_agent.message import Message, MessageType, MessageChannel
  6. from pqai_agent.message_queue_backend import AliyunRocketMQQueueBackend
  7. import time
  8. from argparse import ArgumentParser
  9. if __name__ == '__main__':
  10. parser = ArgumentParser(description="Send messages to Aliyun RocketMQ")
  11. parser.add_argument('--staff-id', type=str, help='Staff ID, as receiver')
  12. parser.add_argument('--user-id', type=str, help='User ID, as sender')
  13. args = parser.parse_args()
  14. config = configs.get()
  15. use_aliyun_mq = config['debug_flags']['use_aliyun_mq']
  16. receive_queue = AliyunRocketMQQueueBackend(
  17. config['mq']['endpoints'],
  18. config['mq']['instance_id'],
  19. config['mq']['receive_topic'],
  20. has_consumer=False, has_producer=True,
  21. topic_type='FIFO'
  22. )
  23. message_id = 0
  24. while True:
  25. print("Input next message: ")
  26. text = sys.stdin.readline().strip()
  27. if not text:
  28. continue
  29. message_id += 1
  30. sender = args.user_id
  31. receiver = args.staff_id
  32. if text in (MessageType.AGGREGATION_TRIGGER.name,
  33. MessageType.HUMAN_INTERVENTION_END.name):
  34. message = Message.build(
  35. MessageType.__members__.get(text),
  36. MessageChannel.CORP_WECHAT,
  37. sender, receiver, None, int(time.time() * 1000))
  38. else:
  39. message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  40. sender,receiver, text, int(time.time() * 1000)
  41. )
  42. message.msgId = message_id
  43. receive_queue.produce(message)