import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Union

from client import YarnClient
from util import date_util, feishu_inform_util

yarn_client = YarnClient.YarnClient("192.168.203.16")

# Feature tables whose sync jobs are monitored.
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2"
]

# Tasks whose finish time is not strictly after this date are ignored.
filter_date = datetime(2024, 1, 1)

# A table whose latest matching task finished longer ago than this is
# reported as delayed.
_SYNC_DELAY_THRESHOLD = timedelta(minutes=140)

# How far back finished YARN applications are requested.
_LOOKBACK = timedelta(hours=7)


def handle_table(table_name: str,
                 spark_task_list: List[Dict]) -> Tuple[bool, Union[datetime, str]]:
    """Decide whether the sync of ``table_name`` is delayed.

    A task belongs to the table when ``table_name`` is a substring of the
    task's ``name`` and its ``finishedTime`` (converted with
    ``date_util.str_cover_date``) is later than ``filter_date``.

    :param table_name: name of the feature table to check.
    :param spark_task_list: task dicts carrying ``name`` and ``finishedTime``.
    :return: ``(delayed, last_sync)``. ``delayed`` is True when the newest
        matching task finished more than 140 minutes ago, or when there is
        no matching task at all. ``last_sync`` is the newest finish time, or
        the placeholder text ``"七小时之前"`` when no task matched.
    """
    # NOTE(review): substring matching means a table name that is a prefix of
    # another one (e.g. "alg_cid_feature_vid_cf" / "alg_cid_feature_vid_cf_rank")
    # also matches the longer table's tasks -- confirm against the real YARN
    # application name format before tightening this.
    finished_times = []
    for item in spark_task_list:
        if table_name not in item['name']:
            continue
        finished_at = date_util.str_cover_date(item['finishedTime'])
        if finished_at > filter_date:
            finished_times.append(finished_at)

    if not finished_times:
        # Nothing found: the table has not been synced within the window the
        # caller queried (the last seven hours when called from _main).
        return True, "七小时之前"

    latest_started_time = max(finished_times)
    print(f"表: {table_name}, 最后一次同步完成时间为: {latest_started_time}")

    delayed = datetime.now() - latest_started_time > _SYNC_DELAY_THRESHOLD
    # Both outcomes return the latest finish time, so the second element no
    # longer changes to a list of raw task dicts on the on-time path.
    return delayed, latest_started_time


def send_error_info(table_name: str,
                    latest_started_time: Union[datetime, str],
                    webhook: str):
    """Send a Feishu card warning that ``table_name`` has not been synced.

    :param table_name: name of the delayed feature table.
    :param latest_started_time: last sync time, interpolated into the message.
    :param webhook: Feishu webhook the card is sent to.
    """
    msg_text = f"\n- 大数据表名: {table_name}" \
               f"\n- 最后一次同步时间: {latest_started_time}" \
               f"\n- 超过两个小时没有同步,请关注"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "特征同步延迟告警"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)


def _main():
    """Check every table in ``table_list`` and report the delayed ones."""
    # Read the configuration file.
    config = configparser.ConfigParser()
    config.read("/home/monitor/model_monitor/config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")

    # Fetch the applications that finished within the last seven hours; the
    # lower bound is passed as epoch milliseconds.
    hours_7_early = int((datetime.now() - _LOOKBACK).timestamp()) * 1000
    apps = yarn_client.get_apps(finished_time_begin=hours_7_early)

    # Materialise the successful apps as a list: a lazy filter() iterator
    # would be exhausted by the first table, leaving every later table with
    # nothing to match and wrongly flagged as not synced.
    succeeded_apps = [item for item in apps if item['finalStatus'] == 'SUCCEEDED']

    for table_name in table_list:
        # Check whether the last sync is recent enough.
        delayed, latest_started_time = handle_table(table_name, succeeded_apps)
        if delayed:
            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
            # Feishu alerting is currently disabled; only the log line is emitted.
            # send_error_info(table_name, latest_started_time, webhook_url)


if __name__ == '__main__':
    _main()