jihuaqiang vor 2 Wochen
Ursprung
Commit
d43a29800f
4 geänderte Dateien mit 252 neuen und 6 gelöschten Zeilen
  1. 59 1
      agent.py
  2. 69 0
      cleanup_multiprocess.sh
  3. 31 5
      start_service.sh
  4. 93 0
      test_process_cleanup.py

+ 59 - 1
agent.py

@@ -15,6 +15,7 @@ import concurrent.futures
 import fcntl
 import errno
 import multiprocessing
+import signal
 from typing import Any, Dict, List, Optional, TypedDict, Annotated
 from contextlib import asynccontextmanager
 
@@ -78,6 +79,41 @@ class ExtractRequest(BaseModel):
 identify_tool = None
 # Global thread pool
 THREAD_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=20)
+# List of active process pools
+ACTIVE_POOLS = []
+# Lock held while ACTIVE_POOLS is read or modified
+POOLS_LOCK = threading.Lock()
+
+def cleanup_all_pools():
+    """Terminate every process pool registered in ACTIVE_POOLS.
+
+    The registry is copied and emptied while POOLS_LOCK is held; the pools
+    are then terminated outside the lock so that code removing its own pool
+    from ACTIVE_POOLS is not blocked while workers shut down.  A failure on
+    one pool is logged and does not prevent the remaining pools from being
+    cleaned up.
+    """
+    with POOLS_LOCK:
+        pools = list(ACTIVE_POOLS)
+        ACTIVE_POOLS.clear()
+
+    logger.info(f"开始清理 {len(pools)} 个活跃进程池...")
+    for pool in pools:
+        try:
+            logger.info("正在终止进程池...")
+            # Pool.join() accepts no timeout and Pool has no kill() method;
+            # terminate() followed by join() is the supported way to stop
+            # the workers immediately and then reap them.
+            pool.terminate()
+            pool.join()
+        except Exception as e:
+            logger.error(f"清理进程池时出错: {e}")
+    logger.info("所有进程池已清理")
+
+def signal_handler(signum, frame):
+    """Handle a termination signal: release the pools, then exit with 0.
+
+    Args:
+        signum: Number of the received signal (included in the log message).
+        frame: Stack frame passed in by the signal module; not used.
+    """
+    logger.info(f"收到信号 {signum},开始清理...")
+    # Process pools first, then the thread pool.
+    cleanup_all_pools()
+    # wait=False: return without waiting for running tasks to finish.
+    THREAD_POOL.shutdown(wait=False)
+    logger.info("清理完成,退出程序")
+    raise SystemExit(0)
+
+def register_signal_handlers():
+    """Install signal_handler as the handler for SIGTERM and SIGINT."""
+    for signum in (signal.SIGTERM, signal.SIGINT):
+        signal.signal(signum, signal_handler)
 
 def get_identify_tool():
     """惰性初始化 IdentifyTool,确保在子进程中可用"""
@@ -124,6 +160,9 @@ async def lifespan(app: FastAPI):
     # 启动时执行
     logger.info("🚀 启动 Knowledge Agent 服务...")
     
+    # 注册信号处理器
+    register_signal_handlers()
+    
     # 初始化全局工具
     global identify_tool
     identify_tool = IdentifyTool()
@@ -139,6 +178,8 @@ async def lifespan(app: FastAPI):
     
     # 关闭时执行
     logger.info("🛑 关闭 Knowledge Agent 服务...")
+    # 清理所有进程池
+    cleanup_all_pools()
     # 关闭线程池
     THREAD_POOL.shutdown(wait=False)
     logger.info("✅ 已关闭线程池")
@@ -542,9 +583,26 @@ def create_langgraph_workflow():
                 except RuntimeError:
                     pass  # 如果已经设置过,忽略错误
                 
-                with multiprocessing.Pool(processes=7) as pool:
+                pool = None
+                try:
+                    pool = multiprocessing.Pool(processes=7)
+                    with POOLS_LOCK:
+                        ACTIVE_POOLS.append(pool)
+                    
                     logger.info(f"开始多进程处理: 数量={len(process_args)}, 使用7个进程")
                     results = pool.map(process_single_item, process_args)
+                except Exception as e:
+                    logger.error(f"多进程处理异常: {e}")
+                    results = []
+                finally:
+                    if pool is not None:
+                        logger.info("正在关闭多进程池...")
+                        pool.close()
+                        pool.join()
+                        with POOLS_LOCK:
+                            if pool in ACTIVE_POOLS:
+                                ACTIVE_POOLS.remove(pool)
+                        logger.info("多进程池已关闭")
                 
                 # 恢复原始启动方法
                 try:

+ 69 - 0
cleanup_multiprocess.sh

@@ -0,0 +1,69 @@
+#!/bin/bash
+#
+# Force-kill leftover multiprocessing workers, multiprocessing resource
+# trackers and python agent processes found in the `ps aux` output.
+
+# kill_matching PATTERN LABEL
+#   Lists the processes whose `ps aux` line matches the grep PATTERN and
+#   sends SIGKILL to each of them. LABEL is the name used in the messages.
+kill_matching() {
+    local pattern="$1"
+    local label="$2"
+    local matches pid
+
+    echo "🔍 查找所有 $label 相关进程..."
+
+    # Take a single snapshot so the printed list and the PIDs that get
+    # killed always refer to the same set of processes.
+    matches=$(ps aux | grep -- "$pattern" | grep -v grep)
+
+    if [ -z "$matches" ]; then
+        echo "✅ 没有找到 $label 进程"
+        return 0
+    fi
+
+    echo "📋 找到以下 $label 进程:"
+    printf '%s\n' "$matches"
+
+    echo ""
+    echo "🛑 强制终止所有 $label 进程..."
+    # Column 2 of `ps aux` is the PID.
+    for pid in $(printf '%s\n' "$matches" | awk '{print $2}'); do
+        echo "强制终止进程 $pid..."
+        kill -9 "$pid" 2>/dev/null
+    done
+
+    echo "✅ $label 进程已清理"
+}
+
+kill_matching "multiprocessing.spawn" "multiprocessing.spawn"
+
+echo ""
+kill_matching "multiprocessing.resource_tracker" "multiprocessing.resource_tracker"
+
+echo ""
+# NOTE(review): this pattern matches any python command line containing
+# "agent", not only this service - confirm that is intended.
+kill_matching "python.*agent" "python agent"
+
+echo ""
+echo "🏁 清理完成"

+ 31 - 5
start_service.sh

@@ -156,32 +156,58 @@ stop_service() {
     
     # 额外清理:查找并终止所有相关进程
     echo "🔍 检查是否有残留进程..."
-    REMAINING_PROCESSES=$(ps aux | grep -E "(python.*agent|knowledge-agent)" | grep -v grep | awk '{print $2}')
+    
+    # 查找所有可能的进程类型
+    REMAINING_PROCESSES=$(ps aux | grep -E "(python.*agent\.py|uvicorn.*agent|knowledge-agent|agent\.py|multiprocessing\.spawn|multiprocessing\.resource_tracker)" | grep -v grep | awk '{print $2}')
     
     if [ ! -z "$REMAINING_PROCESSES" ]; then
         echo "⚠️  发现残留进程,正在清理..."
+        echo "📋 找到的进程:"
+        ps aux | grep -E "(python.*agent\.py|uvicorn.*agent|knowledge-agent|agent\.py|multiprocessing\.spawn|multiprocessing\.resource_tracker)" | grep -v grep
+        
+        # 先尝试优雅终止
         for pid in $REMAINING_PROCESSES; do
-            echo "终止残留进程 $pid..."
+            echo "优雅终止进程 $pid..."
             kill -TERM $pid 2>/dev/null
         done
         
-        # 等待2
-        sleep 2
+        # 等待3
+        sleep 3
         
         # 检查是否还有残留
-        STILL_REMAINING=$(ps aux | grep -E "(python.*agent|knowledge-agent)" | grep -v grep | awk '{print $2}')
+        STILL_REMAINING=$(ps aux | grep -E "(python.*agent\.py|uvicorn.*agent|knowledge-agent|agent\.py|multiprocessing\.spawn|multiprocessing\.resource_tracker)" | grep -v grep | awk '{print $2}')
         if [ ! -z "$STILL_REMAINING" ]; then
             echo "强制终止顽固进程..."
             for pid in $STILL_REMAINING; do
                 echo "强制终止进程 $pid..."
                 kill -9 $pid 2>/dev/null
             done
+            
+            # 再次等待并检查
+            sleep 2
+            FINAL_CHECK=$(ps aux | grep -E "(python.*agent\.py|uvicorn.*agent|knowledge-agent|agent\.py|multiprocessing\.spawn|multiprocessing\.resource_tracker)" | grep -v grep | awk '{print $2}')
+            if [ ! -z "$FINAL_CHECK" ]; then
+                echo "⚠️  仍有进程无法终止,请手动检查:"
+                ps aux | grep -E "(python.*agent\.py|uvicorn.*agent|knowledge-agent|agent\.py|multiprocessing\.spawn|multiprocessing\.resource_tracker)" | grep -v grep
+            fi
         fi
         
         echo "✅ 残留进程已清理"
     else
         echo "✅ 没有发现残留进程"
     fi
+    
+    # 检查端口占用
+    echo "🔍 检查端口${PORT}占用情况..."
+    PORT_PROCESS=$(lsof -ti:${PORT} 2>/dev/null)
+    if [ ! -z "$PORT_PROCESS" ]; then
+        echo "⚠️  端口${PORT}仍被占用,进程ID: $PORT_PROCESS"
+        echo "强制终止占用端口的进程..."
+        kill -9 $PORT_PROCESS 2>/dev/null
+        echo "✅ 端口已释放"
+    else
+        echo "✅ 端口${PORT}已释放"
+    fi
 }
 
 # 重启服务

+ 93 - 0
test_process_cleanup.py

@@ -0,0 +1,93 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试多进程清理功能
+"""
+
+import os
+import sys
+import time
+import signal
+import multiprocessing
+import threading
+from contextlib import asynccontextmanager
+
+# Add this file's directory to the import path
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+# Simulated process-pool registry
+ACTIVE_POOLS = []
+POOLS_LOCK = threading.Lock()
+
+def cleanup_all_pools():
+    """Terminate every process pool registered in ACTIVE_POOLS.
+
+    The registry is copied and emptied while POOLS_LOCK is held; the pools
+    are then terminated outside the lock so that code removing its own pool
+    from ACTIVE_POOLS is not blocked while workers shut down.  A failure on
+    one pool is printed and does not prevent the remaining pools from being
+    cleaned up.
+    """
+    with POOLS_LOCK:
+        pools = list(ACTIVE_POOLS)
+        ACTIVE_POOLS.clear()
+
+    print(f"开始清理 {len(pools)} 个活跃进程池...")
+    for pool in pools:
+        try:
+            print("正在终止进程池...")
+            # Pool.join() accepts no timeout and Pool has no kill() method;
+            # terminate() followed by join() is the supported way to stop
+            # the workers immediately and then reap them.
+            pool.terminate()
+            pool.join()
+        except Exception as e:
+            print(f"清理进程池时出错: {e}")
+    print("所有进程池已清理")
+
+def signal_handler(signum, frame):
+    """Handle a termination signal: clean up the pools, then exit with 0.
+
+    Args:
+        signum: Number of the received signal (included in the message).
+        frame: Stack frame passed in by the signal module; not used.
+    """
+    print(f"收到信号 {signum},开始清理...")
+    cleanup_all_pools()
+    print("清理完成,退出程序")
+    raise SystemExit(0)
+
+def register_signal_handlers():
+    """Install signal_handler as the handler for SIGTERM and SIGINT."""
+    for signum in (signal.SIGTERM, signal.SIGINT):
+        signal.signal(signum, signal_handler)
+
+def worker_task(task_id):
+    """Simulate a long-running job.
+
+    Prints a start message, sleeps for 10 seconds, prints a completion
+    message and returns the string "Task <task_id> completed".
+    """
+    print(f"Worker {task_id} 开始工作...")
+    time.sleep(10)  # stand-in for real work
+    print(f"Worker {task_id} 完成工作")
+    outcome = f"Task {task_id} completed"
+    return outcome
+
+def test_multiprocess_cleanup():
+    """Run five worker tasks in a three-process pool and always close it.
+
+    Signal handlers are registered first.  The pool is added to
+    ACTIVE_POOLS while it is in use and removed again in the finally
+    block after it has been closed and joined.
+    """
+    print("🚀 开始测试多进程清理...")
+
+    register_signal_handlers()
+
+    pool = None
+    try:
+        pool = multiprocessing.Pool(processes=3)
+        with POOLS_LOCK:
+            ACTIVE_POOLS.append(pool)
+
+        print("开始多进程任务...")
+        task_ids = list(range(1, 6))
+        results = pool.map(worker_task, task_ids)
+        print(f"任务完成: {results}")
+    except Exception as e:
+        print(f"多进程处理异常: {e}")
+    finally:
+        if pool is not None:
+            print("正在关闭多进程池...")
+            pool.close()
+            pool.join()
+            with POOLS_LOCK:
+                if pool in ACTIVE_POOLS:
+                    ACTIVE_POOLS.remove(pool)
+            print("多进程池已关闭")
+
+    print("测试完成")
+
+if __name__ == "__main__":
+    test_multiprocess_cleanup()