"""Print RUNNING "alg" YARN applications joined with their ODPS partition info."""
from datetime import datetime, timedelta

import pandas as pd

from client import YarnClient, ODPSClient
from util import date_util

yarn_client = YarnClient.YarnClient("121.40.173.140")
odps_client = ODPSClient.ODPSClient()

# Columns printed by df_print, in display order.
columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
           "数据大小", "创建时间", "更新时间"]

# How many hours back from "now" to look for started applications.
LOOKBACK_HOURS = 14


def df_print(result) -> None:
    """Print ``result`` as a ' | '-separated table sorted by ``startedTime``.

    Args:
        result: Records (anything ``pandas.DataFrame`` accepts, here a list of
            dicts). Only the keys listed in ``columns`` are printed.

    An empty ``result`` prints just the header. Keys from ``columns`` that are
    absent from the records are shown as NaN instead of raising ``KeyError``.
    """
    # reindex (rather than df[columns]) tolerates missing columns and an
    # empty frame, so the sort below always finds "startedTime".
    table = pd.DataFrame(result).reindex(columns=columns)
    table = table.sort_values(by="startedTime")

    # Header line
    header = ' | '.join(columns)

    # Data lines
    rows = [
        ' | '.join(str(value) for value in record)
        for record in table.itertuples(index=False, name=None)
    ]

    # Output
    print(header)
    if rows:
        print('\n'.join(rows))


def _analyse() -> None:
    """Collect RUNNING "alg" applications from YARN and print them with partition info.

    Steps:
      1. Query YARN for applications started within the last ``LOOKBACK_HOURS`` hours.
      2. Keep those whose name contains "alg" and whose state is 'RUNNING'; the
         second ':'-separated segment of the name is stored as ``table_name``.
      3. Fetch partition info from ODPS once per distinct table name.
      4. Merge the partition record whose '分区' value equals the application's
         converted ``startedTime`` into the application record, then print.
    """
    # Start of the window as a millisecond timestamp.
    window_start_ms = int((datetime.now() - timedelta(hours=LOOKBACK_HOURS)).timestamp()) * 1000
    apps = yarn_client.get_apps(started_time_begin=window_start_ms) or []

    running_tasks = []
    for app in apps:
        name = app['name']
        if "alg" not in name or app['state'] != 'RUNNING':
            continue
        name_parts = name.split(":")
        if len(name_parts) < 2:
            # No ':' in the name, so there is no table segment to extract.
            continue
        task = {key: value for key, value in app.items() if key != 'name'}
        task['table_name'] = name_parts[1].strip()
        running_tasks.append(task)

    # table_name -> {partition value -> partition record}
    partition_info = {}
    for table_name in {task['table_name'] for task in running_tasks}:
        resp = odps_client.get_all_partition_info(table_name=table_name)
        partition_info[table_name] = {entry['分区']: entry for entry in resp}

    spark_task_list = []
    for task in running_tasks:
        # presumably yields a partition key comparable to the '分区' values — verify in date_util
        dt_hh = date_util.date_convert_dt_hh(task['startedTime'])
        partitions = partition_info.get(task['table_name'], {})
        if dt_hh in partitions:
            task = {**task, **partitions[dt_hh]}
        # NOTE(review): the file's original indentation was lost; tasks without a
        # matching partition are assumed to be kept in the report — confirm.
        spark_task_list.append(task)

    df_print(spark_task_list)


if __name__ == '__main__':
    _analyse()