| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 | 
							- import datetime
 
- import traceback
 
- import numpy as np
 
- from threading import Timer
 
- import pandas as pd
 
- from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 
- from config import set_config
 
- from log import Log
 
# Shared module-level objects: project configuration, logger and Redis client.
config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Columns requested from the feature table via get_feature_data().
features = ['apptype', 'adcode', 'visit_uv_today', 'visit_uv_yesterday', 'b']
 
def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
    """Compute new threshold parameters from the mean of column ``b`` per AB-test group.

    For every app type in ``ad_abtest_abcode_config`` and every non-empty
    ab-code list in its ``ab_test_config``, the mean of ``b`` over the rows of
    ``feature_df`` with that ``apptype`` and one of those ``adcode`` values
    decides the adjustment:

    * mean < 0   -> raise the parameter, using ``up_threshold_update``
    * mean > 0.1 -> lower the parameter, using ``down_threshold_update``
    * otherwise (including a NaN mean when no row matches) -> unchanged

    The number of steps is ``distance // gradient + 1`` and the parameter moves
    by ``update_range * steps``. A new value is only recorded when it is > 0.

    Args:
        ad_abtest_abcode_config: mapping ``app_type -> params`` where params
            provides ``ab_test_id``, ``ab_test_config`` (``config_name ->
            ab-code list``), ``up_threshold_update`` and
            ``down_threshold_update`` (each with ``gradient`` and
            ``update_range``).
        feature_df: DataFrame with at least the columns ``apptype``,
            ``adcode`` and ``b``.
        threshold_record: mapping ``"<ab_test_id>-<config_name>" -> current
            parameter``. It is not modified.

    Returns:
        ``(threshold_record_new, robot_msg_record)``: a copy of
        ``threshold_record`` with the updated parameters, and a list with one
        summary dict per parameter that was changed.

    Raises:
        TypeError: if a tag that needs adjusting has no entry in
            ``threshold_record`` (``float(None)``).
    """
    robot_msg_record = []
    threshold_record_new = threshold_record.copy()
    for app_type, config_params in ad_abtest_abcode_config.items():
        # Rows belonging to this app type.
        temp_df = feature_df[feature_df['apptype'] == app_type]
        ab_test_id = config_params.get('ab_test_id')
        ab_test_config = config_params.get('ab_test_config')
        up_threshold_update = config_params.get('up_threshold_update')
        # Fall back to the "up" rule when no "down" rule is configured; that is
        # what the lowering branch (mistakenly) always used before.
        down_threshold_update = config_params.get('down_threshold_update') or up_threshold_update
        for config_name, ab_code_list in ab_test_config.items():
            if not ab_code_list:
                continue
            ad_abtest_tag = f"{ab_test_id}-{config_name}"
            b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
            if b_mean < 0:
                # Raise the threshold parameter step by step.
                rule, distance, direction = up_threshold_update, b_mean * -1, 1
            elif b_mean > 0.1:
                # Lower the threshold parameter step by step.
                rule, distance, direction = down_threshold_update, b_mean - 0.1, -1
            else:
                # Inside the [0, 0.1] band, or NaN because no row matched.
                continue
            b_i = distance // rule.get('gradient') + 1
            param_old = float(threshold_record.get(ad_abtest_tag))
            delta = direction * rule.get('update_range') * b_i
            # Cast to a plain float so the value (and the str(dict) later
            # persisted by the caller) does not depend on numpy's scalar repr.
            threshold_param_new = float(param_old + delta)
            if threshold_param_new > 0:
                threshold_record_new[ad_abtest_tag] = threshold_param_new
                robot_msg_record.append({
                    'appType': app_type,
                    'ad_abtest_tag': ad_abtest_tag,
                    'b_i': int(b_i),
                    # NOTE(review): the original literal had no value for this
                    # key; the signed change applied is reported - confirm.
                    'update_param': round(float(delta), 4),
                    'param_old': round(param_old, 4),
                    'param_new': round(threshold_param_new, 4),
                })
    return threshold_record_new, robot_msg_record
 
