from kubernetes import client, config class K8SClient(object): def __init__(self, config_file): self.config_file = config_file config.load_kube_config(self.config_file) self.core_api = client.CoreV1Api() self.apps_api = client.AppsV1Api() def get_pod_info_by_deployment(self, deployment_name, namespace='default'): deployment = self.apps_api.read_namespaced_deployment(name=deployment_name, namespace=namespace) replicaset_selector = deployment.spec.selector.match_labels # 根据 ReplicaSet 的标签选择器查找所有相关的 Pod label_selector = ",".join([f"{key}={value}" for key, value in replicaset_selector.items()]) pods = self.core_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector) return pods.items