Prechádzať zdrojové kódy

Update message_queue_backend: support different message group for sequence message

StrayWarrior 3 týždňov pred
rodič
commit
fba247eab3
1 zmenil súbory, kde vykonal 6 pridanie a 4 odobranie
  1. 6 4
      pqai_agent/message_queue_backend.py

+ 6 - 4
pqai_agent/message_queue_backend.py

@@ -25,7 +25,7 @@ class MessageQueueBackend(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def produce(self, message: Message) -> None:
+    def produce(self, message: Message, msg_group: Optional[str] = None) -> None:
         pass
 
     @abc.abstractmethod
@@ -43,7 +43,7 @@ class MemoryQueueBackend(MessageQueueBackend):
     def ack(self, message: Message):
         return
 
-    def produce(self, message: Message):
+    def produce(self, message: Message, msg_group: Optional[str] = None):
         self._queue.append(message)
 
     def shutdown(self):
@@ -104,7 +104,7 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
         logger.debug("[{}]ack message: {}".format(self.topic, message))
         self.consumer.ack(message._rmq_message)
 
-    def produce(self, message: Message) -> None:
+    def produce(self, message: Message, msg_group: Optional[str] = None) -> None:
         if not self.has_producer:
             raise Exception("Producer not initialized.")
         message.model_config['use_enum_values'] = False
@@ -114,7 +114,9 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
         rmq_message.body = json_str.encode('utf-8')
         if self.topic_type == 'FIFO':
             # 顺序消息队列必须指定消息组
-            rmq_message.message_group = "agent_system"
+            if msg_group is None:
+                msg_group = f"private:{message.sender}:{message.receiver}"
+            rmq_message.message_group = msg_group
         elif self.topic_type == 'DELAY':
             # 延时消息队列必须指定投递时间(秒)
             rmq_message.delivery_timestamp = int(message.sendTime / 1000)