|
@@ -2,9 +2,10 @@ import configparser
|
|
|
from datetime import datetime, timedelta
|
|
|
from typing import Dict, List
|
|
|
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
from client import YarnClient
|
|
|
from util import dateutil, feishu_inform_util
|
|
|
-import pandas as pd
|
|
|
|
|
|
yarn_client = YarnClient.YarnClient("192.168.203.16")
|
|
|
|
|
@@ -107,7 +108,7 @@ def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
|
|
|
return False, ""
|
|
|
|
|
|
|
|
|
-def send_error_info(table_name: str, latest_started_time: str):
|
|
|
+def send_error_info(table_name: str, latest_started_time: str, webhook: str):
|
|
|
mgs_text = f"\n- 大数据表名: {table_name}" \
|
|
|
f"\n- 最后一次同步时间: {latest_started_time}" \
|
|
|
f"\n- 超过两个小时没有同步,请关注"
|
|
@@ -143,12 +144,14 @@ def send_error_info(table_name: str, latest_started_time: str):
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
|
|
|
|
|
|
|
|
|
def _main():
|
|
|
# 读取配置文件
|
|
|
config = configparser.ConfigParser()
|
|
|
config.read("config/config.ini")
|
|
|
+ webhook_url = config.get("feishu", "model.webhook")
|
|
|
|
|
|
# 获取最近七小时的Spark任务
|
|
|
hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
|
|
@@ -159,7 +162,7 @@ def _main():
|
|
|
b, latest_started_time = handle_table(table_name, result)
|
|
|
if b:
|
|
|
print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
|
|
|
- send_error_info(table_name, latest_started_time)
|
|
|
+ send_error_info(table_name, latest_started_time, webhook_url)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|