1234567891011121314151617181920212223242526272829 |
- from client import XxlJobClient
# Base URL of the XXL-JOB admin console handed to the client below.
XXL_JOB_BASE_URL = "http://xxl-job-internal.piaoquantv.com/xxl-job-admin"

# Module-level client instance shared by everything in this script.
xxl_job_client = XxlJobClient.XxJobClient(XXL_JOB_BASE_URL)
def _main():
    """Print one failure line per job log returned with ``log_status=2``.

    Fetches every job group (executor) and indexes it by ``id``, then walks
    every job and, for each log entry the client returns for that job,
    prints the executor title, job id, job description and trigger time.

    A job whose ``jobGroup`` is missing from the group listing is reported
    with the raw group id in place of the executor title instead of
    aborting the whole report with ``KeyError``.
    """
    # Index executors by id so each job resolves its group with one lookup.
    groups_by_id = {
        group['id']: group
        for group in xxl_job_client.get_all_job_group()
    }

    for job in xxl_job_client.get_all_job_info():
        job_id = job['id']
        job_name = job['jobDesc']
        group_id = job['jobGroup']

        # Resolved once per job rather than once per log row; falls back to
        # the group id when the group (or its title) is not available.
        group_title = groups_by_id.get(group_id, {}).get('title', group_id)

        # NOTE(review): log_status=2 is presumably the "failed" filter, given
        # the message text below -- confirm against XxlJobClient.
        failed_logs = xxl_job_client.get_all_job_log(
            job_id=job_id, log_status=2
        )
        for log_item in failed_logs:
            print(
                f"【任务执行失败】执行器: {group_title} "
                f"任务ID: {job_id}, "
                f"任务名称: {job_name},"
                f" 执行时间: {log_item['triggerTime']}"
            )
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    _main()
|