# Monitor the distribution of the training and prediction feature data.
import datetime

import numpy as np
import pandas as pd

from config import set_config
from rov_train import process_data, process_predict_data
from utils import send_msg_to_feishu

config_, env = set_config()

# Fractions of the sorted data over which prefix statistics are computed.
# The last entry is the int 1 (not 1.0) so the result keys read 'mean_1'.
_PERCENTILES = (0.25, 0.5, 0.75, 1)


def _prefix_statistics(sorted_values, key_prefix):
    """
    Compute mean / variance / standard deviation over growing prefixes of
    an ascending-sorted array.

    :param sorted_values: 1-D numpy array sorted in ascending order
    :param key_prefix: string prepended to every result key
        ('' or 'non_zero_')
    :return: dict with keys '<prefix>mean_<p>', '<prefix>var_<p>' and
        '<prefix>std_<p>' for every p in _PERCENTILES
    """
    results = {}
    total = len(sorted_values)
    for percentile in _PERCENTILES:
        cut = int(total * percentile)
        # The slice keeps the element at index `cut` as well, i.e. cut + 1
        # values (clamped to the array length).
        subset = sorted_values[:cut + 1]
        if len(subset) == 0:
            # Only reachable for an empty input; report NaN explicitly
            # instead of triggering numpy's empty-slice RuntimeWarning.
            mean = var = std = float('nan')
        else:
            mean = np.mean(subset)
            var = np.var(subset)
            std = np.std(subset)
        results['{}mean_{}'.format(key_prefix, percentile)] = mean
        results['{}var_{}'.format(key_prefix, percentile)] = var
        results['{}std_{}'.format(key_prefix, percentile)] = std
    return results


def get_feature_distribution(feature_name, feature_data):
    """
    Summarise the value distribution of a single feature.

    :param feature_name: name stored under the 'feature_name' key
    :param feature_data: sequence of numeric values for the feature
    :return: dict holding the sample count, the number and ratio of zeros,
        and mean/var/std over the lowest 25% / 50% / 75% / 100% of the
        sorted values, once for all values and once for the non-zero values
        only (keys prefixed with 'non_zero_')
    """
    values = np.sort(np.asarray(feature_data))
    length = len(values)
    count_0 = int(np.count_nonzero(values == 0))
    # An empty feature has no meaningful zero ratio; NaN avoids a
    # ZeroDivisionError and matches the NaN statistics reported below.
    rate_0 = count_0 / length if length else float('nan')
    print('data_count = {}, count_0 = {}, rate_0 = {}'.format(length, count_0, rate_0))

    statistical_results = {
        'feature_name': feature_name,
        'data_count': length,
        '0_count': count_0,
        '0_rate': rate_0,
    }
    # Distribution of the whole data.
    statistical_results.update(_prefix_statistics(values, ''))
    # Distribution of the non-zero data (masking keeps the sorted order).
    statistical_results.update(_prefix_statistics(values[values != 0], 'non_zero_'))
    return statistical_results


def all_feature_distribution(data, file):
    """
    Compute the distribution statistics of every model feature and write
    them to a CSV file.

    The feature names are read from the 'feature' column of
    'data/model_feature_importance.csv'.

    :param data: mapping-like object indexed by feature name
        (presumably a DataFrame of features; verify against the caller)
    :param file: path of the CSV file to write
    :return: None
    """
    columns = [
        'feature_name',
        'data_count', '0_count', '0_rate',
        'mean_0.25', 'mean_0.5', 'mean_0.75', 'mean_1',
        'var_0.25', 'var_0.5', 'var_0.75', 'var_1',
        'std_0.25', 'std_0.5', 'std_0.75', 'std_1',
        'non_zero_mean_0.25', 'non_zero_mean_0.5', 'non_zero_mean_0.75', 'non_zero_mean_1',
        'non_zero_var_0.25', 'non_zero_var_0.5', 'non_zero_var_0.75', 'non_zero_var_1',
        'non_zero_std_0.25', 'non_zero_std_0.5', 'non_zero_std_0.75', 'non_zero_std_1',
    ]
    feature_importance = pd.read_csv('data/model_feature_importance.csv')
    res = []
    for feature_name in feature_importance['feature']:
        print(feature_name)
        res.append(
            get_feature_distribution(feature_name=feature_name, feature_data=data[feature_name])
        )
    df = pd.DataFrame(res, columns=columns)
    df.to_csv(file)


