# mps_monitor.py (3.0 KB)
  1. from datetime import datetime, time, timedelta
  2. import pytz
  3. from alibabacloud_mts20140618.client import Client as MTSClient
  4. from alibabacloud_mts20140618.models import ListJobRequest
  5. from alibabacloud_tea_openapi import models
  6. from resource.monitor.basic_monitor import BasicMonitor
  7. from util.thread_util import process_tasks
  8. def write_file(content: str, dt: str):
  9. print(content)
  10. with open(f"/Users/zhao/Desktop/tzld/文档/MPS/mps_monitor_{dt}.txt", "a+") as f:
  11. f.write(content)
  12. f.write("\n")
  13. class MPSMonitor(BasicMonitor):
  14. def __init__(self):
  15. super().__init__()
  16. @classmethod
  17. def download_date(cls, dt: datetime, client: MTSClient):
  18. dt_str = dt.strftime("%Y%m%d")
  19. write_file("任务ID,任务创建时间,任务结束时间,任务状态,时长,模板ID,管道ID,输入文件,输出文件", dt_str)
  20. next_page_token = ""
  21. start_time = datetime.combine(dt, time.min)
  22. end_time = datetime.combine(dt + timedelta(days=1), time.min)
  23. while True:
  24. request = ListJobRequest(
  25. maximum_page_size=100,
  26. start_of_job_created_time_range=start_time.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%SZ'),
  27. end_of_job_created_time_range=end_time.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%SZ'),
  28. next_page_token=next_page_token
  29. )
  30. response = client.list_job(request)
  31. body = response.body
  32. jobs = body.job_list.job
  33. for job in jobs:
  34. creation_time = job.creation_time
  35. finish_time = job.finish_time
  36. state = job.state
  37. job_id = job.job_id
  38. output = job.output
  39. duration = output.properties.duration
  40. output_file_info = output.output_file
  41. output_file = f"{output_file_info.bucket}:{output_file_info.object}"
  42. pipline_id = job.pipeline_id
  43. template_id = output.template_id
  44. input_file_info = job.input
  45. input_file = f"{input_file_info.bucket}:{input_file_info.object}"
  46. write_file(f"{job_id},{creation_time},{finish_time},{state},{duration},{template_id},{pipline_id},{input_file},{output_file}", dt_str)
  47. next_page_token = body.next_page_token
  48. if not next_page_token:
  49. break
  50. def run(self):
  51. config = models.Config(
  52. access_key_id=self.access_key_id,
  53. access_key_secret=self.access_key_secret,
  54. endpoint="mts.cn-hangzhou.aliyuncs.com"
  55. )
  56. today = datetime(2026, 1, 31, 0, 0, 0)
  57. min_dt = datetime(2025, 12, 1, 0, 0, 0)
  58. tasks = []
  59. for i in range(0, 1000):
  60. download_dt = today - timedelta(days=i)
  61. if download_dt < min_dt:
  62. break
  63. tasks.append(lambda dt=download_dt, client=MTSClient(config): self.download_date(dt, client))
  64. process_tasks(tasks, 15)
  65. if __name__ == '__main__':
  66. monitor = MPSMonitor()
  67. monitor.run()