feature_spark_analyse.py

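"""Analyse recent Spark task runs.

Pulls applications started in the last 14 hours from YARN, keeps the "alg" jobs,
computes per-table max/min/avg elapsed times, joins in the matching ODPS
partition information, and prints the result as a pipe-separated table.
"""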
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd

from client import YarnClient, ODPSClient
from util import date_util

yarn_client = YarnClient.YarnClient("121.40.173.140")
odps_client = ODPSClient.ODPSClient()

# Display columns for df_print; the order here defines the printed column order.
columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "maxElapsedTime",
           "minElapsedTime", "avgElapsedTime", "分区", "数据量", "数据大小", "创建时间", "更新时间"]


def format_value(value, column):
    """Format elapsed-time columns with date_util.timestamp_convert; leave other values unchanged."""
    if column in ["elapsedTime", "maxElapsedTime", "minElapsedTime", "avgElapsedTime"]:
        return date_util.timestamp_convert(value)
    return value


def df_print(result):
    """Print the task list as a pipe-separated table, sorted by startedTime."""
    df = pd.DataFrame(result)
    formatted_df = df.apply(lambda x: [format_value(x[col], col) for col in df.columns], axis=1,
                            result_type='expand')
    formatted_df.columns = df.columns
    sorted_df = formatted_df.sort_values(by="startedTime")
    sorted_df = sorted_df[columns]
    # Header row
    header = ' | '.join(sorted_df.columns)

    # Data rows
    def format_row(row):
        return ' | '.join([str(row[col]) for col in sorted_df.columns])

    rows = sorted_df.apply(format_row, axis=1).tolist()
    # Print the output
    print(header)
    print('\n'.join(rows))


def calc_elapsed_max_min_avg(result: list[dict]) -> list[dict]:
    """Compute per-table max/min/avg elapsedTime and attach the stats to every item."""
    elapsed_times = defaultdict(list)
    for item in result:
        elapsed_times[item['table_name']].append(item['elapsedTime'])
    stats = {}
    for name, times in elapsed_times.items():
        max_time = max(times)
        min_time = min(times)
        avg_time = int(sum(times) / len(times))
        stats[name] = {'maxElapsedTime': max_time, 'minElapsedTime': min_time, 'avgElapsedTime': avg_time}
    # Write the computed stats back onto the original dicts
    for item in result:
        name = item['table_name']
        item['maxElapsedTime'] = stats[name]['maxElapsedTime']
        item['minElapsedTime'] = stats[name]['minElapsedTime']
        item['avgElapsedTime'] = stats[name]['avgElapsedTime']
    return result


def _analyse():
    # Look back 14 hours, expressed in epoch milliseconds.
    fourteen_hours_ago_ms = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=fourteen_hours_ago_ms)
    # Keep only "alg" applications; the table name is the part after the first ':' in the application name.
    result = [
        {
            **{k: v for k, v in item.items() if k != 'name'},
            'table_name': item['name'].split(":")[1].strip()
        }
        for item in result
        if "alg" in item['name']
    ]
    result = calc_elapsed_max_min_avg(result)
    # Fetch ODPS partition info once per table, keyed by partition.
    partition_info = {}
    for table_name in {item['table_name'] for item in result}:
        resp = odps_client.get_all_partition_info(table_name=table_name)
        partition_info[table_name] = {item['分区']: item for item in resp}
    # Merge each task with the partition that matches its start time (dt/hh).
    spark_task_list = []
    for item in result:
        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
        if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
            item = {
                **item,
                **partition_info[item['table_name']][dt_hh]
            }
        spark_task_list.append(item)
    df_print(spark_task_list)


if __name__ == '__main__':
    _analyse()