YarnClient.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import json
  2. from typing import List, Dict
  3. import requests
  4. from util import date_util
  5. class YarnClient(object):
  6. def __init__(self, cluster_ip: str):
  7. self.cluster_ip = cluster_ip
  8. def get_apps(self, queue="",
  9. finished_time_begin=0,
  10. finished_time_end=0,
  11. started_time_begin=0,
  12. started_time_end=0,
  13. limit=10000) -> List[Dict]:
  14. query_str = f"limit={limit}"
  15. if queue != "":
  16. query_str = f"{query_str}&queue={queue}"
  17. if finished_time_end > 0:
  18. query_str = f"{query_str}&finishedTimeBegin={finished_time_begin}"
  19. if finished_time_end > 0:
  20. query_str = f"{query_str}&finishedTimeEnd={finished_time_end}"
  21. if started_time_begin > 0:
  22. query_str = f"{query_str}&startedTimeBegin={started_time_begin}"
  23. if started_time_end > 0:
  24. query_str = f"{query_str}&startedTimeEnd={started_time_end}"
  25. url = f"http://{self.cluster_ip}:8088/ws/v1/cluster/apps?{query_str}"
  26. print(f"请求的Yarn Restful API为: {url}")
  27. response = requests.get(url)
  28. res = json.loads(response.text)
  29. result = []
  30. for app in res['apps']['app']:
  31. item = {
  32. "id": app['id'],
  33. "name": app['name'],
  34. "finalStatus": app['finalStatus'],
  35. "finishedTime": date_util.ts_cover_str(app['finishedTime']),
  36. "startedTime": date_util.ts_cover_str(app['startedTime']),
  37. "launchTime": date_util.ts_cover_str(app['launchTime']),
  38. "queue": app['queue'],
  39. "state": app['state'],
  40. "elapsedTime": app['elapsedTime'],
  41. }
  42. result.append(item)
  43. return result