|
@@ -58,7 +58,8 @@ def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
|
|
|
]
|
|
|
if filtered_data:
|
|
|
latest_started_time = max(
|
|
|
- [date_util.str_cover_date(item['finishedTime']) for item in filtered_data])
|
|
|
+ [date_util.str_cover_date(item['finishedTime']) for item in filtered_data]
|
|
|
+ )
|
|
|
print(f"表: {table_name}, 最后一次同步完成时间为: {latest_started_time}")
|
|
|
now = datetime.now()
|
|
|
time_difference = now - latest_started_time
|
|
@@ -112,9 +113,9 @@ def send_error_info(table_name: str, latest_started_time: str, webhook: str):
|
|
|
|
|
|
def _main():
|
|
|
# 读取配置文件
|
|
|
- config = configparser.ConfigParser()
|
|
|
- config.read("/home/monitor/model_monitor/config/config.ini")
|
|
|
- webhook_url = config.get("feishu", "model.webhook")
|
|
|
+ # config = configparser.ConfigParser()
|
|
|
+ # config.read("/home/monitor/model_monitor/config/config.ini")
|
|
|
+ # webhook_url = config.get("feishu", "model.webhook")
|
|
|
|
|
|
# 获取最近七小时的Spark任务
|
|
|
hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
|
|
@@ -126,8 +127,12 @@ def _main():
|
|
|
|
|
|
]
|
|
|
|
|
|
- if len(result) == 0:
|
|
|
- print("未获取到数据,跳过本次告警")
|
|
|
+ early_started_time = min(
|
|
|
+ [date_util.str_cover_date(item['startedTime']) for item in result]
|
|
|
+ )
|
|
|
+
|
|
|
+ if (early_started_time - datetime.now()) < timedelta(hours=7):
|
|
|
+ print("任务最开始时间小于六个小时,疑似清理过任务。跳过")
|
|
|
return
|
|
|
|
|
|
for table_name in table_list:
|
|
@@ -135,7 +140,7 @@ def _main():
|
|
|
b, latest_started_time = handle_table(table_name, result)
|
|
|
if b:
|
|
|
print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
|
|
|
- send_error_info(table_name, latest_started_time, webhook_url)
|
|
|
+ # send_error_info(table_name, latest_started_time, webhook_url)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|