data_monitor.py

# Monitor the distribution of the training data
import datetime

import numpy as np
import pandas as pd

from my_config import set_config
from rov_train import process_data, process_predict_data
from my_utils import send_msg_to_feishu

config_, env = set_config()


def get_feature_distribution(feature_name, feature_data):
    """Compute zero-rate statistics for one feature, plus mean/var/std over the
    sorted 25% / 50% / 75% / 100% prefixes of its values (computed for all data
    and for the non-zero data separately)."""
    statistical_results = {'feature_name': feature_name}
    feature_data = np.array(feature_data)
    feature_data_sorted = sorted(feature_data)
    length = len(feature_data_sorted)
    count_0 = len([item for item in feature_data_sorted if item == 0])
    print('data_count = {}, count_0 = {}, rate_0 = {}'.format(length, count_0, count_0 / length))
    statistical_results['data_count'] = length
    statistical_results['0_count'] = count_0
    statistical_results['0_rate'] = count_0 / length
    # Distribution of the full data
    for percentile in [0.25, 0.5, 0.75, 1]:
        data_count = int(length * percentile)
        data = feature_data_sorted[:data_count + 1]
        data_mean = np.mean(data)
        data_var = np.var(data)
        data_std = np.std(data)
        # print('percentile = {}, data_count = {}, mean = {}, var = {}, std = {}'.format(
        #     percentile, data_count, data_mean, data_var, data_std))
        statistical_results['mean_{}'.format(percentile)] = data_mean
        statistical_results['var_{}'.format(percentile)] = data_var
        statistical_results['std_{}'.format(percentile)] = data_std
    # Distribution of the non-zero data
    data_non_zero = [item for item in feature_data_sorted if item != 0]
    for percentile in [0.25, 0.5, 0.75, 1]:
        data_count = int(len(data_non_zero) * percentile)
        data = data_non_zero[:data_count + 1]
        data_mean = np.mean(data)
        data_var = np.var(data)
        data_std = np.std(data)
        # print('percentile = {}, data_count = {}, mean = {}, var = {}, std = {}'.format(
        #     percentile, data_count, data_mean, data_var, data_std))
        statistical_results['non_zero_mean_{}'.format(percentile)] = data_mean
        statistical_results['non_zero_var_{}'.format(percentile)] = data_var
        statistical_results['non_zero_std_{}'.format(percentile)] = data_std
    return statistical_results
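

# A minimal usage sketch for get_feature_distribution (the feature name and
# values below are made up for illustration):
#   stats = get_feature_distribution('videoplayseconds', [0, 0, 1.5, 2.0, 4.5, 9.0])
#   stats['0_rate']   -> 2 / 6
#   stats['mean_1']   -> mean of all six values
#   stats['mean_0.5'] -> mean of the smallest int(6 * 0.5) + 1 = 4 sorted values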


def all_feature_distribution(data, file):
    """Compute distribution statistics for every feature listed in the
    feature-importance file and write them to a csv."""
    res = []
    columns = [
        'feature_name', 'data_count', '0_count', '0_rate',
        'mean_0.25', 'mean_0.5', 'mean_0.75', 'mean_1',
        'var_0.25', 'var_0.5', 'var_0.75', 'var_1',
        'std_0.25', 'std_0.5', 'std_0.75', 'std_1',
        'non_zero_mean_0.25', 'non_zero_mean_0.5', 'non_zero_mean_0.75', 'non_zero_mean_1',
        'non_zero_var_0.25', 'non_zero_var_0.5', 'non_zero_var_0.75', 'non_zero_var_1',
        'non_zero_std_0.25', 'non_zero_std_0.5', 'non_zero_std_0.75', 'non_zero_std_1'
    ]
    feature_importance = pd.read_csv('data/model_feature_importance.csv')
    feature_name_list = list(feature_importance['feature'])
    for feature_name in feature_name_list:
        print(feature_name)
        feature_data = data[feature_name]
        statistical_results = get_feature_distribution(feature_name=feature_name, feature_data=feature_data)
        res.append(statistical_results)
    df = pd.DataFrame(res, columns=columns)
    df.to_csv(file)
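

# A minimal usage sketch for all_feature_distribution (the DataFrame and output
# path are made up; the real feature names come from
# data/model_feature_importance.csv, so the columns of `data` must cover them):
#   demo_data = pd.DataFrame({'feature_a': [0, 1, 2], 'feature_b': [3, 0, 0]})
#   all_feature_distribution(demo_data, file='data/demo_monitor.csv')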


def cal_diff(base_data_file, compare_data_file, feature_top=-1):
    """
    Compute the data drift; features whose drift exceeds the threshold are reported to Feishu by the caller.
    :param base_data_file: path of the baseline data file
    :param compare_data_file: path of the data file to compare against the baseline
    :param feature_top: number of top features (by feature importance) to compare; default -1, all features
    :return: the features whose drift exceeds the threshold, with the corresponding drift values
    """
    df_init_base = pd.read_csv(base_data_file, index_col='feature_name')
    df_init_compare = pd.read_csv(compare_data_file, index_col='feature_name')
    # Compute the feature data drift.
    # feature_top == -1 means "all features"; slice with None so the last feature is not dropped.
    top = None if feature_top == -1 else feature_top
    feature_top_compare = df_init_compare.index.values[:top]
    # Metrics to monitor
    monitor_list = [
        '0_rate',
        'mean_1', 'var_1',
        'non_zero_mean_0.25', 'non_zero_mean_0.5', 'non_zero_mean_0.75', 'non_zero_mean_1',
        'non_zero_var_0.25', 'non_zero_var_0.5', 'non_zero_var_0.75', 'non_zero_var_1'
    ]
    df_base = df_init_base.loc[feature_top_compare, monitor_list]
    df_compare = df_init_compare.loc[feature_top_compare, monitor_list]
    df_diff = abs(df_compare - df_base) / abs(df_base)
    # Collect the features whose drift is 10% or more, with the drift values
    offset_feature_list = []
    for column in df_diff.columns.to_list():
        df_temp = df_diff.loc[df_diff[column] >= 0.1]
        if not df_temp.empty:
            offset_feature_list.append(df_temp)
    if len(offset_feature_list) == 0:
        return None
    df_offset = pd.concat(offset_feature_list)
    # Drop duplicates
    df_offset.drop_duplicates(inplace=True)
    return df_offset
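

# A minimal sketch of calling cal_diff on two monitor files (the paths and
# dates below are hypothetical). It returns None when no monitored metric
# drifts by 10% or more, otherwise a DataFrame of the drifting features:
#   df_offset = cal_diff(
#       base_data_file='data/train_data_monitor_20220118.csv',
#       compare_data_file='data/train_data_monitor_20220119.csv',
#       feature_top=50,  # compare only the 50 most important features
#   )
#   if df_offset is not None:
#       print(df_offset)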


def main():
    now_date = datetime.datetime.strftime(datetime.datetime.today(), '%Y%m%d')
    # now_date = '20220119'
    # Training data
    print('train data monitor...')
    train_data_file = 'data/train_data_monitor_{}.csv'.format(now_date)
    train_filename = config_.TRAIN_DATA_FILENAME
    train_x, train_y, videos, fea = process_data(filename=train_filename)
    all_feature_distribution(train_x, file=train_data_file)
    # Prediction data
    print('predict data monitor...')
    predict_data_file = 'data/predict_data_monitor_{}.csv'.format(now_date)
    predict_filename = config_.PREDICT_DATA_FILENAME
    predict_x, video_ids = process_predict_data(filename=predict_filename)
    all_feature_distribution(predict_x, file=predict_data_file)
    # Data drift monitoring
    print('calculate offset...')
    webhook = config_.FEISHU_ROBOT['feature_monitor_robot'].get('webhook')
    key_word = config_.FEISHU_ROBOT['feature_monitor_robot'].get('key_word')
    # Compare today's training-data features with yesterday's
    yesterday_date = datetime.datetime.strftime(
        datetime.datetime.today() - datetime.timedelta(days=1),
        '%Y%m%d'
    )
    train_data_yesterday_file = 'data/train_data_monitor_{}.csv'.format(yesterday_date)
    df_offset_train = cal_diff(base_data_file=train_data_yesterday_file,
                               compare_data_file=train_data_file)
    if df_offset_train is not None:
        msg_text = '\ndate: {} \ntag: train_offset \n{}'.format(now_date, df_offset_train)
        send_msg_to_feishu(webhook=webhook, key_word=key_word, msg_text=msg_text)
    # Compare today's prediction-data features with yesterday's
    predict_data_yesterday_file = 'data/predict_data_monitor_{}.csv'.format(yesterday_date)
    df_offset_predict = cal_diff(base_data_file=predict_data_yesterday_file,
                                 compare_data_file=predict_data_file)
    if df_offset_predict is not None:
        msg_text = '\ndate: {} \ntag: predict_offset \n{}'.format(now_date, df_offset_predict)
        send_msg_to_feishu(webhook=webhook, key_word=key_word, msg_text=msg_text)
    # Compare today's prediction-data features with today's training-data features
    df_offset_predict_train = cal_diff(base_data_file=train_data_file, compare_data_file=predict_data_file)
    if df_offset_predict_train is not None:
        msg_text = '\ndate: {} \ntag: predict_train_offset \n{}'.format(now_date, df_offset_predict_train)
        send_msg_to_feishu(webhook=webhook, key_word=key_word, msg_text=msg_text)


if __name__ == '__main__':
    main()
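
# This script compares today's monitor files against yesterday's, so it is
# meant to run once a day. A hypothetical cron entry (the path is made up):
#   0 1 * * * cd /path/to/project && python data_monitor.py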