Ver código fonte

feat:修改同步任务监控

zhaohaipeng 3 meses atrás
pai
commit
626706800f

+ 12 - 0
client/ApolloClient.py

@@ -0,0 +1,12 @@
+import requests
+
+
class ApolloClient(object):
    """Small HTTP client that reads configuration from an Apollo config service.

    Args:
        base_url: Root URL of the Apollo service. A trailing slash is tolerated.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``; keeps a
            stalled connection from blocking the caller forever.
    """

    def __init__(self, base_url, timeout=10):
        self.base_url = base_url
        self.timeout = timeout

    def _config_url(self, app_id: str, cluster: str, namespace: str) -> str:
        """Return the ``/configs/<app_id>/<cluster>/<namespace>`` URL."""
        # Drop a trailing slash so "http://host/" does not produce "//configs".
        root = self.base_url.rstrip("/")
        return f"{root}/configs/{app_id}/{cluster}/{namespace}"

    def get_config(self, app_id: str, cluster: str, namespace: str):
        """Fetch one namespace's configuration and return the decoded JSON body.

        Args:
            app_id: Application id segment of the request path.
            cluster: Cluster segment of the request path.
            namespace: Namespace segment of the request path.

        Returns:
            Whatever ``response.json()`` decodes from the response body.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
            requests.RequestException: On connection failure or timeout.
        """
        url = self._config_url(app_id, cluster, namespace)
        print(f"请求的Apollo地址为: {url}")
        response = requests.get(url, timeout=self.timeout)
        # Fail loudly on an error status instead of handing an error payload
        # to the caller as if it were configuration.
        response.raise_for_status()
        return response.json()

+ 4 - 1
config/config.ini

@@ -13,4 +13,7 @@ password = Wqsd@2019
 [feature.redis]
 host = r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
 port = 6379
-password = Wqsd@2019
+password = Wqsd@2019
+
+[apollo]
+meta = http://apolloconfig-internal.piaoquantv.com

+ 4 - 1
config/config_test.ini

@@ -13,4 +13,7 @@ password = Wqsd@2019
 [feature.redis]
 host = r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
 port = 6479
-password = Wqsd@2019
+password = Wqsd@2019
+
+[apollo]
+meta = http://apolloconfig-internal.piaoquantv.com

+ 24 - 8
script/feature_spark_monitor.py

@@ -1,10 +1,11 @@
 import argparse
 import configparser
+import json
 from datetime import datetime, timedelta
 from typing import Dict, List
 
-from client import YarnClient
-from util import date_util, feishu_inform_util
+from client import YarnClient, ApolloClient
+from util import date_util, json_util, feishu_inform_util
 
 yarn_client = YarnClient.YarnClient("192.168.203.16")
 
@@ -173,6 +174,9 @@ def _main():
     webhook_url = config.get("feishu", "model.webhook")
     # webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c'
 
+    apollo_meta_url = config.get("apollo", "meta")
+    apollo = ApolloClient.ApolloClient(apollo_meta_url)
+
     # 获取最近4小时的Spark任务
     hours_7_early = int((datetime.now() - timedelta(hours=4)).timestamp()) * 1000
     result = yarn_client.get_apps(finished_time_begin=hours_7_early)
@@ -184,10 +188,22 @@ def _main():
     ]
 
     if len(result) == 0:
-        print("未获取已完成的任务,跳过")
+        print("未获取已完成的任务,跳过")
         return
 
-    for table_name in table_list:
+    j = apollo.get_config("recommend-feature", "default", "application")
+    dts_config = json_util.remove_comments(j['configurations']['dts.config'])
+    table_config_list = json.loads(dts_config)
+    for table_config in table_config_list:
+        print("\n\n\n")
+        table_name = table_config['odps']['table']
+        print(f"当前处理的表为: {table_name}")
+        partitions = table_config['odps']['partition']
+        if len(partitions) != 2:
+            # 只处理有两个分区键的小时级表
+            print(f"表: {table_name} 的分区键只有一个,暂不处理")
+            continue
+
         b, warn_reason = handle_table(table_name, result)
         if b:
             print(f"表: {table_name}, 触发告警; 告警原因: {warn_reason}")
@@ -199,8 +215,8 @@ if __name__ == '__main__':
         _main()
     except Exception as e:
         print(f"监控告警发生异常: {e}")
-        content = f"\n- 特征同步异常告警" \
-                   f"\n- 告警原因: 监控脚本执行异常" \
-                   f"\n- 请关注"
+        txt = f"\n- 特征同步异常告警" \
+              f"\n- 告警原因: 监控脚本执行异常" \
+              f"\n- 请关注"
         url = "https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c"
-        invoke_feishu_card_mgs(url, content)
+        invoke_feishu_card_mgs(url, txt)

+ 15 - 0
util/json_util.py

@@ -0,0 +1,15 @@
+import re
+
+
def remove_comments(json_str):
    """Strip ``//`` line comments and ``/* ... */`` block comments from text.

    Quoted strings (single- or double-quoted) are matched by the same pattern
    and written back unchanged, so comment-like sequences inside them, such as
    the ``//`` in a URL, are left alone.

    Args:
        json_str: The text to clean.

    Returns:
        The text with each comment replaced by an empty string.
    """
    # Alternatives in order: line comment, block comment, single-quoted
    # string, double-quoted string.
    comment_or_string = re.compile(
        r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
        re.DOTALL | re.MULTILINE,
    )

    def drop_if_comment(found):
        token = found.group(0)
        # Both comment forms start with "/"; string literals start with a quote.
        return '' if token.startswith('/') else token

    return comment_or_string.sub(drop_if_comment, json_str)