XxlJobClient.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import json
  2. import requests
  3. header = {
  4. "Cookie": "Hm_lvt_7f5c31e86e4cc8a706f7b3957d81f353=1714270679; "
  5. "XXL_JOB_LOGIN_IDENTITY=7b226964223a312c22757365726e616d65223a2261646d696e222c2270617373776f7264"
  6. "223a226531306164633339343962613539616262653536653035376632306638383365222c22726f6c65223a312c2270"
  7. "65726d697373696f6e223a6e756c6c7d",
  8. "content-type": "application/x-www-form-urlencoded; charset=UTF-8"
  9. }
  10. class XxJobClient(object):
  11. def __init__(self, base_url):
  12. self.base_url = base_url
  13. def get_all_job_log(self, job_group=0, job_id=0, log_status=-1) -> list[dict]:
  14. """
  15. 获取job的执行日志
  16. :param job_group: 执行器ID, 0-全部
  17. :param job_id: 任务ID, 0-全部
  18. :param log_status: 任务状态,-1-全部, 1-成功, 2-失败, 3-进行中
  19. :return:
  20. """
  21. url = f"{self.base_url}/joblog/pageList"
  22. param = {
  23. "start": 0,
  24. "length": 1000,
  25. "jobGroup": job_group,
  26. "jobId": job_id,
  27. "logStatus": log_status,
  28. "filterTime": "2024-09-27 00:00:00 - 2024-09-27 23:59:59"
  29. }
  30. response = requests.post(url, data=param, headers=header)
  31. return json.loads(response.content)['data']
  32. def get_all_job_group(self) -> list[dict]:
  33. """
  34. 获取所有的执行器
  35. :return:
  36. """
  37. url = f"{self.base_url}/jobgroup/pageList"
  38. param = {
  39. "start": 0,
  40. "length": 1000,
  41. }
  42. response = requests.post(url, data=param, headers=header)
  43. return json.loads(response.content)['data']
  44. def get_all_job_info(self, job_group=0, trigger_status=-1) -> list[dict]:
  45. """
  46. 获取所有的job信息
  47. :param job_group: 执行器ID
  48. :param trigger_status: 状态 0-关,1-开,-1-全部
  49. :return:
  50. """
  51. url = f"{self.base_url}/jobinfo/pageList"
  52. param = {
  53. "jobGroup": job_group,
  54. "start": 0,
  55. "length": 1000,
  56. "triggerStatus": trigger_status
  57. }
  58. response = requests.post(url, data=param, headers=header)
  59. return json.loads(response.content)['data']