"""Monitor ODPS -> Redis feature-table sync jobs on YARN and alert via Feishu.

Scans Spark applications started in the last 7 hours; for each watched table,
alerts when the newest sync run is older than the configured threshold.
"""
import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd

from client import YarnClient
from util import dateutil, feishu_inform_util

# YARN ResourceManager the task list is fetched from.
yarn_client = YarnClient.YarnClient("192.168.203.16")

# Feature tables whose sync jobs are monitored (matched as substrings of the
# Spark application name).
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2",
]

# Only tasks started after this date are considered at all.
filter_date = datetime(2024, 1, 1)

# Alert when the newest sync run is older than this (was an inline magic
# number; the alert text still says "两个小时" — presumably 140 min includes
# slack on top of the 2-hour SLA, TODO confirm).
_SYNC_ALERT_THRESHOLD = timedelta(minutes=140)


def df_print(result):
    """Pretty-print tasks whose name contains 'cid', ordered by startedTime.

    Debug/inspection helper: renders the filtered task list as a
    pipe-separated table on stdout.
    """
    df = pd.DataFrame(result)
    # 过滤出 name 中包含 'cid' 的行; .copy() avoids SettingWithCopyWarning
    # on the subsequent in-place column rewrite.
    filtered_df = df[df['name'].str.contains('cid')].copy()
    filtered_df.loc[:, 'name'] = filtered_df['name'].str.replace(
        'odps sync to redis : ', '', regex=False)
    sorted_df = filtered_df.sort_values(by="startedTime")

    header = ' | '.join(sorted_df.columns)

    def format_row(row):
        return ' | '.join(str(row[col]) for col in sorted_df.columns)

    # BUG FIX: rows were previously built from the *unsorted* frame
    # (filtered_df.apply), silently discarding the sort_values above.
    rows = sorted_df.apply(format_row, axis=1).tolist()

    print(header)
    print('-' * len(header))
    print('\n'.join(rows))


def handle_table(table_name: str,
                 spark_task_list: List[Dict]) -> Tuple[bool, object]:
    """Decide whether *table_name*'s sync is stale.

    Args:
        table_name: table identifier matched as a substring of task ``name``.
        spark_task_list: YARN app dicts with ``name`` and ``startedTime`` keys.

    Returns:
        ``(alert, info)`` — ``alert`` is True when the most recent sync is
        older than the threshold (or no sync was found at all); ``info`` is
        the latest sync ``datetime`` or the string "七小时之前".
    """
    filtered_data = [
        item for item in spark_task_list
        if table_name in item['name']
        and dateutil.str_cover_date(item['startedTime']) > filter_date
    ]
    if filtered_data:
        latest_started_time = max(
            dateutil.str_cover_date(item['startedTime'])
            for item in filtered_data
        )
        print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")
        if datetime.now() - latest_started_time > _SYNC_ALERT_THRESHOLD:
            return True, latest_started_time
        # Fresh enough; return the time for consistency (the original
        # returned the raw task list here, which no caller consumed).
        return False, latest_started_time
    # 如果没有找到,表示近七个小时都没有同步过
    return True, "七小时之前"


def send_error_info(table_name: str, latest_started_time: str, webhook: str):
    """Send a Feishu interactive-card alert for a stale table sync.

    Args:
        table_name: the delayed table.
        latest_started_time: last sync time (rendered into the message).
        webhook: Feishu bot webhook URL.
    """
    msg_text = f"\n- 大数据表名: {table_name}" \
               f"\n- 最后一次同步时间: {latest_started_time}" \
               f"\n- 超过两个小时没有同步,请关注"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "【测试】特征同步延迟告警"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)


def _main():
    # 读取配置文件
    config = configparser.ConfigParser()
    config.read("/home/monitor/model_monitor/config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")

    # 获取最近七小时的Spark任务 (YARN expects epoch milliseconds).
    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_7_early)

    for table_name in table_list:
        # 判断最后一次同步是否为两个小时以内
        b, latest_started_time = handle_table(table_name, result)
        if b:
            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
            send_error_info(table_name, latest_started_time, webhook_url)


if __name__ == '__main__':
    _main()