"""Report recent "alg" YARN applications joined with ODPS partition metadata.

Running this module as a script fetches YARN applications started within a
look-back window, computes per-table elapsed-time statistics, attaches the
ODPS partition record that matches each application's start time, and prints
the result as a ' | '-separated table.
"""
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Dict

import pandas as pd

from client import YarnClient, ODPSClient
from util import date_util

yarn_client = YarnClient.YarnClient("121.40.173.140")
odps_client = ODPSClient.ODPSClient()

# Columns printed by df_print, in output order. The non-ASCII names are not
# produced by this file; presumably they come from the ODPS partition records
# merged in by _analyse -- TODO confirm against ODPSClient.
columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state",
           "elapsedTime", "maxElapsedTime", "minElapsedTime", "avgElapsedTime",
           "分区", "数据量", "数据大小", "创建时间", "更新时间"]

# Columns whose values are passed through date_util.timestamp_convert.
_ELAPSED_COLUMNS = ("elapsedTime", "maxElapsedTime", "minElapsedTime", "avgElapsedTime")


def format_value(value, column):
    """Return ``value`` formatted for display in ``column``.

    Values of the elapsed-time columns are converted with
    ``date_util.timestamp_convert``; every other value is returned unchanged.
    """
    if column in _ELAPSED_COLUMNS:
        return date_util.timestamp_convert(value)
    return value


def df_print(result):
    """Print ``result`` as a ' | '-separated table ordered by ``startedTime``.

    Args:
        result: Records (e.g. a list of dicts) accepted by ``pd.DataFrame``.
            Non-empty input must contain every name listed in ``columns``.

    Only the header line is printed when ``result`` is empty. Previously an
    empty input raised ``KeyError`` while sorting by ``startedTime``.
    """
    header = ' | '.join(columns)

    df = pd.DataFrame(result)
    if df.empty:
        # No rows means no columns either, so sorting/selecting would fail.
        print(header)
        return

    formatted_df = df.apply(
        lambda row: [format_value(row[col], col) for col in df.columns],
        axis=1,
        result_type='expand',
    )
    # result_type='expand' yields positional column labels; restore the names.
    formatted_df.columns = df.columns
    sorted_df = formatted_df.sort_values(by="startedTime")[columns]

    def format_row(row):
        return ' | '.join(str(row[col]) for col in columns)

    rows = sorted_df.apply(format_row, axis=1).tolist()

    # Emit the header followed by the data rows.
    print(header)
    print('\n'.join(rows))


def calc_elapsed_max_min_avg(result: list[dict]) -> list[dict]:
    """Attach per-table max/min/average elapsed time to every record.

    Records are grouped by ``table_name``. Each record gains the keys
    ``maxElapsedTime``, ``minElapsedTime`` and ``avgElapsedTime`` (the average
    truncated to ``int``) computed over the ``elapsedTime`` values of its group.

    Args:
        result: Dicts that each carry ``table_name`` and ``elapsedTime``.

    Returns:
        The same list object; its dicts are updated in place.
    """
    elapsed_times = defaultdict(list)
    for item in result:
        elapsed_times[item['table_name']].append(item['elapsedTime'])

    stats = {
        name: {
            'maxElapsedTime': max(times),
            'minElapsedTime': min(times),
            'avgElapsedTime': int(sum(times) / len(times)),
        }
        for name, times in elapsed_times.items()
    }

    # Write the computed statistics back onto the original dicts.
    for item in result:
        item.update(stats[item['table_name']])
    return result


def _analyse(lookback_hours: int = 14):
    """Fetch recent "alg" applications, enrich them and print the report.

    Args:
        lookback_hours: Size of the look-back window in hours. The default of
            14 is the value the original code used.
            NOTE(review): the original local variable was named
            ``hours_7_early`` although it subtracted 14 hours -- confirm which
            window is intended.

    Applications whose name contains "alg" are kept; the table name is the
    second ':'-separated segment of the application name. Names without a ':'
    are skipped (they previously raised ``IndexError``). Only applications with
    a partition record whose key equals
    ``date_util.date_convert_dt_hh(startedTime)`` are printed.
    """
    # Window start as epoch milliseconds (whole seconds scaled by 1000).
    window_start_ms = int((datetime.now() - timedelta(hours=lookback_hours)).timestamp()) * 1000
    apps = yarn_client.get_apps(started_time_begin=window_start_ms)

    result = []
    for app in apps:
        if "alg" not in app['name']:
            continue
        name_parts = app['name'].split(":")
        if len(name_parts) < 2:
            # No ':' separator, so no table name can be extracted.
            continue
        task = {k: v for k, v in app.items() if k != 'name'}
        task['table_name'] = name_parts[1].strip()
        result.append(task)

    result = calc_elapsed_max_min_avg(result)

    # One ODPS lookup per distinct table, indexed by the record's '分区' value.
    partition_info = {}
    for table_name in {item['table_name'] for item in result}:
        resp = odps_client.get_all_partition_info(table_name=table_name)
        partition_info[table_name] = {part['分区']: part for part in resp}

    spark_task_list = []
    for item in result:
        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
        partitions = partition_info.get(item['table_name'], {})
        if dt_hh in partitions:
            # Partition fields override same-named application fields.
            spark_task_list.append({**item, **partitions[dt_hh]})

    df_print(spark_task_list)


if __name__ == '__main__':
    _analyse()