1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from collections import defaultdict
- from datetime import datetime, timedelta
- from typing import List, Dict
- import pandas as pd
- from client import YarnClient, ODPSClient
- from util import date_util
# Service clients shared by the functions below; both are created once when
# the module is loaded.
yarn_client = YarnClient.YarnClient("121.40.173.140")
odps_client = ODPSClient.ODPSClient()

# Columns of the printed report, in display order. The Chinese labels mean:
# 表名 = table name, 分区 = partition, 数据量 = data volume,
# 数据大小 = data size, 创建时间 = creation time, 更新时间 = update time.
columns = [
    "id",
    "表名",
    "startedTime",
    "launchTime",
    "finishedTime",
    "state",
    "elapsedTime",
    "maxElapsedTime",
    "minElapsedTime",
    "avgElapsedTime",
    "分区",
    "数据量",
    "数据大小",
    "创建时间",
    "更新时间",
]
def format_value(value, column):
    """Return the display form of a single report cell.

    Args:
        value: the raw cell value.
        column: name of the column the cell belongs to.

    Returns:
        ``date_util.timestamp_convert(value)`` for the elapsed-time columns,
        otherwise ``value`` unchanged.
    """
    elapsed_columns = ["elapsedTime", "maxElapsedTime", "minElapsedTime", "avgElapsedTime"]
    if column not in elapsed_columns:
        return value
    return date_util.timestamp_convert(value)
def df_print(result):
    """Print task records as a ``' | '``-separated table ordered by start time.

    Args:
        result: list of dicts, one per task. Every record must carry a
            ``startedTime`` key; any other key listed in the module-level
            ``columns`` may be missing.

    The output is one header line followed by one line per record, restricted
    to ``columns`` and in that order. Cells are passed through
    ``format_value`` before printing. A column that no record provides is
    printed as ``nan`` instead of raising ``KeyError``, and an empty
    ``result`` prints only the header.
    """
    header = ' | '.join(columns)
    if not result:
        # Without rows there is no "startedTime" column to sort by.
        print(header)
        return

    formatted_df = pd.DataFrame(result)
    for col in formatted_df.columns:
        formatted_df[col] = [format_value(cell, col) for cell in formatted_df[col]]

    # reindex (rather than plain column selection) tolerates report columns
    # that are absent, e.g. when no task matched a partition record.
    sorted_df = formatted_df.sort_values(by="startedTime").reindex(columns=columns)

    rows = [
        ' | '.join(str(cell) for cell in row)
        for row in sorted_df.itertuples(index=False, name=None)
    ]

    print(header)
    print('\n'.join(rows))
def calc_elapsed_max_min_avg(result: list[dict]) -> list[dict]:
    """Attach per-table elapsed-time statistics to every record.

    Records are grouped by their ``table_name``; for each group the maximum,
    minimum and integer-truncated mean of ``elapsedTime`` are computed.

    Args:
        result: records that each contain ``table_name`` and ``elapsedTime``.

    Returns:
        The same list object, whose dicts have been updated in place with
        ``maxElapsedTime``, ``minElapsedTime`` and ``avgElapsedTime``.
    """
    # Collect the elapsed times of each table.
    grouped = {}
    for record in result:
        grouped.setdefault(record['table_name'], []).append(record['elapsedTime'])

    # One statistics dict per table.
    summary = {
        table: {
            'maxElapsedTime': max(samples),
            'minElapsedTime': min(samples),
            'avgElapsedTime': int(sum(samples) / len(samples)),
        }
        for table, samples in grouped.items()
    }

    # Write the statistics back into the original records.
    for record in result:
        record.update(summary[record['table_name']])
    return result
def _analyse():
    """Print a report of recent "alg" YARN applications with partition info.

    Steps:
      1. Fetch the applications started within the last 14 hours from
         ``yarn_client``.
      2. Keep those whose name contains "alg" and take the text after the
         first ":" of the name as ``table_name`` (the ``name`` key itself is
         dropped). Names without a ":" are skipped.
      3. Add per-table elapsed-time statistics.
      4. Fetch the partition records of every table from ``odps_client`` and
         merge into each task the record whose ``分区`` value equals
         ``date_util.date_convert_dt_hh(startedTime)``, when there is one.
      5. Print the result with ``df_print``.
    """
    # Lower bound of the query window, in epoch milliseconds.
    # NOTE(review): this local used to be called "hours_7_early" although the
    # window is 14 hours — confirm which value is intended.
    window_start_ms = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
    apps = yarn_client.get_apps(started_time_begin=window_start_ms)

    tasks = []
    for app in apps:
        app_name = app['name']
        if "alg" not in app_name:
            continue
        name_parts = app_name.split(":")
        if len(name_parts) < 2:
            # No ":" separator, so no table name can be extracted; skip the
            # application instead of failing with IndexError.
            continue
        task = {key: value for key, value in app.items() if key != 'name'}
        task['table_name'] = name_parts[1].strip()
        tasks.append(task)
    tasks = calc_elapsed_max_min_avg(tasks)

    # table name -> {partition value -> partition record}
    partition_info = {}
    for table_name in {task['table_name'] for task in tasks}:
        partitions = odps_client.get_all_partition_info(table_name=table_name)
        partition_info[table_name] = {part['分区']: part for part in partitions}

    spark_task_list = []
    for task in tasks:
        dt_hh = date_util.date_convert_dt_hh(task['startedTime'])
        matched = partition_info.get(task['table_name'], {}).get(dt_hh)
        # Tasks without a matching partition are still reported, just without
        # the partition columns.
        spark_task_list.append({**task, **matched} if matched is not None else task)
    df_print(spark_task_list)
# Run the analysis only when this file is executed as a script.
if __name__ == '__main__':
    _analyse()
|