# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/20
"""Monitor the CPU and memory usage of running crawler scripts and log the results."""
import json
import os
import socket
import sys

import psutil

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper


class MonitorCpuMemory:
    """Collects per-script CPU/memory statistics for crawler processes."""

    @classmethod
    def get_script_list(cls, log_type, crawler, env):
        """Return the list of crawler script names to monitor.

        Reads the distinct `key` values matching "%run%" from the
        crawler_enum table, then appends this monitor's own script name
        so it is monitored as well.
        """
        script_list = []
        # NOTE: plain string — the original used an f-string with no placeholders.
        select_sql = """ SELECT DISTINCT `key` FROM crawler_enum WHERE `key` LIKE "%run%"; """
        sql_response = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
        for response in sql_response:
            script_list.append(response["key"])
        script_list.append("run_cpu_memory")
        return script_list

    @classmethod
    def get_ip_address(cls):
        """Return the local outbound IP address, or None on socket errors.

        A UDP "connect" sends no packets; it only makes the OS pick the
        local interface that would route to the target, whose address we
        then read back with getsockname().
        """
        try:
            # `with` guarantees the socket is closed (the original leaked it).
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.connect(("8.8.8.8", 80))
                return sock.getsockname()[0]
        except socket.error:
            return None

    @classmethod
    def get_pid_path(cls, script_name):
        """Find a running process whose command line mentions *script_name*.

        Returns {"pid": int, "path": str} for the first matching command-line
        token, or None when no matching process is found.
        """
        for proc in psutil.process_iter():
            try:
                # Check every command-line argument for the script name.
                for cmd in proc.cmdline():
                    if script_name in cmd:
                        return {
                            "pid": proc.pid,
                            "path": cmd,
                        }
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # Process vanished or is inaccessible between iteration and
                # inspection; skip it and keep scanning.
                continue
        return None  # explicit: no matching process found

    @classmethod
    def get_cpu_memory(cls, script_name):
        """Return CPU/memory usage for the process running *script_name*.

        Returns a dict with keys "pid", "path", "cpu", "memory", or None
        when the script is not running.

        NOTE(review): cpu_percent() without an interval reports usage since
        the previous call and returns 0.0 on the first call for a fresh
        Process handle — confirm this sampling mode is acceptable.
        """
        pid_path = cls.get_pid_path(script_name)
        if pid_path is None:
            return None
        # Reuse a single Process handle (the original built two for one PID).
        proc = psutil.Process(pid_path["pid"])
        return {
            "pid": pid_path["pid"],
            "path": pid_path["path"],
            "cpu": round(proc.cpu_percent(), 2),
            "memory": round(proc.memory_percent(), 2),
        }

    @classmethod
    def get_all_script_cpu_memory(cls, log_type, crawler, env):
        """Collect and log CPU/memory info for every monitored script.

        Each script is handled independently: a failure while monitoring
        one script is logged and does not stop the loop.
        """
        script_list = cls.get_script_list(log_type, crawler, env)
        for script_name in script_list:
            try:
                Common.logger(log_type, crawler).info(f"开始监控:{script_name}")
                Common.logging(log_type, crawler, env, f"开始监控:{script_name}")
                crawler_info = cls.get_cpu_memory(script_name)
                if crawler_info is None:
                    Common.logger(log_type, crawler).info(f"脚本未运行\n")
                    Common.logging(log_type, crawler, env, f"脚本未运行\n")
                    continue
                script_info_dict = json.dumps({
                    "crawler_name": script_name,
                    "crawler_ip": cls.get_ip_address(),
                    "crawler_pid": crawler_info["pid"],
                    "crawler_path": crawler_info["path"],
                    "crawler_cpu": crawler_info["cpu"],
                    "crawler_memory": crawler_info["memory"],
                })
                Common.logger(log_type, crawler).info(f'script_info:{script_info_dict}\n')
                Common.logging(log_type, crawler, env, f'{script_info_dict}')
            except Exception as e:
                Common.logger(log_type, crawler).error(f"监控{script_name}时异常:{e}\n")
                Common.logging(log_type, crawler, env, f"监控{script_name}时异常:{e}\n")


if __name__ == "__main__":
    ipaddress = MonitorCpuMemory.get_ip_address()
    print(ipaddress)