Browse Source

Update agent_service: multiple process threads

StrayWarrior 4 weeks ago
parent
commit
da5a1b0878
1 changed files with 38 additions and 10 deletions
  1. 38 10
      pqai_agent/agent_service.py

+ 38 - 10
pqai_agent/agent_service.py

@@ -80,7 +80,7 @@ class AgentService:
         self.scheduler_queue = None
         self.scheduler_queue = None
         self.msg_scheduler_thread = None
         self.msg_scheduler_thread = None
         self.running = False
         self.running = False
-        self.process_thread = None
+        self.process_threads = []
         self._sigint_cnt = 0
         self._sigint_cnt = 0
 
 
         # Push相关
         # Push相关
@@ -148,27 +148,53 @@ class AgentService:
         agent.refresh_profile()
         agent.refresh_profile()
         return agent
         return agent
 
 
+    def create_queue_consumer(self) -> MessageQueueBackend:
+        # 只有在MQ模式下才需要创建多消费者
+        if not self.config.get('debug_flags', {}).get('use_aliyun_mq', False):
+            logger.warning("Do not create queue consumer in local mode")
+            return self.receive_queue
+        mq_config = self.config['mq']
+        consumer = AliyunRocketMQQueueBackend(
+            endpoints=mq_config['endpoints'],
+            instance_id=mq_config['instance_id'],
+            topic=mq_config['receive_topic'],
+            has_consumer=True,
+            has_producer=False,
+            group_id=mq_config['receive_group'],
+            topic_type='FIFO',
+            await_duration=10
+        )
+        return consumer
+
     def process_messages(self):
     def process_messages(self):
-        """持续处理接收队列消息"""
+        """持续处理接收队列消息,通过顺序消息的消息组保证同一<用户, 客服>的消费保序,可并发处理"""
+        receive_queue = self.create_queue_consumer()
+        # 消费者创建后等一会儿再开始消费,否则可能远端没准备好会报错
+        time.sleep(1)
         while self.running:
         while self.running:
-            message = self.receive_queue.consume()
+            message = receive_queue.consume()
             if message:
             if message:
                 try:
                 try:
                     self.process_single_message(message)
                     self.process_single_message(message)
-                    self.receive_queue.ack(message)
+                    receive_queue.ack(message)
                 except NoRetryException as e:
                 except NoRetryException as e:
                     logger.error("Error processing message and skip retry: {}".format(e))
                     logger.error("Error processing message and skip retry: {}".format(e))
-                    self.receive_queue.ack(message)
+                    receive_queue.ack(message)
                 except Exception as e:
                 except Exception as e:
                     error_stack = traceback.format_exc()
                     error_stack = traceback.format_exc()
                     logger.error("Error processing message: {}, {}".format(e, error_stack))
                     logger.error("Error processing message: {}, {}".format(e, error_stack))
-            time.sleep(0.5)
+            time.sleep(0.1)
+        receive_queue.shutdown()
         logger.info("Message processing thread exit")
         logger.info("Message processing thread exit")
 
 
     def start(self, blocking=False):
     def start(self, blocking=False):
         self.running = True
         self.running = True
-        self.process_thread = threading.Thread(target=self.process_messages)
-        self.process_thread.start()
+        max_reply_workers = self.config.get('system', {}).get('max_reply_workers', 1)
+        self.process_threads = []
+        for i in range(max_reply_workers):
+            thread = threading.Thread(target=self.process_messages)
+            thread.start()
+            self.process_threads.append(thread)
         self.setup_scheduler()
         self.setup_scheduler()
         # 只有企微场景需要主动发起
         # 只有企微场景需要主动发起
         if not self.config['debug_flags'].get('disable_active_conversation', False):
         if not self.config['debug_flags'].get('disable_active_conversation', False):
@@ -176,7 +202,8 @@ class AgentService:
             self.setup_initiative_conversations(schedule_param)
             self.setup_initiative_conversations(schedule_param)
         signal.signal(signal.SIGINT, self._handle_sigint)
         signal.signal(signal.SIGINT, self._handle_sigint)
         if blocking:
         if blocking:
-            self.process_thread.join()
+            for thread in self.process_threads:
+                thread.join()
 
 
     def shutdown(self, sync=True):
     def shutdown(self, sync=True):
         if not self.running:
         if not self.running:
@@ -184,7 +211,8 @@ class AgentService:
         self.running = False
         self.running = False
         self.scheduler.shutdown()
         self.scheduler.shutdown()
         if sync:
         if sync:
-            self.process_thread.join()
+            for thread in self.process_threads:
+                thread.join()
             self.receive_queue.shutdown()
             self.receive_queue.shutdown()
             self.send_queue.shutdown()
             self.send_queue.shutdown()
             if self.msg_scheduler_thread:
             if self.msg_scheduler_thread: