#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 测试多进程清理功能 """ import os import sys import time import signal import multiprocessing import threading from contextlib import asynccontextmanager # 添加项目路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 模拟进程池跟踪 ACTIVE_POOLS = [] POOLS_LOCK = threading.Lock() def cleanup_all_pools(): """清理所有活跃的进程池""" global ACTIVE_POOLS, POOLS_LOCK with POOLS_LOCK: print(f"开始清理 {len(ACTIVE_POOLS)} 个活跃进程池...") for pool in ACTIVE_POOLS: try: print("正在终止进程池...") pool.terminate() pool.join(timeout=5) # 等待5秒 if pool._state != 'CLOSED': print("进程池未正常关闭,强制终止") pool.kill() except Exception as e: print(f"清理进程池时出错: {e}") ACTIVE_POOLS.clear() print("所有进程池已清理") def signal_handler(signum, frame): """信号处理器""" print(f"收到信号 {signum},开始清理...") cleanup_all_pools() print("清理完成,退出程序") sys.exit(0) def register_signal_handlers(): """注册信号处理器""" signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) def worker_task(task_id): """模拟工作任务""" print(f"Worker {task_id} 开始工作...") time.sleep(10) # 模拟长时间工作 print(f"Worker {task_id} 完成工作") return f"Task {task_id} completed" def test_multiprocess_cleanup(): """测试多进程清理""" print("🚀 开始测试多进程清理...") # 注册信号处理器 register_signal_handlers() # 创建进程池 pool = None try: pool = multiprocessing.Pool(processes=3) with POOLS_LOCK: ACTIVE_POOLS.append(pool) print("开始多进程任务...") tasks = [1, 2, 3, 4, 5] results = pool.map(worker_task, tasks) print(f"任务完成: {results}") except Exception as e: print(f"多进程处理异常: {e}") finally: if pool is not None: print("正在关闭多进程池...") pool.close() pool.join() with POOLS_LOCK: if pool in ACTIVE_POOLS: ACTIVE_POOLS.remove(pool) print("多进程池已关闭") print("测试完成") if __name__ == "__main__": test_multiprocess_cleanup()