Pārlūkot izejas kodu

Update agent_service: use aliyun mq

StrayWarrior 2 nedēļas atpakaļ
vecāks
revīzija
4afc65d3ba
1 mainītis faili ar 32 papildinājumiem un 7 dzēšanām
  1. 32 7
      agent_service.py

+ 32 - 7
agent_service.py

@@ -12,13 +12,14 @@ import apscheduler.triggers.cron
 from apscheduler.schedulers.background import BackgroundScheduler
 
 import chat_service
+import configs
 import global_flags
 import logging_service
 from chat_service import CozeChat, ChatServiceType
 from dialogue_manager import DialogueManager, DialogueState
 from user_manager import UserManager, LocalUserManager
 from openai import OpenAI
-from message_queue_backend import MessageQueueBackend, MemoryQueueBackend
+from message_queue_backend import MessageQueueBackend, MemoryQueueBackend, AliyunRocketMQQueueBackend
 from user_profile_extractor import UserProfileExtractor
 import threading
 from message import MessageType, Message, MessageChannel
@@ -80,8 +81,13 @@ class AgentService:
         while True:
             message = self.receive_queue.consume()
             if message:
-                self.process_single_message(message)
-            time.sleep(1)  # 避免CPU空转
+                try:
+                    self.process_single_message(message)
+                except Exception as e:
+                    logging.error("Error processing message: {}".format(e))
+                # 无论处理是否有异常,都ACK消息
+                self.receive_queue.ack(message)
+            time.sleep(1)
 
     def _update_user_profile(self, user_id, user_profile, message: str):
         profile_to_update = self.user_profile_extractor.extract_profile_info(user_profile, message)
@@ -97,7 +103,8 @@ class AgentService:
         logging.debug("user: {}, schedule trigger message after {} seconds".format(user_id, delay_sec))
         message_ts = int((time.time() + delay_sec) * 1000)
         message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.SYSTEM, user_id, staff_id, None, message_ts)
-        message.msgId = -MessageType.AGGREGATION_TRIGGER.code
+        # 系统消息使用特定的msgId,无实际意义
+        message.msgId = -MessageType.AGGREGATION_TRIGGER.value
         self.scheduler.add_job(lambda: self.receive_queue.produce(message),
                                'date',
                                run_date=datetime.now() + timedelta(seconds=delay_sec))
@@ -188,13 +195,31 @@ class AgentService:
         return response
 
 if __name__ == "__main__":
+    config = configs.get()
     logging_service.setup_root_logger()
     scheduler_logger = logging.getLogger('apscheduler')
     scheduler_logger.setLevel(logging.WARNING)
 
+    use_aliyun_mq = config['use_aliyun_mq']
+
     # 初始化不同队列的后端
-    receive_queue = MemoryQueueBackend()
-    send_queue = MemoryQueueBackend()
+    if use_aliyun_mq:
+        receive_queue = AliyunRocketMQQueueBackend(
+            config['mq']['endpoints'],
+            config['mq']['instance_id'],
+            config['mq']['receive_topic'],
+            has_consumer=True, has_producer=True,
+            group_id=config['mq']['receive_group']
+        )
+        send_queue = AliyunRocketMQQueueBackend(
+            config['mq']['endpoints'],
+            config['mq']['instance_id'],
+            config['mq']['send_topic'],
+            has_consumer=False, has_producer=True
+        )
+    else:
+        receive_queue = MemoryQueueBackend()
+        send_queue = MemoryQueueBackend()
     human_queue = MemoryQueueBackend()
 
     # 初始化用户管理服务
@@ -211,7 +236,7 @@ if __name__ == "__main__":
         chat_service_type=ChatServiceType.COZE_CHAT
     )
     # 只有企微场景需要主动发起
-    service.setup_initiative_conversations({'second': '5,35'})
+    # service.setup_initiative_conversations({'second': '5,35'})
 
     process_thread = threading.Thread(target=service.process_messages)
     process_thread.start()