# feature_spark_analyse.py
  1. from collections import defaultdict
  2. from datetime import datetime, timedelta
  3. import pandas as pd
  4. from client import YarnClient, ODPSClient
  5. from util import date_util
  6. yarn_client = YarnClient.YarnClient("121.40.173.140")
  7. odps_client = ODPSClient.ODPSClient()
  8. columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "maxElapsedTime",
  9. "minElapsedTime", "avgElapsedTime", "分区", "数据量", "数据大小", "创建时间", "更新时间"]
  10. def format_value(value, column):
  11. if column in ["elapsedTime", "maxElapsedTime", "minElapsedTime", "avgElapsedTime"]:
  12. return date_util.timestamp_convert(value)
  13. else:
  14. return value
  15. def df_print(result):
  16. df = pd.DataFrame(result)
  17. formatted_df = df.apply(lambda x: [format_value(x[col], col) for col in df.columns], axis=1, result_type='expand')
  18. formatted_df.columns = df.columns
  19. sorted_df = formatted_df.sort_values(by="startedTime")
  20. sorted_df = sorted_df[columns]
  21. # 获取表头
  22. header = ' | '.join(sorted_df.columns)
  23. # 获取数据行
  24. def format_row(row):
  25. return ' | '.join([str(row[col]) for col in sorted_df.columns])
  26. rows = sorted_df.apply(format_row, axis=1).tolist()
  27. # 打印输出
  28. print(header)
  29. print('\n'.join(rows))
  30. def calc_elapsed_max_min_avg(result: list[dict]) -> list[dict]:
  31. elapsed_times = defaultdict(list)
  32. for item in result:
  33. elapsed_times[item['table_name']].append(item['elapsedTime'])
  34. stats = {}
  35. for name, times in elapsed_times.items():
  36. max_time = max(times)
  37. min_time = min(times)
  38. avg_time = int(sum(times) / len(times))
  39. stats[name] = {'maxElapsedTime': max_time, 'minElapsedTime': min_time, 'avgElapsedTime': avg_time}
  40. # 将计算结果放回原始字典
  41. for item in result:
  42. name = item['table_name']
  43. item['maxElapsedTime'] = stats[name]['maxElapsedTime']
  44. item['minElapsedTime'] = stats[name]['minElapsedTime']
  45. item['avgElapsedTime'] = stats[name]['avgElapsedTime']
  46. return result
  47. def _analyse():
  48. hours_7_early = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
  49. result = yarn_client.get_apps(started_time_begin=hours_7_early)
  50. result = [
  51. {
  52. **{k: v for k, v in item.items() if k != 'name'},
  53. 'table_name': item['name'].split(":")[1].strip()
  54. }
  55. for item in result
  56. if "alg" in item['name']
  57. ]
  58. result = calc_elapsed_max_min_avg(result)
  59. partition_info = {}
  60. for table_name in list({item['table_name'] for item in result}):
  61. resp = odps_client.get_all_partition_info(table_name=table_name)
  62. partition_info[table_name] = {item['分区']: item for item in resp}
  63. spark_task_list = []
  64. for item in result:
  65. dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
  66. if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
  67. item = {
  68. **item,
  69. **partition_info[item['table_name']][dt_hh]
  70. }
  71. spark_task_list.append(item)
  72. df_print(spark_task_list)
  73. if __name__ == '__main__':
  74. _analyse()