Explorar el Código

feat:添加特征Spark任务告警

zhaohaipeng hace 11 meses
padre
commit
03469e91f7
Se han modificado 8 ficheros con 260 adiciones y 0 borrados
  1. 1 0
      .gitignore
  2. 44 0
      client/YarnClient.py
  3. 0 0
      client/__init__.py
  4. 2 0
      config/config.ini
  5. 166 0
      feature_spark_monitor.py
  6. 0 0
      util/__init__.py
  7. 9 0
      util/dateutil.py
  8. 38 0
      util/feishu_inform_util.py

+ 1 - 0
.gitignore

@@ -58,3 +58,4 @@ docs/_build/
 # PyBuilder
 target/
 
+.idea

+ 44 - 0
client/YarnClient.py

@@ -0,0 +1,44 @@
+import json
+
+import requests
+
+from util import dateutil
+
+
+class YarnClient(object):
+    def __init__(self, cluster_ip: str):
+        self.cluster_ip = cluster_ip
+
+    def get_apps(self, queue="", finished_time_begin=0, finished_time_end=0, started_time_begin=0, started_time_end=0,
+                 limit=10) -> list[dict]:
+        query_str = f"limit=&{limit}"
+        if queue != "":
+            query_str = f"{query_str}&queue={queue}"
+        if finished_time_end > 0:
+            query_str = f"{query_str}&finishedTimeBegin={finished_time_begin}"
+        if finished_time_end > 0:
+            query_str = f"{query_str}&finishedTimeEnd={finished_time_end}"
+        if started_time_begin > 0:
+            query_str = f"{query_str}&startedTimeBegin={started_time_begin}"
+        if started_time_end > 0:
+            query_str = f"{query_str}&startedTimeEnd={started_time_end}"
+
+        url = f"http://{self.cluster_ip}:8088/ws/v1/cluster/apps?{query_str}"
+        response = requests.get(url)
+        res = json.loads(response.text)
+        result = []
+        for app in res['apps']['app']:
+            item = {
+                "id": app['id'],
+                "name": app['name'],
+                "finalStatus": app['finalStatus'],
+                "finishedTime": dateutil.ts_cover_str(app['finishedTime']),
+                "startedTime": dateutil.ts_cover_str(app['startedTime']),
+                "launchTime": dateutil.ts_cover_str(app['launchTime']),
+                "queue": app['queue'],
+                "state": app['state'],
+                "elapsedTime": app['elapsedTime'],
+            }
+            result.append(item)
+
+        return result

+ 0 - 0
client/__init__.py


+ 2 - 0
config/config.ini

@@ -0,0 +1,2 @@
+[feishu]
+model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/926982f5-e7af-40f5-81fd-27d8f42718e4

+ 166 - 0
feature_spark_monitor.py

