test_process_cleanup.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 测试多进程清理功能
  5. """
  6. import os
  7. import sys
  8. import time
  9. import signal
  10. import multiprocessing
  11. import threading
  12. from contextlib import asynccontextmanager
  13. # 添加项目路径
  14. sys.path.append(os.path.dirname(os.path.abspath(__file__)))
  15. # 模拟进程池跟踪
  16. ACTIVE_POOLS = []
  17. POOLS_LOCK = threading.Lock()
  18. def cleanup_all_pools():
  19. """清理所有活跃的进程池"""
  20. global ACTIVE_POOLS, POOLS_LOCK
  21. with POOLS_LOCK:
  22. print(f"开始清理 {len(ACTIVE_POOLS)} 个活跃进程池...")
  23. for pool in ACTIVE_POOLS:
  24. try:
  25. print("正在终止进程池...")
  26. pool.terminate()
  27. pool.join(timeout=5) # 等待5秒
  28. if pool._state != 'CLOSED':
  29. print("进程池未正常关闭,强制终止")
  30. pool.kill()
  31. except Exception as e:
  32. print(f"清理进程池时出错: {e}")
  33. ACTIVE_POOLS.clear()
  34. print("所有进程池已清理")
  35. def signal_handler(signum, frame):
  36. """信号处理器"""
  37. print(f"收到信号 {signum},开始清理...")
  38. cleanup_all_pools()
  39. print("清理完成,退出程序")
  40. sys.exit(0)
  41. def register_signal_handlers():
  42. """注册信号处理器"""
  43. signal.signal(signal.SIGTERM, signal_handler)
  44. signal.signal(signal.SIGINT, signal_handler)
  45. def worker_task(task_id):
  46. """模拟工作任务"""
  47. print(f"Worker {task_id} 开始工作...")
  48. time.sleep(10) # 模拟长时间工作
  49. print(f"Worker {task_id} 完成工作")
  50. return f"Task {task_id} completed"
  51. def test_multiprocess_cleanup():
  52. """测试多进程清理"""
  53. print("🚀 开始测试多进程清理...")
  54. # 注册信号处理器
  55. register_signal_handlers()
  56. # 创建进程池
  57. pool = None
  58. try:
  59. pool = multiprocessing.Pool(processes=3)
  60. with POOLS_LOCK:
  61. ACTIVE_POOLS.append(pool)
  62. print("开始多进程任务...")
  63. tasks = [1, 2, 3, 4, 5]
  64. results = pool.map(worker_task, tasks)
  65. print(f"任务完成: {results}")
  66. except Exception as e:
  67. print(f"多进程处理异常: {e}")
  68. finally:
  69. if pool is not None:
  70. print("正在关闭多进程池...")
  71. pool.close()
  72. pool.join()
  73. with POOLS_LOCK:
  74. if pool in ACTIVE_POOLS:
  75. ACTIVE_POOLS.remove(pool)
  76. print("多进程池已关闭")
  77. print("测试完成")
  78. if __name__ == "__main__":
  79. test_multiprocess_cleanup()