| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from datetime import datetime, time, timedelta
- import pytz
- from alibabacloud_mts20140618.client import Client as MTSClient
- from alibabacloud_mts20140618.models import ListJobRequest
- from alibabacloud_tea_openapi import models
- from resource.monitor.basic_monitor import BasicMonitor
- from util.thread_util import process_tasks
def write_file(
    content: str,
    dt: str,
    output_dir: str = "/Users/zhao/Desktop/tzld/文档/MPS",
) -> None:
    """Echo *content* to stdout and append it as one line to a report file.

    Args:
        content: Text of a single line, without a trailing newline.
        dt: Tag embedded in the file name (``mps_monitor_{dt}.txt``).
        output_dir: Directory that holds the report files. Defaults to the
            path that used to be hard-coded, so existing two-argument
            callers behave as before.
    """
    print(content)
    # Explicit UTF-8 because the report contains non-ASCII text; relying on
    # the platform default encoding can fail or produce mojibake.
    # Plain append mode is enough: the file is never read back here.
    with open(f"{output_dir}/mps_monitor_{dt}.txt", "a", encoding="utf-8") as f:
        f.write(f"{content}\n")
class MPSMonitor(BasicMonitor):
    """Export MTS job listings to per-day CSV-style text files."""

    def __init__(self):
        super().__init__()

    @staticmethod
    def _format_oss_file(file_info) -> str:
        """Render a job file object as ``bucket:object``; empty if absent."""
        if file_info is None:
            return ""
        return f"{file_info.bucket}:{file_info.object}"

    @classmethod
    def download_date(cls, dt: datetime, client: MTSClient):
        """Write every job created on the day of *dt* to that day's file.

        A header row is written first, then one row per job. Pages are
        fetched with ``client.list_job`` until the response carries no
        ``next_page_token``.

        Args:
            dt: Any datetime on the day to export; only its date part is
                used to build the [00:00, next day 00:00) range.
            client: MTS client used to issue the ``ListJob`` requests.
        """
        dt_str = dt.strftime("%Y%m%d")
        write_file("任务ID,任务创建时间,任务结束时间,任务状态,时长,模板ID,管道ID,输入文件,输出文件", dt_str)

        # The range bounds do not change between pages, so build them once.
        # NOTE(review): these datetimes are naive, so astimezone() treats
        # them as system-local time before converting to UTC — confirm the
        # host timezone is the one the report days should be cut in.
        time_format = '%Y-%m-%dT%H:%M:%SZ'
        day_start = datetime.combine(dt, time.min)
        day_end = datetime.combine(dt + timedelta(days=1), time.min)
        created_from = day_start.astimezone(pytz.UTC).strftime(time_format)
        created_to = day_end.astimezone(pytz.UTC).strftime(time_format)

        next_page_token = ""
        while True:
            request = ListJobRequest(
                maximum_page_size=100,
                start_of_job_created_time_range=created_from,
                end_of_job_created_time_range=created_to,
                next_page_token=next_page_token
            )
            body = client.list_job(request).body

            # Treat a missing job list the same as an empty page instead of
            # failing on attribute access / iteration over None.
            job_list = body.job_list
            jobs = (job_list.job if job_list is not None else None) or []

            for job in jobs:
                # Nested response objects may be absent; a single incomplete
                # job must not abort the export of the whole day.
                output = job.output
                properties = output.properties if output is not None else None
                duration = properties.duration if properties is not None else None
                template_id = output.template_id if output is not None else None
                output_file = cls._format_oss_file(
                    output.output_file if output is not None else None
                )
                input_file = cls._format_oss_file(job.input)
                write_file(
                    f"{job.job_id},{job.creation_time},{job.finish_time},{job.state},"
                    f"{duration},{template_id},{job.pipeline_id},{input_file},{output_file}",
                    dt_str,
                )

            next_page_token = body.next_page_token
            if not next_page_token:
                break

    def run(self):
        """Export each day from 2026-01-31 back to 2025-12-01 inclusive.

        One task is queued per day, each with its own MTS client, and the
        batch is handed to ``process_tasks`` with ``15`` as its second
        argument (presumably the worker count — verify against
        ``util.thread_util``).
        """
        # access_key_id / access_key_secret are not set in this class;
        # presumably provided by BasicMonitor — TODO confirm.
        config = models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
            endpoint="mts.cn-hangzhou.aliyuncs.com"
        )
        today = datetime(2026, 1, 31, 0, 0, 0)
        min_dt = datetime(2025, 12, 1, 0, 0, 0)
        tasks = []
        # Walk backwards one day at a time; the range only caps the loop,
        # the min_dt check is what normally ends it.
        for i in range(0, 1000):
            download_dt = today - timedelta(days=i)
            if download_dt < min_dt:
                break
            # Default arguments bind the current day and a fresh client now,
            # avoiding the late-binding closure pitfall.
            tasks.append(lambda dt=download_dt, client=MTSClient(config): self.download_date(dt, client))
        process_tasks(tasks, 15)
if __name__ == '__main__':
    # Script entry point: build the monitor and run the export immediately.
    MPSMonitor().run()
|