瀏覽代碼

feat:添加分析任务

zhaohaipeng 10 月之前
父節點
當前提交
e622b0301c
共有 1 個文件被更改,包括 10 次插入5 次删除
  1. 10 5
      feature_spark_monitor.py

+ 10 - 5
feature_spark_monitor.py

@@ -54,7 +54,7 @@ filter_date = datetime(2024, 1, 1)
 def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
     filtered_data = [
         item for item in spark_task_list
-        if table_name in item['name'] and date_util.str_cover_date(item['finishedTime']) > filter_date
+        if table_name == item['name']
     ]
     print(f"{table_name} ==> {filtered_data}")
     if filtered_data:
@@ -113,14 +113,19 @@ def send_error_info(table_name: str, latest_started_time: str, webhook: str):
 
 def _main():
     # 读取配置文件
-    config = configparser.ConfigParser()
-    config.read("/home/monitor/model_monitor/config/config.ini")
-    webhook_url = config.get("feishu", "model.webhook")
+    # config = configparser.ConfigParser()
+    # config.read("/home/monitor/model_monitor/config/config.ini")
+    # webhook_url = config.get("feishu", "model.webhook")
 
     # 获取最近七小时的Spark任务
     hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
     result = yarn_client.get_apps(finished_time_begin=hours_7_early)
-    result = filter(lambda item: item['finalStatus'] == 'SUCCEEDED', result)
+    result = [
+        {**item, 'name': item['name'].split(":")[1].strip()}
+        for item in result
+        if item['finalStatus'] == "SUCCEEDED"
+
+    ]
 
     for table_name in table_list:
         # 判断最后一次同步是否为两个小时以内