Przeglądaj źródła

feat:添加MPS监控脚本

zhaohaipeng 1 miesiąc temu
rodzic
commit
8c1387bb52
2 zmienionych plików z 126 dodań i 0 usunięć
  1. 85 0
      resource/monitor/mps_monitor.py
  2. 41 0
      util/thread_util.py

+ 85 - 0
resource/monitor/mps_monitor.py

@@ -0,0 +1,85 @@
+from datetime import datetime, time, timedelta
+
+import pytz
+from alibabacloud_mts20140618.client import Client as MTSClient
+from alibabacloud_mts20140618.models import ListJobRequest
+from alibabacloud_tea_openapi import models
+
+from resource.monitor.basic_monitor import BasicMonitor
+from util.thread_util import process_tasks
+
+
def write_file(content: str, dt: str, base_dir: str = "/Users/zhao/Desktop/tzld/文档/MPS") -> None:
    """Append *content* as one line to the per-day monitor report file.

    The line is echoed to stdout and appended to
    ``<base_dir>/mps_monitor_<dt>.txt``.

    :param content: one CSV-formatted record (without trailing newline)
    :param dt: date string (e.g. ``20260131``) used in the report file name
    :param base_dir: directory for report files; defaults to the original
        hard-coded location so existing callers are unaffected
    """
    print(content)
    # Explicit UTF-8: the records contain Chinese text and the default
    # locale encoding is platform-dependent.
    with open(f"{base_dir}/mps_monitor_{dt}.txt", "a+", encoding="utf-8") as f:
        f.write(content)
        f.write("\n")
+
+
class MPSMonitor(BasicMonitor):
    """Export Aliyun MPS (Media Transcoding Service) job records to CSV files.

    For each day in the configured range, lists every transcode job created
    on that day via the MTS ``ListJob`` API and appends one CSV record per
    job to a per-day report file (see :func:`write_file`).
    """

    # Timestamp format required by the ListJob created-time-range parameters.
    _UTC_FMT = "%Y-%m-%dT%H:%M:%SZ"

    @classmethod
    def download_date(cls, dt: datetime, client: MTSClient) -> None:
        """Dump every MPS job created on day *dt* to that day's report file.

        Pages through the ListJob API via ``next_page_token`` until all jobs
        whose creation time falls in [dt 00:00, dt+1 00:00) are written.

        :param dt: day to export (naive datetime; ``astimezone`` interprets
            it in the local timezone before converting to UTC — confirm this
            matches the intended day boundary)
        :param client: MTS client used to issue the requests
        """
        dt_str = dt.strftime("%Y%m%d")
        write_file("任务ID,任务创建时间,任务结束时间,任务状态,时长,模板ID,管道ID,输入文件,输出文件", dt_str)

        start_time = datetime.combine(dt, time.min)
        end_time = datetime.combine(dt + timedelta(days=1), time.min)
        # The window is constant for the whole pagination loop — format once.
        start_utc = start_time.astimezone(pytz.UTC).strftime(cls._UTC_FMT)
        end_utc = end_time.astimezone(pytz.UTC).strftime(cls._UTC_FMT)

        next_page_token = ""
        while True:
            request = ListJobRequest(
                maximum_page_size=100,
                start_of_job_created_time_range=start_utc,
                end_of_job_created_time_range=end_utc,
                next_page_token=next_page_token,
            )
            body = client.list_job(request).body
            for job in body.job_list.job:
                output = job.output
                output_file = f"{output.output_file.bucket}:{output.output_file.object}"
                input_file = f"{job.input.bucket}:{job.input.object}"
                # One CSV record per job, column order matching the header above.
                write_file(
                    f"{job.job_id},{job.creation_time},{job.finish_time},{job.state},"
                    f"{output.properties.duration},{output.template_id},{job.pipeline_id},"
                    f"{input_file},{output_file}",
                    dt_str,
                )
            next_page_token = body.next_page_token
            if not next_page_token:
                break

    def run(self) -> None:
        """Export jobs for every day from ``today`` back to ``min_dt``, 15 days concurrently."""
        config = models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
            endpoint="mts.cn-hangzhou.aliyuncs.com",
        )

        today = datetime(2026, 1, 31, 0, 0, 0)
        min_dt = datetime(2025, 12, 1, 0, 0, 0)
        tasks = []
        for i in range(1000):  # safety bound; the break below is the real stop
            download_dt = today - timedelta(days=i)
            if download_dt < min_dt:
                break
            # Bind dt/client as lambda defaults so each task captures its own
            # values (avoids the late-binding closure pitfall). One client per
            # task sidesteps any thread-safety question about sharing a client.
            tasks.append(lambda dt=download_dt, client=MTSClient(config): self.download_date(dt, client))

        process_tasks(tasks, 15)
+
+
if __name__ == '__main__':
    # Script entry point: build the monitor and start the export run.
    MPSMonitor().run()

+ 41 - 0
util/thread_util.py

@@ -0,0 +1,41 @@
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Callable, Sequence
+
+
def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int = 10) -> None:
    """Run *tasks* concurrently on a thread pool bounded by *max_workers*.

    All tasks are submitted up front; ``ThreadPoolExecutor`` itself caps the
    number of tasks running at once, so no manual batching is needed.  (The
    previous batch barrier waited for a whole batch to finish before
    submitting the next one, leaving workers idle behind the slowest task;
    it also slept 10 ms per submission for no benefit.)

    Exceptions raised by individual tasks are caught and reported; they do
    not abort the remaining tasks.

    :param tasks: sequence of zero-argument callables to execute
    :param max_workers: maximum number of concurrent worker threads
    """
    total_tasks = len(tasks)
    if total_tasks == 0:
        return

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_tasks = {}
        for counter, task in enumerate(tasks, start=1):
            print(f"提交任务: 第 {counter}/{total_tasks} 个任务")
            future_tasks[executor.submit(task)] = counter

        # Report in completion order; a failed task is logged and skipped.
        for future in as_completed(future_tasks):
            counter = future_tasks[future]
            try:
                future.result()
                print(f"任务完成: 第 {counter}/{total_tasks} 个任务")
            except Exception as exc:
                print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}")