# K8SClient.py

from kubernetes import client, config
  2. class K8SClient(object):
  3. def __init__(self, config_file):
  4. self.config_file = config_file
  5. config.load_kube_config(self.config_file)
  6. self.core_api = client.CoreV1Api()
  7. self.apps_api = client.AppsV1Api()
  8. def get_pod_info_by_deployment(self, deployment_name, namespace='default'):
  9. deployment = self.apps_api.read_namespaced_deployment(name=deployment_name, namespace=namespace)
  10. replicaset_selector = deployment.spec.selector.match_labels
  11. # 根据 ReplicaSet 的标签选择器查找所有相关的 Pod
  12. label_selector = ",".join([f"{key}={value}" for key, value in replicaset_selector.items()])
  13. pods = self.core_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
  14. return pods.items