import json

import requests

from util import dateutil


class YarnClient(object):
    """Small client for the YARN ResourceManager applications REST endpoint.

    Requests go to ``http://<cluster_ip>:8088/ws/v1/cluster/apps``.
    """

    def __init__(self, cluster_ip: str):
        """Store the host used to build request URLs.

        Args:
            cluster_ip: Host name or IP address of the ResourceManager; the
                port (8088) is appended when a request is made.
        """
        self.cluster_ip = cluster_ip

    @staticmethod
    def _build_params(queue, finished_time_begin, finished_time_end,
                      started_time_begin, started_time_end, limit) -> dict:
        """Return the query parameters for the apps endpoint.

        ``limit`` is always sent; every other filter is sent only when it is
        set (non-empty queue, time bound greater than zero).
        """
        params = {"limit": limit}
        if queue != "":
            params["queue"] = queue
        # Each bound is guarded by its own value so that a begin-only or an
        # end-only range is forwarded correctly.
        if finished_time_begin > 0:
            params["finishedTimeBegin"] = finished_time_begin
        if finished_time_end > 0:
            params["finishedTimeEnd"] = finished_time_end
        if started_time_begin > 0:
            params["startedTimeBegin"] = started_time_begin
        if started_time_end > 0:
            params["startedTimeEnd"] = started_time_end
        return params

    @staticmethod
    def _summarize(app: dict) -> dict:
        """Project one application record onto the fields callers receive."""
        # NOTE(review): ts_cover_str presumably renders an epoch timestamp as
        # a human-readable string -- confirm against util.dateutil.
        return {
            "id": app['id'],
            "name": app['name'],
            "finalStatus": app['finalStatus'],
            "finishedTime": dateutil.ts_cover_str(app['finishedTime']),
            "startedTime": dateutil.ts_cover_str(app['startedTime']),
            "launchTime": dateutil.ts_cover_str(app['launchTime']),
            "queue": app['queue'],
            "state": app['state'],
            "elapsedTime": app['elapsedTime'],
        }

    def get_apps(self, queue="", finished_time_begin=0, finished_time_end=0,
                 started_time_begin=0, started_time_end=0, limit=10) -> list[dict]:
        """Fetch applications from the ResourceManager and summarize them.

        Args:
            queue: Queue name to filter on; an empty string means no filter.
            finished_time_begin: Lower bound for the finish time; ignored
                unless greater than zero.
            finished_time_end: Upper bound for the finish time; ignored
                unless greater than zero.
            started_time_begin: Lower bound for the start time; ignored
                unless greater than zero.
            started_time_end: Upper bound for the start time; ignored unless
                greater than zero.
            limit: Maximum number of applications requested.

        The time bounds are forwarded to the server verbatim (the YARN REST
        documentation describes them as milliseconds since the epoch).

        Returns:
            One dict per application with the keys ``id``, ``name``,
            ``finalStatus``, ``finishedTime``, ``startedTime``,
            ``launchTime``, ``queue``, ``state`` and ``elapsedTime``. An
            empty list is returned when the response carries no applications.

        Raises:
            KeyError: If the response has no ``apps`` entry or an application
                record lacks one of the expected fields.
        """
        params = self._build_params(queue, finished_time_begin,
                                    finished_time_end, started_time_begin,
                                    started_time_end, limit)
        url = f"http://{self.cluster_ip}:8088/ws/v1/cluster/apps"
        # Let requests assemble the query string so values such as the queue
        # name are URL-encoded instead of being spliced in raw.
        response = requests.get(url, params=params)
        payload = json.loads(response.text)

        # When nothing matches, the server appears to answer with
        # {"apps": null}; treat a null/empty container as "no applications"
        # rather than failing on the subscript below.
        apps_node = payload['apps']
        if not apps_node:
            return []
        return [self._summarize(app) for app in apps_node['app']]