def cal_diff(base_data_file, compare_data_file, feature_top=-1, threshold=0.1):
    """
    Compute the relative offset between two monitor files and return the
    features whose offset reaches the threshold.

    The offset of every monitored metric is |compare - base| / |base|.
    A base value of 0 therefore yields inf (reported) or, when the compare
    value is 0 as well, NaN (not reported).

    :param base_data_file: path of the baseline monitor CSV
    :param compare_data_file: path of the monitor CSV to compare
    :param feature_top: number of leading features (in the row order of the
        compare file) to check; a negative value or None checks all
        features. Default: -1, all features
    :param threshold: minimum relative offset that gets reported.
        Default: 0.1 (10%)
    :return: DataFrame indexed by feature name holding the offsets of every
        feature with at least one metric at or above the threshold, in the
        row order of the compare file; None when no feature qualifies
    """
    df_init_base = pd.read_csv(base_data_file, index_col='feature_name')
    df_init_compare = pd.read_csv(compare_data_file, index_col='feature_name')

    # Features to check. A plain [:feature_top] slice would turn the
    # documented "-1 = all features" default into "all but the last one".
    feature_names = df_init_compare.index.values
    if feature_top is not None and feature_top >= 0:
        feature_names = feature_names[:feature_top]

    # Metrics to monitor.
    monitor_list = [
        '0_rate', 'mean_1', 'var_1',
        'non_zero_mean_0.25', 'non_zero_mean_0.5', 'non_zero_mean_0.75', 'non_zero_mean_1',
        'non_zero_var_0.25', 'non_zero_var_0.5', 'non_zero_var_0.75', 'non_zero_var_1',
    ]
    # NOTE(review): .loc requires every selected feature to exist in the
    # base file as well; confirm both files always share the feature set.
    df_base = df_init_base.loc[feature_names, monitor_list]
    df_compare = df_init_compare.loc[feature_names, monitor_list]
    df_diff = abs(df_compare - df_base) / abs(df_base)

    # Keep each feature once if any metric reaches the threshold. A row mask
    # (rather than concat + drop_duplicates) keeps distinct features whose
    # offset rows happen to be identical.
    exceeded = (df_diff >= threshold).any(axis=1)
    if not exceeded.any():
        return None
    return df_diff.loc[exceeded]


def main():
    """
    Compare today's monitor files with yesterday's and with each other,
    and send a Feishu message for every comparison that reports offsets.

    NOTE(review): the steps that generate today's monitor files are
    commented out below, so both 'data/*_monitor_<today>.csv' files must
    already exist when this runs - confirm whether that is intended.
    """
    # Take the timestamp once so "today" and "yesterday" stay consistent
    # even if the run crosses midnight.
    today = datetime.datetime.today()
    now_date = today.strftime('%Y%m%d')
    # now_date = '20220119'
    yesterday_date = (today - datetime.timedelta(days=1)).strftime('%Y%m%d')

    # Training data.
    print('train data monitor...')
    train_data_file = 'data/train_data_monitor_{}.csv'.format(now_date)
    train_filename = config_.TRAIN_DATA_FILENAME
    # train_x, train_y, videos, fea = process_data(filename=train_filename)
    # all_feature_distribution(train_x, file=train_data_file)

    # Prediction data.
    print('predict data monitor...')
    predict_data_file = 'data/predict_data_monitor_{}.csv'.format(now_date)
    predict_filename = config_.PREDICT_DATA_FILENAME
    # predict_x, video_ids = process_predict_data(filename=predict_filename)
    # all_feature_distribution(predict_x, file=predict_data_file)

    # Data offset monitoring.
    print('calculate offset...')
    webhook = config_.FEISHU_ROBOT['feature_monitor_robot'].get('webhook')
    key_word = config_.FEISHU_ROBOT['feature_monitor_robot'].get('key_word')

    train_data_yesterday_file = 'data/train_data_monitor_{}.csv'.format(yesterday_date)
    predict_data_yesterday_file = 'data/predict_data_monitor_{}.csv'.format(yesterday_date)
    comparisons = [
        # Training features: today vs. yesterday.
        ('train_offset', train_data_yesterday_file, train_data_file),
        # Prediction features: today vs. yesterday.
        ('predict_offset', predict_data_yesterday_file, predict_data_file),
        # Today's prediction features vs. today's training features.
        ('predict_train_offset', train_data_file, predict_data_file),
    ]
    for tag, base_file, compare_file in comparisons:
        df_offset = cal_diff(base_data_file=base_file, compare_data_file=compare_file)
        if df_offset is None:
            continue
        msg_text = '\ndate: {} \ntag: {} \n{}'.format(now_date, tag, df_offset)
        send_msg_to_feishu(webhook=webhook, key_word=key_word, msg_text=msg_text)


if __name__ == '__main__':
    main()