zhaohaipeng пре 10 месеци
родитељ
комит
d8c53e5e3b
2 измењених фајлова са 38 додато и 5 уклоњено
  1. 1 1
      client/YarnClient.py
  2. 37 4
      feature_spark_analyse.py

+ 1 - 1
client/YarnClient.py

@@ -43,7 +43,7 @@ class YarnClient(object):
                 "launchTime": date_util.ts_cover_str(app['launchTime']),
                 "queue": app['queue'],
                 "state": app['state'],
-                "elapsedTime": date_util.timestamp_convert(app['elapsedTime']),
+                "elapsedTime": app['elapsedTime'],
             }
             result.append(item)
 

+ 37 - 4
feature_spark_analyse.py

@@ -1,4 +1,6 @@
+from collections import defaultdict
 from datetime import datetime, timedelta
+from typing import List, Dict
 
 import pandas as pd
 
@@ -8,13 +10,22 @@ from util import date_util
 yarn_client = YarnClient.YarnClient("121.40.173.140")
 odps_client = ODPSClient.ODPSClient()
 
-columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
-           "数据大小", "创建时间", "更新时间"]
+columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "maxElapsedTime",
+           "minElapsedTime", "avgElapsedTime", "分区", "数据量", "数据大小", "创建时间", "更新时间"]
+
+
+def format_value(value, column):
+    if column in ["elapsedTime", "maxElapsedTime", "minElapsedTime", "avgElapsedTime"]:
+        return date_util.timestamp_convert(value)
+    else:
+        return value
 
 
 def df_print(result):
     df = pd.DataFrame(result)
-    sorted_df = df.sort_values(by="startedTime")
+    formatted_df = df.apply(lambda x: [format_value(x[col], col) for col in df.columns], axis=1, result_type='expand')
+    formatted_df.columns = df.columns
+    sorted_df = formatted_df.sort_values(by="startedTime")
 
     sorted_df = sorted_df[columns]
 
@@ -32,6 +43,27 @@ def df_print(result):
     print('\n'.join(rows))
 
 
+def calc_elapsed_max_min_avg(result: list[dict]) -> list[dict]:
+    elapsed_times = defaultdict(list)
+    for item in result:
+        elapsed_times[item['table_name']].append(item['elapsedTime'])
+
+    stats = {}
+    for name, times in elapsed_times.items():
+        max_time = max(times)
+        min_time = min(times)
+        avg_time = int(sum(times) / len(times))
+        stats[name] = {'maxElapsedTime': max_time, 'minElapsedTime': min_time, 'avgElapsedTime': avg_time}
+
+    # 将计算结果放回原始字典
+    for item in result:
+        name = item['table_name']
+        item['maxElapsedTime'] = stats[name]['maxElapsedTime']
+        item['minElapsedTime'] = stats[name]['minElapsedTime']
+        item['avgElapsedTime'] = stats[name]['avgElapsedTime']
+    return result
+
+
 def _analyse():
     hours_7_early = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
     result = yarn_client.get_apps(started_time_begin=hours_7_early)
@@ -41,8 +73,9 @@ def _analyse():
             'table_name': item['name'].split(":")[1].strip()
         }
         for item in result
-        if "alg" in item['name'] and item['state'] == 'RUNNING'
+        if "alg" in item['name']
     ]
+    result = calc_elapsed_max_min_avg(result)
     partition_info = {}
     for table_name in list({item['table_name'] for item in result}):
         resp = odps_client.get_all_partition_info(table_name=table_name)