#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""System monitoring script.

Watches the scheduler process (liveness, memory, CPU), overall system
resources and the scheduler log files, and restarts the scheduler when one of
the configured limits is exceeded or a known error pattern shows up in a log.
"""

import glob
import json
import logging
import os
import signal
import subprocess
import sys
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import psutil

# logging.FileHandler raises FileNotFoundError if the target directory is
# missing, which would abort the script at import time.
os.makedirs('logs', exist_ok=True)

# Logging configuration: log to a file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/system_monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class SystemMonitor:
    """Supervise the scheduler process and restart it when it misbehaves."""

    def __init__(self):
        """Set up the default configuration and install signal handlers."""
        self.config = {
            'max_memory_mb': 2048,  # maximum scheduler memory (RSS), in MB
            'max_cpu_percent': 80,  # maximum scheduler CPU usage, in percent
            'max_disk_percent': 90,  # maximum disk usage of '/', in percent
            'check_interval': 30,  # seconds between two checks
            'restart_delay': 60,  # minimum seconds between two restarts
            'max_restarts': 5,  # maximum number of restarts
            'pid_file': 'scheduler.pid',  # scheduler PID file
            'log_file': 'logs/system_monitor.log'
        }

        self.restart_count = 0
        self.last_restart_time = None
        self.running = True

        # Stop the monitoring loop cleanly on SIGINT / SIGTERM.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by asking the monitoring loop to stop."""
        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
        logger.info(f"收到信号 {signal_name},正在停止监控...")
        self.running = False

    def _sleep_while_running(self, seconds: float) -> None:
        """Sleep up to ``seconds``, returning early once ``running`` is False.

        ``time.sleep`` resumes after a signal handler returns, so a single
        long sleep would delay shutdown by up to a full check interval.
        """
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    def get_process_info(self) -> Optional[Dict[str, Any]]:
        """Return details about the scheduler process.

        The PID is read from the configured PID file.

        Returns:
            A dict with ``pid``, ``name``, ``memory_mb``, ``cpu_percent``,
            ``status``, ``create_time`` and ``num_threads``; ``None`` when the
            PID file is missing, the process does not exist, or the lookup
            fails.
        """
        try:
            with open(self.config['pid_file'], 'r') as f:
                pid = int(f.read().strip())
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.error(f"获取进程信息失败: {e}")
            return None

        try:
            if not psutil.pid_exists(pid):
                return None

            process = psutil.Process(pid)
            return {
                'pid': pid,
                'name': process.name(),
                'memory_mb': process.memory_info().rss / 1024 / 1024,
                # A Process object is created on every call, and the first
                # non-blocking cpu_percent() on a new object always returns a
                # meaningless 0.0; sample over a short interval instead.
                'cpu_percent': process.cpu_percent(interval=0.5),
                'status': process.status(),
                'create_time': process.create_time(),
                'num_threads': process.num_threads()
            }
        except (psutil.Error, OSError) as e:
            logger.error(f"获取进程信息失败: {e}")
            return None

    def check_system_resources(self) -> Dict[str, Any]:
        """Collect system-wide resource usage.

        Returns:
            A dict with ``cpu_percent``, ``memory_percent``,
            ``memory_available_gb``, ``disk_percent`` (for ``/``),
            ``net_connections`` (``None`` when the count cannot be read for
            lack of privileges) and an ISO ``timestamp``; an empty dict when
            collection fails.
        """
        try:
            # CPU usage, sampled over one second.
            cpu_percent = psutil.cpu_percent(interval=1)

            # Memory usage.
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            memory_available_gb = memory.available / 1024 / 1024 / 1024

            # Disk usage of the root file system.
            disk = psutil.disk_usage('/')
            disk_percent = disk.percent

            # Number of network connections. This can require elevated
            # privileges; it is informational only, so a denial must not
            # discard the other metrics.
            try:
                net_connections = len(psutil.net_connections())
            except (psutil.AccessDenied, PermissionError):
                net_connections = None

            return {
                'cpu_percent': cpu_percent,
                'memory_percent': memory_percent,
                'memory_available_gb': memory_available_gb,
                'disk_percent': disk_percent,
                'net_connections': net_connections,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"检查系统资源失败: {e}")
            return {}

    def check_logs_for_errors(self) -> bool:
        """Scan the scheduler log files for known fatal error patterns.

        Matching is case-insensitive.

        Returns:
            ``True`` as soon as one pattern is found in any log file,
            ``False`` otherwise (including when a file cannot be read).
        """
        # NOTE(review): the whole file is re-scanned on every call, so an old
        # error line keeps triggering restarts until max_restarts is reached.
        # Consider tracking per-file offsets or rotating logs on restart.
        log_patterns = [
            'logs/scheduler_stdout.log',
            'logs/scheduler_*.log'
        ]

        error_patterns = (
            'double free',
            'corruption',
            'segmentation fault',
            'memory error',
            'out of memory',
            'killed'
        )

        # Expand wildcards; the glob also matches the explicit stdout log, so
        # de-duplicate to avoid reading the same file twice.
        log_files: List[str] = []
        for log_pattern in log_patterns:
            if '*' in log_pattern:
                matches = sorted(glob.glob(log_pattern))
            else:
                matches = [log_pattern]
            for path in matches:
                if path not in log_files:
                    log_files.append(path)

        for log_file in log_files:
            try:
                with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read().lower()
            except FileNotFoundError:
                continue
            except OSError as e:
                logger.error(f"读取日志文件 {log_file} 失败: {e}")
                continue

            for pattern in error_patterns:
                if pattern in content:
                    logger.warning(f"在日志文件 {log_file} 中发现错误模式: {pattern}")
                    return True

        return False

    def restart_scheduler(self) -> bool:
        """Stop the running scheduler (if any) and start a new one.

        The restart is skipped when the previous one happened less than
        ``restart_delay`` seconds ago or ``max_restarts`` has been reached.

        Returns:
            ``True`` when a scheduler process is found after the restart,
            ``False`` when the restart was skipped or failed.
        """
        try:
            current_time = datetime.now()

            # Rate limit. total_seconds() is required here: timedelta.seconds
            # ignores the days component and wraps around every 24 hours.
            if (self.last_restart_time is not None
                    and (current_time - self.last_restart_time).total_seconds()
                    < self.config['restart_delay']):
                logger.warning("重启过于频繁,跳过本次重启")
                return False

            if self.restart_count >= self.config['max_restarts']:
                logger.error(f"达到最大重启次数 ({self.config['max_restarts']}),停止重启")
                return False

            logger.info("正在重启调度器...")

            # Stop the existing process: ask politely first, then force.
            process_info = self.get_process_info()
            if process_info:
                try:
                    os.kill(process_info['pid'], signal.SIGTERM)
                    time.sleep(5)
                    if psutil.pid_exists(process_info['pid']):
                        # SIGKILL cannot be caught or ignored. The previous
                        # SIGILL is the "illegal instruction" signal and is
                        # not a forced kill.
                        os.kill(process_info['pid'], signal.SIGKILL)
                except OSError as e:
                    logger.error(f"停止进程失败: {e}")

            # Give the old process time to exit completely.
            time.sleep(10)

            # Start the new process.
            try:
                # The child inherits its own copy of the descriptor, so the
                # monitor's handle is closed right after spawning instead of
                # leaking one open file per restart.
                with open('logs/scheduler_stdout.log', 'a') as stdout_log:
                    subprocess.Popen(['python3', 'multi_thread_scheduler.py'],
                                     stdout=stdout_log,
                                     stderr=subprocess.STDOUT)

                # Give the new process time to start.
                time.sleep(15)

                # Check whether a scheduler process can now be found.
                if self.get_process_info():
                    logger.info("调度器重启成功")
                    self.restart_count += 1
                    self.last_restart_time = current_time
                    return True

                logger.error("调度器重启失败")
                return False

            except Exception as e:
                logger.error(f"启动调度器失败: {e}")
                return False

        except Exception as e:
            logger.error(f"重启调度器过程中发生错误: {e}")
            return False

    def should_restart(self, process_info: Optional[Dict[str, Any]],
                       system_info: Dict[str, Any]) -> bool:
        """Decide whether the scheduler must be restarted.

        Args:
            process_info: Result of ``get_process_info()``; ``None`` when the
                scheduler process was not found.
            system_info: Result of ``check_system_resources()``; may be empty.

        Returns:
            ``True`` when the process is missing, a process or system limit is
            exceeded, or an error pattern is found in the logs.
        """
        if not process_info:
            logger.warning("调度器进程不存在,需要重启")
            return True

        # Process memory usage.
        if process_info['memory_mb'] > self.config['max_memory_mb']:
            logger.warning(f"内存使用过高: {process_info['memory_mb']:.1f}MB > {self.config['max_memory_mb']}MB")
            return True

        # Process CPU usage.
        if process_info['cpu_percent'] > self.config['max_cpu_percent']:
            logger.warning(f"CPU使用率过高: {process_info['cpu_percent']:.1f}% > {self.config['max_cpu_percent']}%")
            return True

        # System-wide resources.
        if system_info.get('memory_percent', 0) > 90:
            logger.warning(f"系统内存使用率过高: {system_info['memory_percent']:.1f}%")
            return True

        if system_info.get('disk_percent', 0) > self.config['max_disk_percent']:
            logger.warning(f"磁盘使用率过高: {system_info['disk_percent']:.1f}%")
            return True

        # Error patterns in the log files.
        if self.check_logs_for_errors():
            logger.warning("检测到日志错误,需要重启")
            return True

        return False

    def run(self):
        """Run the monitoring loop until a stop signal is received."""
        logger.info("系统监控启动")
        logger.info(f"配置: {json.dumps(self.config, indent=2)}")

        while self.running:
            try:
                # Scheduler process details.
                process_info = self.get_process_info()

                # System resource usage.
                system_info = self.check_system_resources()

                # Log the current state.
                if process_info:
                    logger.info(f"进程状态: PID={process_info['pid']}, "
                                f"内存={process_info['memory_mb']:.1f}MB, "
                                f"CPU={process_info['cpu_percent']:.1f}%")

                if system_info:
                    logger.info(f"系统状态: CPU={system_info['cpu_percent']:.1f}%, "
                                f"内存={system_info['memory_percent']:.1f}%, "
                                f"磁盘={system_info['disk_percent']:.1f}%")

                # Restart the scheduler when needed.
                if self.should_restart(process_info, system_info):
                    if self.restart_scheduler():
                        logger.info("重启操作完成")
                    else:
                        logger.error("重启操作失败")

                # Wait for the next check, waking up early on shutdown.
                self._sleep_while_running(self.config['check_interval'])

            except Exception as e:
                logger.error(f"监控过程中发生错误: {e}")
                self._sleep_while_running(self.config['check_interval'])

        logger.info("系统监控已停止")


def main():
    """Print a start-up banner and run the monitor until it is stopped."""
    print("=" * 60)
    print("系统监控脚本")
    print("=" * 60)
    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    try:
        monitor = SystemMonitor()
        monitor.run()
    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
    except Exception as e:
        print(f"监控脚本运行失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()