Forráskód Böngészése

Update scripts/mq_sender

StrayWarrior 3 hete
szülő
commit
d46330c839
1 módosított fájl, 43 hozzáadás és 0 törlés
  1. 43 0
      scripts/mq_sender.py

+ 43 - 0
scripts/mq_sender.py

@@ -0,0 +1,43 @@
+import os
+import sys
+sys.path.append(os.curdir)
+import configs
+from message import Message, MessageType, MessageChannel
+from message_queue_backend import AliyunRocketMQQueueBackend
+import time
+from argparse import ArgumentParser
+
+if __name__ == '__main__':
+    parser = ArgumentParser(description="Send messages to Aliyun RocketMQ")
+    parser.add_argument('--staff-id', type=str, help='Staff ID, as receiver')
+    parser.add_argument('--user-id', type=str, help='User ID, as sender')
+    args = parser.parse_args()
+
+    config = configs.get()
+
+    use_aliyun_mq = config['debug_flags']['use_aliyun_mq']
+    receive_queue = AliyunRocketMQQueueBackend(
+        config['mq']['endpoints'],
+        config['mq']['instance_id'],
+        config['mq']['receive_topic'],
+        has_consumer=False, has_producer=True
+    )
+
+    message_id = 0
+    while True:
+        print("Input next message: ")
+        text = sys.stdin.readline().strip()
+        if not text:
+            continue
+        message_id += 1
+        sender = args.user_id
+        receiver = args.staff_id
+        if text == MessageType.AGGREGATION_TRIGGER.name:
+            message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
+                                    sender, receiver, None, int(time.time() * 1000))
+        else:
+            message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
+                                    sender,receiver, text, int(time.time() * 1000)
+                                    )
+        message.msgId = message_id
+        receive_queue.produce(message)