@@ -0,0 +1,166 @@
+import configparser
+from datetime import datetime, timedelta
+
+from client import YarnClient
+from util import dateutil, feishu_inform_util
+import pandas as pd
+
+yarn_client = YarnClient.YarnClient("121.40.173.140")
+
+table_list = [
+    "alg_cid_feature_basic_info",
+    "alg_cid_feature_adver_action",
+    "alg_cid_feature_cid_action",
+    "alg_cid_feature_region_action",
+    "alg_cid_feature_app_action",
+    "alg_cid_feature_week_action",
+    "alg_cid_feature_hour_action",
+    "alg_cid_feature_brand_action",
+    "alg_cid_feature_weChatVersion_action",
+    "alg_cid_feature_vid_cf",
+    "alg_cid_feature_vid_cf_rank",
+    "alg_mid_feature_ad_action",
+    "alg_vid_feature_all_exp",
+    "alg_vid_feature_all_share",
+    "alg_vid_feature_all_return",
+    "alg_vid_feature_exp2share",
+    "alg_vid_feature_share2return",
+    "alg_vid_feature_feed_noflow_exp",
+    "alg_vid_feature_feed_noflow_root_share",
+    "alg_vid_feature_feed_noflow_root_return",
+    "alg_vid_feature_feed_flow_exp",
+    "alg_vid_feature_feed_flow_root_share",
+    "alg_vid_feature_feed_flow_root_return",
+    "alg_vid_feature_feed_province_exp",
+    "alg_vid_feature_feed_province_root_share",
+    "alg_vid_feature_feed_province_root_return",
+    "alg_vid_feature_basic_info",
+    "alg_recsys_feature_cf_i2i_new",
+    "alg_vid_feature_all_exp_v2",
+    "alg_vid_feature_exp2share_v2",
+    "alg_vid_feature_feed_noflow_exp_v2",
+    "alg_vid_feature_feed_noflow_root_share_v2",
+    "alg_vid_feature_feed_noflow_root_return_v2",
+    "alg_vid_feature_feed_flow_exp_v2",
+    "alg_vid_feature_feed_flow_root_share_v2",
+    "alg_vid_feature_feed_flow_root_return_v2",
+    "alg_vid_feature_feed_province_exp_v2",
+    "alg_vid_feature_feed_province_root_share_v2",
+    "alg_vid_feature_feed_province_root_return_v2",
+    "alg_recsys_feature_cf_i2i_new_v2",
+    "alg_mid_feature_play",
+    "alg_mid_feature_share_and_return",
+    "alg_mid_feature_play_tags",
+    "alg_mid_feature_return_tags",
+    "alg_mid_feature_share_tags",
+    "alg_mid_feature_feed_exp_share_tags",
+    "alg_mid_feature_feed_exp_return_tags",
+    "alg_mid_feature_sharecf",
+    "alg_mid_feature_returncf",
+    "alg_mid_feature_feed_exp_return_tags_v2",
+    "alg_mid_feature_feed_exp_share_tags_v2"
+]
+
+filter_date = datetime(2024, 1, 1)
+
+
+def df_print(result):
+    df = pd.DataFrame(result)
+    # 过滤出 name 中包含 'cid' 的行
+    filtered_df = df[df['name'].str.contains('cid')].copy()  # 使用 .copy() 生成副本以避免警告
+    filtered_df.loc[:, 'name'] = filtered_df['name'].str.replace('odps sync to redis : ', '', regex=False)
+
+    sorted_df = filtered_df.sort_values(by="startedTime")
+
+    # 获取表头
+    header = ' | '.join(sorted_df.columns)
+
+    def format_row(row):
+        return ' | '.join([str(row[col]) for col in sorted_df.columns])
+
+    # 获取数据行
+    rows = filtered_df.apply(format_row, axis=1).tolist()
+
+    # 打印输出
+    print(header)
+    print('-' * len(header))
+    print('\n'.join(rows))
+
+
+def handle_table(table_name: str, spark_task_list: list[dict]) -> (bool, str):
+    filtered_data = [
+        item for item in spark_task_list
+        if table_name in item['name'] and dateutil.str_cover_date(item['startedTime']) > filter_date
+    ]
+    if filtered_data:
+        latest_started_time = max(
+            [dateutil.str_cover_date(item['startedTime']) for item in filtered_data])
+        print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")
+        now = datetime.now()
+        time_difference = now - latest_started_time
+        if time_difference > timedelta(minutes=140):
+            return True, latest_started_time
+
+        return False, filtered_data
+
+    return False, ""
+
+
+def send_error_info(table_name: str, latest_started_time: str):
+    mgs_text = f"\n- 大数据表名: {table_name}" \
+               f"\n- 最后一次同步时间: {latest_started_time}" \
+               f"\n- 超过两个小时没有同步,请关注"
+    card_json = {
+        "config": {},
+        "i18n_elements": {
+            "zh_cn": [
+                {
+                    "tag": "markdown",
+                    "content": "",
+                    "text_align": "left",
+                    "text_size": "normal"
+                },
+                {
+                    "tag": "markdown",
+                    "content": mgs_text,
+                    "text_align": "left",
+                    "text_size": "normal"
+                }
+            ]
+        },
+        "i18n_header": {
+            "zh_cn": {
+                "title": {
+                    "tag": "plain_text",
+                    "content": "【测试】大数据表同步延迟告警"
+                },
+                "subtitle": {
+                    "tag": "plain_text",
+                    "content": ""
+                },
+                "template": "red"
+            }
+        }
+    }
+
+
+def _main():
+    # 读取配置文件
+    config = configparser.ConfigParser()
+    config.read("config/config.ini")
+    print(config.get("feishu", 'model.webhook'))
+
+    # 获取最近七小时的Spark任务
+    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
+    result = yarn_client.get_apps(started_time_begin=hours_7_early)
+
+    for table_name in table_list:
+        # 判断最后一次同步是否为两个小时以内
+        b, latest_started_time = handle_table(table_name, result)
+        if b:
+            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
+            send_error_info(table_name, latest_started_time)
+
+
+if __name__ == '__main__':
+    _main()

+ 0 - 0
util/__init__.py


+ 9 - 0
util/dateutil.py

@@ -0,0 +1,9 @@
+import datetime
+
+
+def ts_cover_str(ts=0) -> str:
+    return datetime.datetime.fromtimestamp(ts / 1000).strftime('%Y-%m-%d %H:%M:%S')
+
+
+def str_cover_date(s: str):
+    return datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S')

+ 38 - 0
util/feishu_inform_util.py

@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+import argparse
+import json
+
+import pytz
+import requests
+
+from datetime import datetime
+
+
+def send_card_msg_to_feishu(webhook, card_json):
+    """发送消息到飞书"""
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "interactive",
+        "card": card_json
+    }
+    print(f"推送飞书消息内容: {json.dumps(payload_message)}")
+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    print(response.text)
+
+
+def timestamp_format(timestamp: str) -> str:
+    try:
+        return (datetime.utcfromtimestamp(int(timestamp))
+                .replace(tzinfo=pytz.UTC)
+                .astimezone(pytz.timezone('Asia/Shanghai'))
+                .strftime('%Y-%m-%d %H:%M:%S')
+                )
+    except ValueError as e:
+        return timestamp
+
+
+def seconds_convert(seconds):
+    hours = seconds // 3600
+    minutes = (seconds % 3600) // 60
+    seconds = seconds % 60
+    return f"{hours}小时 {minutes}分钟 {seconds}秒"