mq_sender.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. import os
  2. import sys
  3. sys.path.append(os.curdir)
  4. import configs
  5. from message import Message, MessageType, MessageChannel
  6. from message_queue_backend import AliyunRocketMQQueueBackend
  7. import time
  8. from argparse import ArgumentParser
  9. if __name__ == '__main__':
  10. parser = ArgumentParser(description="Send messages to Aliyun RocketMQ")
  11. parser.add_argument('--staff-id', type=str, help='Staff ID, as receiver')
  12. parser.add_argument('--user-id', type=str, help='User ID, as sender')
  13. args = parser.parse_args()
  14. config = configs.get()
  15. use_aliyun_mq = config['debug_flags']['use_aliyun_mq']
  16. receive_queue = AliyunRocketMQQueueBackend(
  17. config['mq']['endpoints'],
  18. config['mq']['instance_id'],
  19. config['mq']['receive_topic'],
  20. has_consumer=False, has_producer=True
  21. )
  22. message_id = 0
  23. while True:
  24. print("Input next message: ")
  25. text = sys.stdin.readline().strip()
  26. if not text:
  27. continue
  28. message_id += 1
  29. sender = args.user_id
  30. receiver = args.staff_id
  31. if text == MessageType.AGGREGATION_TRIGGER.name:
  32. message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
  33. sender, receiver, None, int(time.time() * 1000))
  34. else:
  35. message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  36. sender,receiver, text, int(time.time() * 1000)
  37. )
  38. message.msgId = message_id
  39. receive_queue.produce(message)