"""Monitor Spark feature-sync tasks on YARN and raise Feishu alerts.

For every table in ``table_list`` the script looks at the succeeded YARN
applications that finished within the look-back window and sends a Feishu
card when the latest sync looks missing, stale, or too slow.
"""
import argparse
import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

from client import YarnClient
from util import date_util, feishu_inform_util

yarn_client = YarnClient.YarnClient("192.168.203.16")

# Tables whose sync tasks are monitored; a YARN application is matched to a
# table by the second ":"-separated segment of its application name.
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2",
    "alg_vid_global_feature_20250212",
    "alg_vid_recommend_exp_feature_20250212",
    "alg_vid_recommend_flowpool_exp_feature_20250212",
    "alg_vid_apptype_recommend_exp_feature_20250212",
    "alg_vid_province_recommend_exp_feature_20250212",
    "alg_vid_brand_recommend_exp_feature_20250212",
    "alg_vid_hotsencetype_recommend_exp_feature_20250212",
    "scene_type_vid_cf_feature_20250212",
    "vid_click_cf_feature_20250212",
    "alg_recsys_feature_cf_i2i_v2",
    "alg_channel_recommend_exp_feature_20250212",
    "alg_merge_cate1_recommend_exp_feature_20250212",
    "alg_merge_cate2_recommend_exp_feature_20250212",
    "alg_video_unionid_recommend_exp_feature_20250212",
    "mid_merge_cate1_feature_20250212",
    "mid_merge_cate2_feature_20250212",
    "mid_global_feature_20250212",
]

# Not referenced in this file; kept because other modules may import it.
filter_date = datetime(2024, 1, 1)

# Hour captured at import time. Kept for backward compatibility;
# handle_table() now derives the hour from its own ``now`` value.
current_hour = datetime.now().hour

# How far back (in hours) finished YARN applications are fetched.
LOOKBACK_HOURS = 4


def _extract_table_name(app_name: str) -> str:
    """Return the second ":"-separated segment of *app_name*, stripped.

    When the name contains no ":" the whole name is returned (stripped)
    instead of raising ``IndexError``, so an unrelated application cannot
    abort the monitoring run.
    """
    parts = app_name.split(":")
    segment = parts[1] if len(parts) > 1 else parts[0]
    return segment.strip()


def handle_table(
    table_name: str,
    spark_task_list: List[Dict],
    now: Optional[datetime] = None,
) -> Tuple[bool, str]:
    """Decide whether the latest sync task of *table_name* warrants an alert.

    Args:
        table_name: Table to check; compared with each task's ``'name'``.
        spark_task_list: Task dicts providing ``'name'``, ``'startedTime'``
            and ``'finishedTime'``; the two time values are parsed with
            ``date_util.str_cover_date``.
        now: Reference time for the checks. Defaults to ``datetime.now()``
            at call time.

    Returns:
        ``(True, reason)`` when an alert should be sent, otherwise
        ``(False, "")``.
    """
    if now is None:
        now = datetime.now()

    matching_tasks = [
        task for task in spark_task_list if task['name'] == table_name
    ]
    if not matching_tasks:
        # No matching task in the list the caller fetched for the window.
        return True, "最近四小时没有同步过数据"

    # Only the most recently finished task matters for the checks below.
    latest_task = max(
        matching_tasks,
        key=lambda task: date_util.str_cover_date(task['finishedTime']),
    )
    print(f"表: {table_name}, 最后一次完成时间为: {latest_task['finishedTime']}")

    finished_time = date_util.str_cover_date(latest_task['finishedTime'])
    started_time = date_util.str_cover_date(latest_task['startedTime'])

    # Check 1: the latest completion must be within the last two hours.
    if now - finished_time > timedelta(minutes=120):
        return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {latest_task['finishedTime']}"

    # Check 2: the latest task must not have run longer than 50 minutes.
    elapse = finished_time - started_time
    # total_seconds() keeps whole days, which timedelta.seconds drops.
    elapse_seconds = int(elapse.total_seconds())
    print(f"表: {table_name}, 最后一次任务持续时间为: {date_util.seconds_convert(elapse_seconds)}")
    if elapse > timedelta(minutes=50):
        return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse_seconds)}"

    # Check 3: the latest completion must fall in the current clock hour.
    finished_hour = finished_time.hour
    print(f"表: {table_name}, 最后一次完成是: {finished_hour} 小时, 当前小时为: {now.hour}")
    if finished_hour != now.hour:
        return True, "当前小时的任务未完成,请关注!!!"

    return False, ""


def send_error_info(table_name: str, warn_reason: str, webhook: str):
    """Send a red Feishu card describing the alert for *table_name*.

    Args:
        table_name: Table the alert refers to.
        warn_reason: Human-readable reason shown in the card body.
        webhook: Feishu webhook passed to ``send_card_msg_to_feishu``.
    """
    msg_text = f"\n- 大数据表名: {table_name}" \
               f"\n- 告警原因: {warn_reason}" \
               f"\n- 请关注"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "特征同步延迟告警"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)


def print_config(config_path):
    """Print the path and every section/key/value of the INI file.

    NOTE(review): every value is printed, including the webhook entry read
    by ``_main`` — confirm this is acceptable for the job's log output.
    """
    print(f"配置文件路径: {config_path}")
    config = configparser.ConfigParser()
    config.read(config_path)
    for section in config.sections():
        print(f"[{section}]")
        for key, value in config.items(section):
            print(f"{key} = {value}")


def _main():
    """Parse arguments, fetch recent YARN tasks and alert per table."""
    parser = argparse.ArgumentParser(description="feature_spark_task_monitor")
    parser.add_argument("-c", "--config", required=False, help="config file path",
                        default="/home/monitor/model_script/config/config.ini")
    args = parser.parse_args()

    print_config(args.config)

    # Load the configuration; the webhook comes from config only and must
    # not be hard-coded in this file.
    config = configparser.ConfigParser()
    config.read(args.config)
    webhook_url = config.get("feishu", "model.webhook")

    # One timestamp for the whole run so every table is judged against the
    # same reference time.
    now = datetime.now()

    # Fetch applications that finished within the look-back window; the
    # begin bound is passed as epoch milliseconds.
    window_start_ms = int((now - timedelta(hours=LOOKBACK_HOURS)).timestamp()) * 1000
    # "or []" guards against a None response — whether get_apps can return
    # None is not visible here.
    apps = yarn_client.get_apps(finished_time_begin=window_start_ms) or []

    succeeded_tasks = [
        {**app, 'name': _extract_table_name(app['name'])}
        for app in apps
        if app['finalStatus'] == "SUCCEEDED"
    ]

    if not succeeded_tasks:
        # NOTE(review): no alert is sent when nothing succeeded in the
        # window — confirm that a total outage should stay silent.
        print("未获取已完成的任务,跳过")
        return

    for table_name in table_list:
        should_alert, warn_reason = handle_table(table_name, succeeded_tasks, now)
        if should_alert:
            print(f"表: {table_name}, 触发告警; 告警原因: {warn_reason}")
            send_error_info(table_name, warn_reason, webhook_url)


if __name__ == '__main__':
    _main()