import argparse
import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

from client import YarnClient
from util import date_util, feishu_inform_util

yarn_client = YarnClient.YarnClient("192.168.203.16")
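# The client.YarnClient module is not shown here. A minimal sketch of the
# contract this script assumes, based on how get_apps() is used in _main:
# the wrapper presumably queries the YARN ResourceManager REST API
# (GET http://<rm-host>:8088/ws/v1/cluster/apps?finishedTimeBegin=<epoch ms>)
# and returns one dict per application. The field names below follow the
# usage in this script; the example values and the date-string format are
# assumptions for illustration, since date_util.str_cover_date consumes
# the timestamps as strings.
_EXAMPLE_APP_RECORD = {
    "name": "sync_job: alg_mid_feature_play",  # hypothetical app name
    "finalStatus": "SUCCEEDED",
    "startedTime": "2024-06-01 08:00:00",      # assumed string format
    "finishedTime": "2024-06-01 08:20:00",
}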
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2",
    "alg_vid_global_feature_20250212",
    "alg_vid_recommend_exp_feature_20250212",
    "alg_vid_recommend_flowpool_exp_feature_20250212",
    "alg_vid_apptype_recommend_exp_feature_20250212",
    "alg_vid_province_recommend_exp_feature_20250212",
    "alg_vid_brand_recommend_exp_feature_20250212",
    "alg_vid_hotsencetype_recommend_exp_feature_20250212",
    "scene_type_vid_cf_feature_20250212",
    "vid_click_cf_feature_20250212",
    "alg_recsys_feature_cf_i2i_v2",
    "alg_channel_recommend_exp_feature_20250212",
    "alg_merge_cate1_recommend_exp_feature_20250212",
    "alg_merge_cate2_recommend_exp_feature_20250212",
    "alg_video_unionid_recommend_exp_feature_20250212",
    "mid_merge_cate1_feature_20250212",
    "mid_merge_cate2_feature_20250212",
    "mid_global_feature_20250212",
    "mid_u2u_friend_index_feature_20250212"
]
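# Naming convention this script relies on (see the filtering in _main):
# each Spark application is expected to be named "<job prefix>: <table name>",
# so the part after the first colon is what gets matched against table_list.
# For example, a hypothetical app named "sync_job: alg_mid_feature_play"
# would be matched to the table "alg_mid_feature_play".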
filter_date = datetime(2024, 1, 1)  # defined but not referenced below


def handle_table(table_name: str, spark_task_list: List[Dict]) -> Tuple[bool, str]:
    filtered_data = [
        item for item in spark_task_list
        if table_name == item['name']
    ]
    if not filtered_data:
        # Nothing found: the table has not finished a sync within the lookup window.
        return True, "No sync has completed for this table in the last 4 hours"
    # Check whether the most recent completion is more than two hours old.
    filtered_data.sort(key=lambda item: date_util.str_cover_date(item['finishedTime']), reverse=True)
    last_finished_item = filtered_data[0]
    print(f"Table: {table_name}, last finished at: {last_finished_item['finishedTime']}")
    time_difference = datetime.now() - date_util.str_cover_date(last_finished_item['finishedTime'])
    if time_difference > timedelta(minutes=120):
        return True, f"No sync has completed in the last two hours; last finished at: {last_finished_item['finishedTime']}"
    # Check whether the last run took longer than 50 minutes.
    elapse = (date_util.str_cover_date(last_finished_item['finishedTime']) -
              date_util.str_cover_date(last_finished_item['startedTime']))
    print(f"Table: {table_name}, last run duration: {date_util.seconds_convert(elapse.seconds)}")
    if elapse > timedelta(minutes=50):
        return True, f"The last sync task ran longer than 50 minutes; duration: {date_util.seconds_convert(elapse.seconds)}"
    return False, ""
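# Illustrative only: a defined-but-not-called sketch of how handle_table is
# used, with a sample record shaped like _EXAMPLE_APP_RECORD above. The
# record and its timestamps are made up; the real input comes from
# yarn_client.get_apps() in _main.
def _handle_table_example():
    sample_tasks = [{
        "name": "alg_mid_feature_play",
        "startedTime": "2024-06-01 08:00:00",   # hypothetical
        "finishedTime": "2024-06-01 08:20:00",  # hypothetical
    }]
    # With these stale timestamps the check returns (True, <alert reason>);
    # for a run that finished within the last two hours and took under
    # 50 minutes it returns (False, "").
    should_alert, reason = handle_table("alg_mid_feature_play", sample_tasks)
    print(should_alert, reason)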
def send_error_info(table_name: str, warn_reason: str, webhook: str):
    msg_text = f"\n- Table name: {table_name}" \
               f"\n- Alert reason: {warn_reason}" \
               f"\n- Please follow up"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "Feature sync delay alert"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
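# util.feishu_inform_util is not included here. A minimal sketch of what
# send_card_msg_to_feishu is assumed to do: POST the card as an "interactive"
# message to the Feishu custom-bot webhook. This is an assumption about the
# helper, not its actual implementation.
def _send_card_msg_sketch(webhook: str, card: Dict):
    import requests  # assumed dependency of the real helper

    payload = {"msg_type": "interactive", "card": card}
    resp = requests.post(webhook, json=payload, timeout=10)
    resp.raise_for_status()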
def print_config(config_path):
    print(f"Config file path: {config_path}")
    config = configparser.ConfigParser()
    config.read(config_path)
    for section in config.sections():
        print(f"[{section}]")
        for key, value in config.items(section):
            print(f"{key} = {value}")
def _main():
    parser = argparse.ArgumentParser(description="feature_spark_task_monitor")
    parser.add_argument("-c", "--config", required=False, help="config file path",
                        default="/home/monitor/model_script/config/config.ini")
    args = parser.parse_args()
    print_config(args.config)

    # Read the webhook from the config file (currently hard-coded below instead).
    # config = configparser.ConfigParser()
    # config.read(args.config)
    # webhook_url = config.get("feishu", "model.webhook")
    webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c'

    # Fetch Spark applications that finished within the last 4 hours
    # (finished_time_begin is expressed in epoch milliseconds).
    hours_4_early = int((datetime.now() - timedelta(hours=4)).timestamp()) * 1000
    result = yarn_client.get_apps(finished_time_begin=hours_4_early)
    # Keep only successful apps and strip the job prefix from the app name,
    # e.g. "<prefix>: <table name>" -> "<table name>".
    result = [
        {**item, 'name': item['name'].split(":")[1].strip()}
        for item in result
        if item['finalStatus'] == "SUCCEEDED"
    ]
    if len(result) == 0:
        print("No finished tasks were returned, skipping")
        return
    for table_name in table_list:
        b, warn_reason = handle_table(table_name, result)
        if b:
            print(f"Table: {table_name}, alert triggered; reason: {warn_reason}")
            send_error_info(table_name, warn_reason, webhook_url)


if __name__ == '__main__':
    _main()
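# Example invocation (the file name is assumed from the argparse description;
# the config path is the script's own default):
#   python feature_spark_task_monitor.py -c /home/monitor/model_script/config/config.ini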