Quellcode durchsuchen

feat:修改监控脚本

zhaohaipeng vor 9 Monaten
Ursprung
Commit
8943cacf34
3 geänderte Dateien mit 44 neuen und 38 gelöschten Zeilen
  1. 36 30
      feature_spark_monitor.py
  2. 1 1
      hadoop_monitor.py
  3. 7 7
      util/date_util.py

+ 36 - 30
feature_spark_monitor.py

@@ -1,3 +1,4 @@
+import argparse
 import configparser
 from datetime import datetime, timedelta
 from typing import Dict, List
@@ -56,25 +57,34 @@ def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
         item for item in spark_task_list
         if table_name == item['name']
     ]
-    if filtered_data:
-        latest_started_time = max(
-            [date_util.str_cover_date(item['finishedTime']) for item in filtered_data]
-        )
-        print(f"表: {table_name}, 最后一次同步完成时间为: {latest_started_time}")
-        time_difference = datetime.now() - latest_started_time
-        if time_difference > timedelta(minutes=140):
-            return True, latest_started_time
-        else:
-            return False, filtered_data
-    else:
+    if not filtered_data:
         # 如果没有找到,表示近四个小时都没有同步过
-        return True, "七小时之前"
+        return True, "最近四小时没有同步过数据"
 
+    # 判断最近一次完成时间是否大于两个小时
+    filtered_data.sort(key=lambda item: date_util.str_cover_date(item['finishedTime']), reverse=True)
+    last_finished_item = filtered_data[0]
+    print(f"表: {table_name}, 最后一次完成时间为: {last_finished_item['finishedTime']}")
 
-def send_error_info(table_name: str, latest_started_time: str, webhook: str):
+    time_difference = datetime.now() - date_util.str_cover_date(last_finished_item['finishedTime'])
+    if time_difference > timedelta(minutes=120):
+        return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {last_finished_item['finishedTime']}"
+
+    # 判断持续时间是否超过50分钟
+    elapse = (date_util.str_cover_date(last_finished_item['finishedTime']) -
+              date_util.str_cover_date(last_finished_item['startedTime']))
+    print(f"表: {table_name}, 最后一次任务持续时间为: {date_util.seconds_convert(elapse.seconds)}")
+
+    if elapse > timedelta(minutes=50):
+        return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse.seconds)}"
+
+    return False, ""
+
+
+def send_error_info(table_name: str, warn_reason: str, webhook: str):
     mgs_text = f"\n- 大数据表名: {table_name}" \
-               f"\n- 最后一次同步时间: {latest_started_time}" \
-               f"\n- 超过两个小时没有同步,请关注"
+               f"\n- 告警原因: {warn_reason}" \
+               f"\n- 请关注"
     card_json = {
         "config": {},
         "i18n_elements": {
@@ -111,13 +121,18 @@ def send_error_info(table_name: str, latest_started_time: str, webhook: str):
 
 
 def _main():
+    parser = argparse.ArgumentParser(description="feature_spark_task_monitor")
+    parser.add_argument("-c", "--config", required=False, help="config file path",
+                        default="/home/monitor/model_monitor/config/config.ini")
+    args = parser.parse_args()
+
     # 读取配置文件
     config = configparser.ConfigParser()
-    config.read("/home/monitor/model_monitor/config/config.ini")
+    config.read(args.config)
     webhook_url = config.get("feishu", "model.webhook")
 
-    # 获取最近小时的Spark任务
-    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
+    # 获取最近4小时的Spark任务
+    hours_7_early = int((datetime.now() - timedelta(hours=4)).timestamp()) * 1000
     result = yarn_client.get_apps(finished_time_begin=hours_7_early)
     result = [
         {**item, 'name': item['name'].split(":")[1].strip()}
@@ -130,20 +145,11 @@ def _main():
         print("未获取已完成的任务,跳过")
         return
 
-    early_started_time = min(
-        [date_util.str_cover_date(item['startedTime']) for item in result]
-    )
-
-    if (datetime.now() - early_started_time) < timedelta(hours=3):
-        print("任务最开始时间小于六个小时,疑似清理过任务。跳过")
-        return
-
     for table_name in table_list:
-        # 判断最后一次同步是否为两个小时以内
-        b, latest_started_time = handle_table(table_name, result)
+        b, warn_reason = handle_table(table_name, result)
         if b:
-            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
-            send_error_info(table_name, latest_started_time, webhook_url)
+            print(f"表: {table_name}, 触发告警; 告警原因: {warn_reason}")
+            send_error_info(table_name, warn_reason, webhook_url)
 
 
 if __name__ == '__main__':

+ 1 - 1
hadoop_monitor.py

@@ -16,5 +16,5 @@ if __name__ == '__main__':
             f"节点: {node}, "
             f"容量: {convert_util.byte_convert(capacity)}, "
             f"已使用: {convert_util.byte_convert(used)}, "
-            f"使用率: {used / capacity * 100}"
+            f"使用率: {used / capacity}"
         )

+ 7 - 7
util/date_util.py

@@ -1,12 +1,12 @@
 from datetime import datetime
 
 
-def ts_cover_str(ts=0) -> str:
-    return datetime.fromtimestamp(ts / 1000).strftime('%Y-%m-%d %H:%M:%S')
+def ts_cover_str(ts=0, _format='%Y-%m-%d %H:%M:%S') -> str:
+    return datetime.fromtimestamp(ts / 1000).strftime(_format)
 
 
-def str_cover_date(s: str):
-    return datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
+def str_cover_date(s: str, _format='%Y-%m-%d %H:%M:%S') -> datetime:
+    return datetime.strptime(s, _format)
 
 
 def timestamp_convert(ts: int) -> str:
@@ -22,12 +22,12 @@ def seconds_convert(seconds: int) -> str:
     if hours > 0:
         s = f"{s} {hours}小时"
     if minutes > 0:
-        s = f"{s} {minutes}分"
+        s = f"{s} {minutes}分"
     if seconds > 0:
         s = f"{s} {seconds}秒"
     return s
 
 
-def date_convert_dt_hh(date_str: str) -> str:
-    date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
+def date_convert_dt_hh(date_str: str, _format='%Y-%m-%d %H:%M:%S') -> str:
+    date_obj = datetime.strptime(date_str, _format)
     return date_obj.strftime('%Y%m%d%H')