Procházet zdrojové kódy

Update agent_service: support graceful shutdown

StrayWarrior před 1 týdnem
rodič
revize
ad11bffc20
1 změnil soubory, kde provedl 35 přidání a 6 odebrání
  1. 35 6
      agent_service.py

+ 35 - 6
agent_service.py

@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
 import re
+import signal
 import sys
 import time
 import random
@@ -70,8 +71,10 @@ class AgentService:
 
         # 定时任务调度器
         self.scheduler = BackgroundScheduler()
-        self.scheduler.start()
         self.limit_initiative_conversation_rate = True
+        self.running = False
+        self.process_thread = None
+        self._sigint_cnt = 0
 
     def setup_initiative_conversations(self, schedule_params: Optional[Dict] = None):
         if not schedule_params:
@@ -91,7 +94,7 @@ class AgentService:
 
     def process_messages(self):
         """持续处理接收队列消息"""
-        while True:
+        while self.running:
             message = self.receive_queue.consume()
             if message:
                 try:
@@ -101,6 +104,33 @@ class AgentService:
                     logger.error("Error processing message: {}".format(e))
                     traceback.print_exc()
             time.sleep(1)
+        logger.info("Message processing thread exit")
+
+    def start(self, blocking=False):
+        self.running = True
+        self.process_thread = threading.Thread(target=service.process_messages)
+        self.process_thread.start()
+        self.scheduler.start()
+        signal.signal(signal.SIGINT, self._handle_sigint)
+        if blocking:
+            self.process_thread.join()
+
+    def shutdown(self, sync=True):
+        if not self.running:
+            raise Exception("Service is not running")
+        self.running = False
+        self.scheduler.shutdown()
+        if sync:
+            self.process_thread.join()
+
+    def _handle_sigint(self, signum, frame):
+        self._sigint_cnt += 1
+        if self._sigint_cnt == 1:
+            logger.warning("Try to shutdown gracefully...")
+            self.shutdown(sync=True)
+        else:
+            logger.warning("Forcing exit")
+            sys.exit(0)
 
     def _update_user_profile(self, user_id, user_profile, recent_dialogue: List[Dict]):
         profile_to_update = self.user_profile_extractor.extract_profile_info(user_profile, recent_dialogue)
@@ -322,12 +352,12 @@ if __name__ == "__main__":
         schedule_param = config['agent_behavior'].get('schedule_param', None)
         service.setup_initiative_conversations(schedule_param)
 
-    process_thread = threading.Thread(target=service.process_messages)
-    process_thread.start()
 
     if not config['debug_flags'].get('console_input', False):
-        process_thread.join()
+        service.start(blocking=True)
         sys.exit(0)
+    else:
+        service.start()
 
     message_id = 0
     while True:
@@ -348,4 +378,3 @@ if __name__ == "__main__":
         message.msgId = message_id
         receive_queue.produce(message)
         time.sleep(0.1)
-    process_thread.join()