feature_spark_analyse.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from datetime import datetime, timedelta
  2. import pandas as pd
  3. from client import YarnClient, ODPSClient
  4. from util import date_util
  # Host handed to YarnClient; the analysis below queries apps via yarn_client.get_apps().
  _YARN_HOST = "121.40.173.140"
  yarn_client = YarnClient.YarnClient(_YARN_HOST)
  odps_client = ODPSClient.ODPSClient()

  # Column order used by df_print. '分区' is the partition key of the ODPS
  # partition records merged into each app; the other Chinese-named fields are
  # presumably partition metadata as well -- confirm against ODPSClient.
  columns = [
      "id", "表名", "startedTime", "launchTime", "finishedTime", "state",
      "elapsedTime", "分区", "数据量", "数据大小", "创建时间", "更新时间",
  ]
  def df_print(result, cols=None):
      """Print task records as a ' | '-separated table ordered by start time.

      Args:
          result: list of dicts, one per task; keys become DataFrame columns.
          cols: column names to print, in order. Defaults to the module-level
              ``columns`` list. Columns that never appear in ``result`` are
              printed as ``nan`` instead of raising ``KeyError``.

      Rows are sorted by ``startedTime`` when that field is present. An empty
      ``result`` prints only the header line.
      """
      if cols is None:
          cols = columns
      df = pd.DataFrame(result)
      # pd.DataFrame([]) has no columns at all, so only sort when the key exists.
      # Sorting the full frame first keeps the order independent of ``cols``.
      if "startedTime" in df.columns:
          df = df.sort_values(by="startedTime")
      # reindex (unlike df[cols]) tolerates missing keys, e.g. when no task
      # matched a partition and the partition fields never appear in ``result``.
      df = df.reindex(columns=cols)

      header = ' | '.join(str(col) for col in df.columns)
      rows = [
          ' | '.join(str(value) for value in row)
          for row in df.itertuples(index=False, name=None)
      ]
      print(header)
      if rows:
          print('\n'.join(rows))
  def _running_alg_apps(apps):
      """Return RUNNING apps whose name contains "alg", with 'name' replaced by 'table_name'.

      ``table_name`` is the stripped text between the first and second ':' of
      the app name. Apps whose name contains no ':' are skipped instead of
      raising IndexError.
      """
      selected = []
      for app in apps:
          name = app['name']
          if "alg" not in name or app['state'] != 'RUNNING' or ":" not in name:
              continue
          record = {k: v for k, v in app.items() if k != 'name'}
          record['table_name'] = name.split(":")[1].strip()
          selected.append(record)
      return selected


  def _analyse(lookback_hours=14):
      """Print running "alg" apps joined with their ODPS partition info.

      Fetches YARN apps started within the last ``lookback_hours`` hours, keeps
      RUNNING apps whose name contains "alg", loads the partition info of each
      referenced table once via ODPS, and merges in the partition whose '分区'
      key equals ``date_util.date_convert_dt_hh(app['startedTime'])``. Apps with
      no matching partition are kept unmerged. The result goes to ``df_print``.

      Args:
          lookback_hours: size of the start-time window, in hours. Defaults to 14.
      """
      # Epoch seconds scaled by 1000; assumed to be the millisecond unit that
      # YarnClient.get_apps expects -- confirm against YarnClient.
      started_after_ms = int((datetime.now() - timedelta(hours=lookback_hours)).timestamp()) * 1000
      apps = _running_alg_apps(yarn_client.get_apps(started_time_begin=started_after_ms))

      # One ODPS request per distinct table, indexed by partition key.
      partition_info = {}
      for table_name in {app['table_name'] for app in apps}:
          resp = odps_client.get_all_partition_info(table_name=table_name)
          partition_info[table_name] = {part['分区']: part for part in resp}

      spark_task_list = []
      for app in apps:
          dt_hh = date_util.date_convert_dt_hh(app['startedTime'])
          partition = partition_info.get(app['table_name'], {}).get(dt_hh)
          if partition is not None:
              # Partition fields overwrite app fields on key collisions.
              app = {**app, **partition}
          spark_task_list.append(app)
      df_print(spark_task_list)
  if __name__ == '__main__':
      # Script entry point: run the analysis only when executed directly,
      # not when this module is imported.
      _analyse()