xxl_job.py 926 B

1234567891011121314151617181920212223242526272829
  1. from client import XxlJobClient
  2. XXL_JOB_BASE_URL = "http://xxl-job-internal.piaoquantv.com/xxl-job-admin"
  3. xxl_job_client = XxlJobClient.XxJobClient(XXL_JOB_BASE_URL)
  4. def _main():
  5. all_job_group = {}
  6. for item in xxl_job_client.get_all_job_group():
  7. all_job_group[item['id']] = item
  8. for item in xxl_job_client.get_all_job_info():
  9. job_name = item['jobDesc']
  10. job_id = item['id']
  11. job_author = item['author']
  12. job_group = item['jobGroup']
  13. for log_item in xxl_job_client.get_all_job_log(job_id=job_id, log_status=2):
  14. job_group_info = all_job_group[job_group]
  15. print(
  16. f"【任务执行失败】执行器: {job_group_info['title']} "
  17. f"任务ID: {job_id}, "
  18. f"任务名称: {job_name},"
  19. f" 执行时间: {log_item['triggerTime']}"
  20. )
  21. if __name__ == '__main__':
  22. _main()