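# Monitoring script: pulls recent Spark applications from the YARN REST API,
# checks how recently each feature table was synced, cross-references ODPS
# partition info, and sends a Feishu alert card when a table is stale.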
import configparser
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

import pandas as pd

from client import YarnClient, ODPSClient
from util import date_util, feishu_inform_util

yarn_client = YarnClient.YarnClient("121.40.173.140")
odps_client = ODPSClient.ODPSClient()
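
# Feature tables whose sync jobs are monitored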
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2"
]

filter_date = datetime(2024, 1, 1)

columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
           "数据大小", "创建时间", "更新时间"]
def df_print(result):
    df = pd.DataFrame(result)
    sorted_df = df.sort_values(by="startedTime")
    sorted_df = sorted_df[columns]
    # Build the pipe-separated header row
    header = ' | '.join(sorted_df.columns)

    # Format one data row as a pipe-separated string
    def format_row(row):
        return ' | '.join([str(row[col]) for col in sorted_df.columns])

    rows = sorted_df.apply(format_row, axis=1).tolist()
    # Print header and data rows
    print(header)
    print('\n'.join(rows))
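

# Decide whether table_name is stale: returns (True, last-sync info) when the
# most recent matching Spark task started more than 140 minutes ago (or no
# matching task was found at all), otherwise (False, the matching tasks).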
def handle_table(table_name: str, spark_task_list: List[Dict]) -> Tuple[bool, Any]:
    filtered_data = [
        item for item in spark_task_list
        if table_name in item['name'] and date_util.str_cover_date(item['startedTime']) > filter_date
    ]
    if filtered_data:
        latest_started_time = max(
            [date_util.str_cover_date(item['startedTime']) for item in filtered_data])
        print(f"Table: {table_name}, last synced at: {latest_started_time}")
        now = datetime.now()
        time_difference = now - latest_started_time
        if time_difference > timedelta(minutes=140):
            return True, latest_started_time
        return False, filtered_data
    # Nothing matched, so the table has not been synced within the queried window (seven hours)
    return True, "more than seven hours ago"
def send_error_info(table_name: str, latest_started_time: str, webhook: str):
    msg_text = f"\n- Table: {table_name}" \
               f"\n- Last synced at: {latest_started_time}" \
               f"\n- No sync for more than two hours, please check"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "[Test] Feature sync delay alert"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
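

# Alerting entry point: reads the Feishu webhook from config.ini and alerts on
# every table whose last sync exceeded the 140-minute threshold. The config
# file is assumed to look roughly like:
#   [feishu]
#   model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/...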
def _main():
    # Read the config file
    config = configparser.ConfigParser()
    config.read("/home/monitor/model_monitor/config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")

    # Fetch Spark applications started within the last seven hours
    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_7_early)
    for table_name in table_list:
        # Check whether the last sync happened within the last 140 minutes
        is_stale, latest_started_time = handle_table(table_name, result)
        if is_stale:
            print(f"Table: {table_name}, last sync was more than 140 minutes ago")
            send_error_info(table_name, latest_started_time, webhook_url)
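

# Ad-hoc analysis: list RUNNING "alg" sync tasks from the last 14 hours, join
# them with their ODPS partition info, and print the result as a table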
def _analyse():
    hours_14_early = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_14_early)
    # Keep only RUNNING tasks for "alg" tables and extract the table name from the app name
    result = [
        {
            **{k: v for k, v in item.items() if k != 'name'},
            'table_name': item['name'].split(":")[1].strip()
        }
        for item in result
        if "alg" in item['name'] and item['state'] == 'RUNNING'
    ]
    # Look up partition info for each distinct table, keyed by partition (dt + hour)
    partition_info = {}
    for table_name in list({item['table_name'] for item in result}):
        resp = odps_client.get_all_partition_info(table_name=table_name)
        partition_info[table_name] = {item['分区']: item for item in resp}
    # Merge each task with the partition record matching its start hour
    spark_task_list = []
    for item in result:
        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
        if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
            item = {
                **item,
                **partition_info[item['table_name']][dt_hh]
            }
            spark_task_list.append(item)
    df_print(spark_task_list)


if __name__ == '__main__':
    # _main()
    _analyse()