def update_threshold(threshold_record_old, threshold_record_new):
    """Rescale the per-group thresholds stored in Redis to the new parameters.

    For each tag ``"<ab_test_id>-<config_name>"`` in ``threshold_record_new``
    and each group key (all groups listed in ``config_.AD_MID_GROUP`` plus
    ``"mean_group"``), the threshold stored under
    ``<KEY_NAME_PREFIX_AD_THRESHOLD><ab_test_id>:<config_name>:<group>`` is
    multiplied by ``new_param / old_param`` and written back. Keys that are
    absent from Redis are skipped.

    Args:
        threshold_record_old: mapping tag -> parameter before the update.
        threshold_record_new: mapping tag -> parameter after the update.
    """
    # Flatten every configured group list, add the mean group, then de-duplicate.
    ad_mid_group_list = [
        group
        for group_list in config_.AD_MID_GROUP.values()
        for group in group_list
    ]
    ad_mid_group_list.append("mean_group")
    ad_mid_group_list = list(set(ad_mid_group_list))

    for ad_abtest_tag in threshold_record_new:
        threshold_param_new = threshold_record_new[ad_abtest_tag]
        threshold_param_old = threshold_record_old.get(ad_abtest_tag)
        log_.info(f"ad_abtest_tag = {ad_abtest_tag}, "
                  f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
        tag_list = ad_abtest_tag.split('-')
        for group_key in ad_mid_group_list:
            # Redis key of the threshold for this tag and group.
            key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{tag_list[0]}:{tag_list[1]}:{group_key}"
            threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
            if threshold_old is not None:
                # Scale the stored threshold by the ratio new / old parameter.
                threshold_new = float(threshold_old) / threshold_param_old * threshold_param_new
                log_.info(f"ad_abtest_tag = {ad_abtest_tag}, group_key = {group_key}, "
                          f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
                # Persist the rescaled threshold.
                redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new)
 
def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
    """Recompute the threshold parameters and persist them together with the thresholds.

    Steps: load the current parameter record from Redis, fetch the feature
    data for partition ``dt``, derive the new parameters, rescale the stored
    thresholds, then write the new parameter record back to Redis.

    Args:
        project: passed through to ``get_feature_data``.
        table: passed through to ``get_feature_data``.
        dt: partition value passed through to ``get_feature_data``.
        ad_abtest_abcode_config: per-app-type AB-test configuration handed to
            ``get_threshold_record_new``.

    Returns:
        The list of change summaries produced by ``get_threshold_record_new``.
    """
    record_key = config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD

    # Current parameter record; it is stored as the str() of a dict (see the
    # write at the end of this function).
    # NOTE(review): eval() executes whatever string is stored under this key -
    # only safe while nothing untrusted can write to it.
    threshold_record = eval(redis_helper.get_data_from_redis(key_name=record_key))
    log_.info(f"threshold_record = {threshold_record}")

    # Feature data, with the two columns used downstream coerced to numbers.
    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    for column, dtype in (('apptype', int), ('b', float)):
        feature_df[column] = feature_df[column].astype(dtype)

    # Derive the new parameters from the feature data.
    threshold_record_new, robot_msg_record = get_threshold_record_new(
        ad_abtest_abcode_config=ad_abtest_abcode_config,
        feature_df=feature_df,
        threshold_record=threshold_record,
    )
    log_.info(f"threshold_record_new = {threshold_record_new}")

    # Rescale the stored thresholds first, then persist the parameter record.
    update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
    redis_helper.set_data_to_redis(key_name=record_key, value=str(threshold_record_new))
    return robot_msg_record
 
def timer_check():
    """Run the threshold update once the previous hour's data is available.

    The partition checked is the previous hour, formatted ``%Y%m%d%H``.
    If ``data_check`` reports rows, the update runs and a summary is sent to
    the Feishu robot. If there is no data and the current minute is past 30,
    a "data not ready" alert is sent instead; otherwise the check is
    re-scheduled in 60 seconds. Any exception is logged and reported to the
    robot rather than propagated.
    """
    def _notify(text):
        # Send ``text`` to the threshold auto-update Feishu robot.
        robot = config_.FEISHU_ROBOT['ad_threshold_auto_update_robot']
        send_msg_to_feishu(
            webhook=robot.get('webhook'),
            key_word=robot.get('key_word'),
            msg_text=text
        )

    hour_format = '%Y%m%d%H'
    try:
        ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
        data_source = config_.AD_THRESHOLD_AUTO_UPDATE_DATA
        project = data_source.get('project')
        table = data_source.get('table')

        now_date = datetime.datetime.today()
        now_min = datetime.datetime.now().minute
        log_.info(f"now_date: {now_date.strftime(hour_format)}")
        dt = (now_date - datetime.timedelta(hours=1)).strftime(hour_format)

        # Is the partition for the previous hour ready?
        data_count = data_check(project=project, table=table, dt=dt)
        if not data_count > 0:
            if now_min > 30:
                # Past minute 30: stop waiting and raise an alert.
                log_.info('threshold update data is None!')
                _notify(f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n")
            else:
                # Data not ready yet: check again in one minute.
                Timer(60, timer_check).start()
            return

        log_.info(f"data count = {data_count}")
        # Data is ready: run the update.
        robot_msg_record = update_ad_abtest_threshold(
            project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config)
        if robot_msg_record:
            robot_msg_record_text = "\n".join(map(str, robot_msg_record))
            msg = f"threshold_param_update: \n{robot_msg_record_text.replace(', ', ',   ')}\n"
        else:
            msg = "无需更新!\n"
        _notify(f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}")
        log_.info("threshold update end!")
    except Exception as e:
        trace_text = traceback.format_exc()
        log_.error(f"阈值更新失败, exception: {e}, traceback: {trace_text}")
        _notify(f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
                f"exception: {e}\n"
                f"traceback: {trace_text}")
 
if __name__ == "__main__":
    # Script entry point: start the readiness check / update cycle.
    timer_check()
 
 
  |