|
@@ -70,14 +70,14 @@ filter_date = datetime(2024, 1, 1)
|
|
|
current_hour = datetime.now().hour
|
|
|
|
|
|
|
|
|
-def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
|
|
|
+def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str, str):
|
|
|
filtered_data = [
|
|
|
item for item in spark_task_list
|
|
|
if table_name == item['name']
|
|
|
]
|
|
|
if not filtered_data:
|
|
|
# 如果没有找到,表示近七个小时都没有同步过
|
|
|
- return True, "最近没有四小时同步过数据"
|
|
|
+ return True, "最近没有四小时同步过数据", "error"
|
|
|
|
|
|
# 判断最近一次完成时间是否大于两个小时
|
|
|
filtered_data.sort(key=lambda item: date_util.str_cover_date(item['finishedTime']), reverse=True)
|
|
@@ -89,26 +89,26 @@ def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
|
|
|
|
|
|
time_difference = datetime.now() - finished_time
|
|
|
if time_difference > timedelta(minutes=120):
|
|
|
- return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {last_finished_item['finishedTime']}"
|
|
|
+ return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {last_finished_item['finishedTime']}", "error"
|
|
|
|
|
|
# 判断持续时间是否超过一个小时
|
|
|
elapse = (finished_time - started_time)
|
|
|
print(f"表: {table_name}, 最后一次任务持续时间为: {date_util.seconds_convert(elapse.seconds)}")
|
|
|
|
|
|
if elapse > timedelta(minutes=50):
|
|
|
- return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse.seconds)}"
|
|
|
+ return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse.seconds)}", "warn"
|
|
|
|
|
|
# 判断任务的完成时间是否是当前小时
|
|
|
finished_hour = finished_time.hour
|
|
|
|
|
|
print(f"表: {table_name}, 最后一次完成是: {finished_hour} 小时, 当前小时为: {current_hour}")
|
|
|
if finished_hour != current_hour:
|
|
|
- return True, f"当前小时的任务未完成,请关注!!!"
|
|
|
+ return True, f"当前小时的任务未完成", "warn"
|
|
|
|
|
|
return False, ""
|
|
|
|
|
|
|
|
|
-def invoke_feishu_card_mgs(webhook: str, content: str):
|
|
|
+def invoke_feishu_card_mgs(webhook: str, content: str, alarm_level="error"):
|
|
|
card_json = {
|
|
|
"config": {},
|
|
|
"i18n_elements": {
|
|
@@ -137,18 +137,18 @@ def invoke_feishu_card_mgs(webhook: str, content: str):
|
|
|
"tag": "plain_text",
|
|
|
"content": ""
|
|
|
},
|
|
|
- "template": "red"
|
|
|
+ "template": "red" if alarm_level == "error" else "yellow"
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
|
|
|
|
|
|
|
|
|
-def send_error_info(table_name: str, warn_reason: str, webhook: str):
|
|
|
+def send_error_info(table_name: str, warn_reason: str, alarm_level: str, webhook: str):
|
|
|
mgs_text = f"\n- 大数据表名: {table_name}" \
|
|
|
f"\n- 告警原因: {warn_reason}" \
|
|
|
f"\n- 请关注"
|
|
|
- invoke_feishu_card_mgs(webhook, mgs_text)
|
|
|
+ invoke_feishu_card_mgs(webhook, mgs_text, alarm_level)
|
|
|
|
|
|
|
|
|
def print_config(config_path):
|
|
@@ -204,10 +204,10 @@ def _main():
|
|
|
print(f"表: {table_name} 的分区键只有一个,暂不处理")
|
|
|
continue
|
|
|
|
|
|
- b, warn_reason = handle_table(table_name, result)
|
|
|
+ b, warn_reason, alarm_level = handle_table(table_name, result)
|
|
|
if b:
|
|
|
print(f"表: {table_name}, 触发告警; 告警原因: {warn_reason}")
|
|
|
- send_error_info(table_name, warn_reason, webhook_url)
|
|
|
+ send_error_info(table_name, warn_reason, alarm_level, webhook_url)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
@@ -219,4 +219,4 @@ if __name__ == '__main__':
|
|
|
f"\n- 告警原因: 监控脚本执行异常" \
|
|
|
f"\n- 请关注"
|
|
|
url = "https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c"
|
|
|
- invoke_feishu_card_mgs(url, txt)
|
|
|
+ invoke_feishu_card_mgs(url, txt, "error")
|