123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 系统监控脚本
- 监控调度器的运行状态、内存使用、CPU使用等,自动处理异常情况
- """
- import os
- import sys
- import time
- import psutil
- import signal
- import subprocess
- import json
- import logging
- from datetime import datetime, timedelta
- from typing import Dict, Any, Optional
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler('logs/system_monitor.log'),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- class SystemMonitor:
- def __init__(self):
- self.config = {
- 'max_memory_mb': 2048, # 最大内存使用量
- 'max_cpu_percent': 80, # 最大CPU使用率
- 'max_disk_percent': 90, # 最大磁盘使用率
- 'check_interval': 30, # 检查间隔(秒)
- 'restart_delay': 60, # 重启延迟(秒)
- 'max_restarts': 5, # 最大重启次数
- 'pid_file': 'scheduler.pid', # 调度器PID文件
- 'log_file': 'logs/system_monitor.log'
- }
-
- self.restart_count = 0
- self.last_restart_time = None
- self.running = True
-
- # 设置信号处理
- signal.signal(signal.SIGINT, self.signal_handler)
- signal.signal(signal.SIGTERM, self.signal_handler)
-
- def signal_handler(self, signum, frame):
- """信号处理函数"""
- signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
- logger.info(f"收到信号 {signal_name},正在停止监控...")
- self.running = False
-
- def get_process_info(self) -> Optional[Dict[str, Any]]:
- """获取调度器进程信息"""
- try:
- if not os.path.exists(self.config['pid_file']):
- return None
-
- with open(self.config['pid_file'], 'r') as f:
- pid = int(f.read().strip())
-
- if not psutil.pid_exists(pid):
- return None
-
- process = psutil.Process(pid)
- return {
- 'pid': pid,
- 'name': process.name(),
- 'memory_mb': process.memory_info().rss / 1024 / 1024,
- 'cpu_percent': process.cpu_percent(),
- 'status': process.status(),
- 'create_time': process.create_time(),
- 'num_threads': process.num_threads()
- }
- except Exception as e:
- logger.error(f"获取进程信息失败: {e}")
- return None
-
- def check_system_resources(self) -> Dict[str, Any]:
- """检查系统资源使用情况"""
- try:
- # CPU使用率
- cpu_percent = psutil.cpu_percent(interval=1)
-
- # 内存使用率
- memory = psutil.virtual_memory()
- memory_percent = memory.percent
- memory_available_gb = memory.available / 1024 / 1024 / 1024
-
- # 磁盘使用率
- disk = psutil.disk_usage('/')
- disk_percent = disk.percent
-
- # 网络连接数
- net_connections = len(psutil.net_connections())
-
- return {
- 'cpu_percent': cpu_percent,
- 'memory_percent': memory_percent,
- 'memory_available_gb': memory_available_gb,
- 'disk_percent': disk_percent,
- 'net_connections': net_connections,
- 'timestamp': datetime.now().isoformat()
- }
- except Exception as e:
- logger.error(f"检查系统资源失败: {e}")
- return {}
-
- def check_logs_for_errors(self) -> bool:
- """检查日志文件中的错误"""
- try:
- log_files = [
- 'logs/scheduler_stdout.log',
- 'logs/scheduler_*.log'
- ]
-
- error_patterns = [
- 'double free',
- 'corruption',
- 'segmentation fault',
- 'memory error',
- 'out of memory',
- 'killed'
- ]
-
- for log_pattern in log_files:
- if '*' in log_pattern:
- # 处理通配符
- import glob
- log_files = glob.glob(log_pattern)
- else:
- log_files = [log_pattern]
-
- for log_file in log_files:
- if os.path.exists(log_file):
- try:
- with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
- content = f.read()
- for pattern in error_patterns:
- if pattern.lower() in content.lower():
- logger.warning(f"在日志文件 {log_file} 中发现错误模式: {pattern}")
- return True
- except Exception as e:
- logger.error(f"读取日志文件 {log_file} 失败: {e}")
-
- return False
- except Exception as e:
- logger.error(f"检查日志文件失败: {e}")
- return False
-
- def restart_scheduler(self) -> bool:
- """重启调度器"""
- try:
- current_time = datetime.now()
-
- # 检查重启频率限制
- if (self.last_restart_time and
- (current_time - self.last_restart_time).seconds < self.config['restart_delay']):
- logger.warning("重启过于频繁,跳过本次重启")
- return False
-
- if self.restart_count >= self.config['max_restarts']:
- logger.error(f"达到最大重启次数 ({self.config['max_restarts']}),停止重启")
- return False
-
- logger.info("正在重启调度器...")
-
- # 停止现有进程
- process_info = self.get_process_info()
- if process_info:
- try:
- os.kill(process_info['pid'], signal.SIGTERM)
- time.sleep(5)
- if psutil.pid_exists(process_info['pid']):
- os.kill(process_info['pid'], signal.SIGILL)
- except Exception as e:
- logger.error(f"停止进程失败: {e}")
-
- # 等待进程完全停止
- time.sleep(10)
-
- # 启动新进程
- try:
- subprocess.Popen(['python3', 'multi_thread_scheduler.py'],
- stdout=open('logs/scheduler_stdout.log', 'a'),
- stderr=subprocess.STDOUT)
-
- # 等待进程启动
- time.sleep(15)
-
- # 检查是否启动成功
- if self.get_process_info():
- logger.info("调度器重启成功")
- self.restart_count += 1
- self.last_restart_time = current_time
- return True
- else:
- logger.error("调度器重启失败")
- return False
-
- except Exception as e:
- logger.error(f"启动调度器失败: {e}")
- return False
-
- except Exception as e:
- logger.error(f"重启调度器过程中发生错误: {e}")
- return False
-
- def should_restart(self, process_info: Dict[str, Any], system_info: Dict[str, Any]) -> bool:
- """判断是否需要重启"""
- if not process_info:
- logger.warning("调度器进程不存在,需要重启")
- return True
-
- # 检查内存使用
- if process_info['memory_mb'] > self.config['max_memory_mb']:
- logger.warning(f"内存使用过高: {process_info['memory_mb']:.1f}MB > {self.config['max_memory_mb']}MB")
- return True
-
- # 检查CPU使用率
- if process_info['cpu_percent'] > self.config['max_cpu_percent']:
- logger.warning(f"CPU使用率过高: {process_info['cpu_percent']:.1f}% > {self.config['max_cpu_percent']}%")
- return True
-
- # 检查系统资源
- if system_info.get('memory_percent', 0) > 90:
- logger.warning(f"系统内存使用率过高: {system_info['memory_percent']:.1f}%")
- return True
-
- if system_info.get('disk_percent', 0) > self.config['max_disk_percent']:
- logger.warning(f"磁盘使用率过高: {system_info['disk_percent']:.1f}%")
- return True
-
- # 检查日志中的错误
- if self.check_logs_for_errors():
- logger.warning("检测到日志错误,需要重启")
- return True
-
- return False
-
- def run(self):
- """运行监控"""
- logger.info("系统监控启动")
- logger.info(f"配置: {json.dumps(self.config, indent=2)}")
-
- while self.running:
- try:
- # 获取进程信息
- process_info = self.get_process_info()
-
- # 获取系统资源信息
- system_info = self.check_system_resources()
-
- # 记录状态
- if process_info:
- logger.info(f"进程状态: PID={process_info['pid']}, "
- f"内存={process_info['memory_mb']:.1f}MB, "
- f"CPU={process_info['cpu_percent']:.1f}%")
-
- if system_info:
- logger.info(f"系统状态: CPU={system_info['cpu_percent']:.1f}%, "
- f"内存={system_info['memory_percent']:.1f}%, "
- f"磁盘={system_info['disk_percent']:.1f}%")
-
- # 检查是否需要重启
- if self.should_restart(process_info, system_info):
- if self.restart_scheduler():
- logger.info("重启操作完成")
- else:
- logger.error("重启操作失败")
-
- # 等待下次检查
- time.sleep(self.config['check_interval'])
-
- except Exception as e:
- logger.error(f"监控过程中发生错误: {e}")
- time.sleep(self.config['check_interval'])
-
- logger.info("系统监控已停止")
- def main():
- """主函数"""
- print("=" * 60)
- print("系统监控脚本")
- print("=" * 60)
- print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- print("=" * 60)
-
- try:
- monitor = SystemMonitor()
- monitor.run()
- except KeyboardInterrupt:
- print("\n收到中断信号,正在停止...")
- except Exception as e:
- print(f"监控脚本运行失败: {e}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|