
feat: add analysis task

zhaohaipeng committed 10 months ago
commit ab6ed2a7ad
2 changed files with 70 additions and 61 deletions
  1. feature_spark_analyse.py (+65 −0)
  2. feature_spark_monitor.py (+5 −61)

+ 65 - 0
feature_spark_analyse.py

@@ -0,0 +1,65 @@
+from datetime import datetime, timedelta
+
+import pandas as pd
+
+from client import YarnClient, ODPSClient
+from util import date_util
+
+yarn_client = YarnClient.YarnClient("121.40.173.140")
+odps_client = ODPSClient.ODPSClient()
+
+columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
+           "数据大小", "创建时间", "更新时间"]
+
+
+def df_print(result):
+    df = pd.DataFrame(result)
+    sorted_df = df.sort_values(by="startedTime")
+
+    sorted_df = sorted_df[columns]
+
+    # Build the header row
+    header = ' | '.join(sorted_df.columns)
+
+    # Format each data row
+    def format_row(row):
+        return ' | '.join([str(row[col]) for col in sorted_df.columns])
+
+    rows = sorted_df.apply(format_row, axis=1).tolist()
+
+    # Print the header and rows
+    print(header)
+    print('\n'.join(rows))
+
+
+def _analyse():
+    hours_14_ago = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000  # epoch millis, 14 hours back
+    result = yarn_client.get_apps(started_time_begin=hours_14_ago)
+    result = [
+        {
+            **{k: v for k, v in item.items() if k != 'name'},
+            'table_name': item['name'].split(":")[1].strip()  # assumes app name "<prefix>: <table>"
+        }
+        for item in result
+        if "alg" in item['name'] and item['state'] == 'RUNNING'
+    ]
+    partition_info = {}  # table name -> {partition -> partition metadata}
+    for table_name in {item['table_name'] for item in result}:
+        resp = odps_client.get_all_partition_info(table_name=table_name)
+        partition_info[table_name] = {item['分区']: item for item in resp}
+
+    spark_task_list = []
+    for item in result:
+        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])  # start time -> partition key
+        if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
+            item = {
+                **item,
+                **partition_info[item['table_name']][dt_hh]
+            }
+            spark_task_list.append(item)
+
+    df_print(spark_task_list)
+
+
+if __name__ == '__main__':
+    _analyse()
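
The new script's df_print helper renders a list of task dicts as a pipe-separated table sorted by startedTime. A minimal sketch of the same pattern, assuming only pandas and using hypothetical rows in place of the merged YARN/ODPS records:

import pandas as pd

# Hypothetical rows standing in for merged YARN app + ODPS partition records
rows = [
    {"id": "app_2", "startedTime": "2024-06-01 11:00:00", "state": "RUNNING"},
    {"id": "app_1", "startedTime": "2024-06-01 10:00:00", "state": "RUNNING"},
]

df = pd.DataFrame(rows).sort_values(by="startedTime")
print(' | '.join(df.columns))  # header line
for _, row in df.iterrows():
    print(' | '.join(str(row[col]) for col in df.columns))  # one line per task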

+ 5 - 61
feature_spark_monitor.py

@@ -2,13 +2,10 @@ import configparser
 from datetime import datetime, timedelta
 from typing import Dict, List
 
-import pandas as pd
-
-from client import YarnClient, ODPSClient
+from client import YarnClient
 from util import date_util, feishu_inform_util
 
-yarn_client = YarnClient.YarnClient("121.40.173.140")
-odps_client = ODPSClient.ODPSClient()
+yarn_client = YarnClient.YarnClient("192.168.203.16")
 
 table_list = [
     "alg_mid_feature_sharecf",
@@ -53,29 +50,6 @@ table_list = [
 
 filter_date = datetime(2024, 1, 1)
 
-columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
-           "数据大小", "创建时间", "更新时间"]
-
-
-def df_print(result):
-    df = pd.DataFrame(result)
-    sorted_df = df.sort_values(by="startedTime")
-
-    sorted_df = sorted_df[columns]
-
-    # Build the header row
-    header = ' | '.join(sorted_df.columns)
-
-    # Format each data row
-    def format_row(row):
-        return ' | '.join([str(row[col]) for col in sorted_df.columns])
-
-    rows = sorted_df.apply(format_row, axis=1).tolist()
-
-    # Print the header and rows
-    print(header)
-    print('\n'.join(rows))
-
 
 def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
     filtered_data = [
@@ -84,8 +58,8 @@ def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
     ]
     if filtered_data:
         latest_started_time = max(
-            [date_util.str_cover_date(item['startedTime']) for item in filtered_data])
-        print(f"Table: {table_name}, last synced at: {latest_started_time}")
+            [date_util.str_cover_date(item['finishedTime']) for item in filtered_data])
+        print(f"Table: {table_name}, last sync finished at: {latest_started_time}")
         now = datetime.now()
         time_difference = now - latest_started_time
         if time_difference > timedelta(minutes=140):
@@ -154,35 +128,5 @@ def _main():
             send_error_info(table_name, latest_started_time, webhook_url)
 
 
-def _analyse():
-    hours_7_early = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
-    result = yarn_client.get_apps(started_time_begin=hours_7_early)
-    result = [
-        {
-            **{k: v for k, v in item.items() if k != 'name'},
-            'table_name': item['name'].split(":")[1].strip()
-        }
-        for item in result
-        if "alg" in item['name'] and item['state'] == 'RUNNING'
-    ]
-    partition_info = {}
-    for table_name in list({item['table_name'] for item in result}):
-        resp = odps_client.get_all_partition_info(table_name=table_name)
-        partition_info[table_name] = {item['分区']: item for item in resp}
-
-    spark_task_list = []
-    for item in result:
-        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
-        if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
-            item = {
-                **item,
-                **partition_info[item['table_name']][dt_hh]
-            }
-            spark_task_list.append(item)
-
-    df_print(spark_task_list)
-
-
 if __name__ == '__main__':
-    # _main()
-    _analyse()
+    _main()
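
With this change the monitor alerts on the newest finishedTime rather than startedTime. A minimal sketch of that freshness check, using datetime.strptime as a stand-in for date_util.str_cover_date, with hypothetical timestamps and the same 140-minute threshold:

from datetime import datetime, timedelta

# Hypothetical finished times; the monitor reads these from YARN app records
finished_times = ["2024-06-01 08:00:00", "2024-06-01 09:30:00"]

latest = max(datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t in finished_times)
if datetime.now() - latest > timedelta(minutes=140):
    print(f"Table stale: last sync finished at {latest}")  # would trigger the Feishu alert path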