mq_sender.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import os
  2. import sys
  3. sys.path.append(os.curdir)
  4. import configs
  5. from message import Message, MessageType, MessageChannel
  6. from message_queue_backend import AliyunRocketMQQueueBackend
  7. import time
  8. from argparse import ArgumentParser
  9. if __name__ == '__main__':
  10. parser = ArgumentParser(description="Send messages to Aliyun RocketMQ")
  11. parser.add_argument('--staff-id', type=str, help='Staff ID, as receiver')
  12. parser.add_argument('--user-id', type=str, help='User ID, as sender')
  13. args = parser.parse_args()
  14. config = configs.get()
  15. use_aliyun_mq = config['debug_flags']['use_aliyun_mq']
  16. receive_queue = AliyunRocketMQQueueBackend(
  17. config['mq']['endpoints'],
  18. config['mq']['instance_id'],
  19. config['mq']['receive_topic'],
  20. has_consumer=False, has_producer=True,
  21. topic_type='FIFO'
  22. )
  23. message_id = 0
  24. while True:
  25. print("Input next message: ")
  26. text = sys.stdin.readline().strip()
  27. if not text:
  28. continue
  29. message_id += 1
  30. sender = args.user_id
  31. receiver = args.staff_id
  32. if text == MessageType.AGGREGATION_TRIGGER.name:
  33. message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
  34. sender, receiver, None, int(time.time() * 1000))
  35. else:
  36. message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  37. sender,receiver, text, int(time.time() * 1000)
  38. )
  39. message.msgId = message_id
  40. receive_queue.produce(message)