|
@@ -1,5 +1,7 @@
|
|
|
+import copy
|
|
|
import datetime
|
|
|
import traceback
|
|
|
+import math
|
|
|
import numpy as np
|
|
|
from threading import Timer
|
|
|
|
|
@@ -63,73 +65,134 @@ def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_reco
|
|
|
def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
|
|
|
"""根据广告uv计算新的阈值参数"""
|
|
|
robot_msg_record = []
|
|
|
- threshold_record_new = threshold_record.copy()
|
|
|
+ threshold_record_new = copy.deepcopy(threshold_record)
|
|
|
+ # 根据目标uv进行调整
|
|
|
for app_type, target_uv_mapping in ad_target_uv.items():
|
|
|
+ # 获取app_type对应的目标uv
|
|
|
+ temp_df = feature_df[feature_df['apptype'] == int(app_type)]
|
|
|
+ # 获取app_type对应的阈值调整参数
|
|
|
update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
|
|
|
ab_test_id = update_threshold_params.get('ab_test_id')
|
|
|
- temp_df = feature_df[feature_df['apptype'] == int(app_type)]
|
|
|
+ not_update = update_threshold_params.get('not_update')
|
|
|
+ gradient = update_threshold_params.get('gradient')
|
|
|
+ max_update_step = update_threshold_params.get('max_update_step')
|
|
|
+ threshold_update_mapping = update_threshold_params.get('threshold_update')
|
|
|
+ threshold_record_old = threshold_record.get(ab_test_id)
|
|
|
+ # print(ab_test_id, threshold_record, threshold_record_old)
|
|
|
for ab_test_group, target_uv in target_uv_mapping.items():
|
|
|
- threshold_update = update_threshold_params.get('threshold_update')
|
|
|
-
|
|
|
- for app_type, config_params in ad_abtest_abcode_config.items():
|
|
|
- # 获取对应端的数据, 更新阈值参数
|
|
|
- # log_.info(f"app_type = {app_type}")
|
|
|
- temp_df = feature_df[feature_df['apptype'] == app_type]
|
|
|
- ab_test_id = config_params.get('ab_test_id')
|
|
|
- threshold_update = config_params.get('threshold_update')
|
|
|
-
|
|
|
- for config_name, ab_code_list in ab_test_config.items():
|
|
|
- ad_abtest_tag = f"{ab_test_id}-{config_name}"
|
|
|
- # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
|
|
|
- if len(ab_code_list) > 0:
|
|
|
- b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
|
|
|
- if b_mean < 0:
|
|
|
- # 阈值按梯度调高
|
|
|
- gradient = up_threshold_update[config_name].get('gradient')
|
|
|
- update_range = up_threshold_update[config_name].get('update_range')
|
|
|
- b_i = (b_mean * -1) // gradient + 1
|
|
|
- threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + update_range * b_i
|
|
|
- elif b_mean > 0.1:
|
|
|
- # 阈值按梯度调低
|
|
|
- gradient = down_threshold_update[config_name].get('gradient')
|
|
|
- update_range = down_threshold_update[config_name].get('update_range')
|
|
|
- b_i = (b_mean - 0.1) // gradient + 1
|
|
|
- threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - update_range * b_i
|
|
|
- else:
|
|
|
- continue
|
|
|
- if threshold_param_new > 0:
|
|
|
- threshold_record_new[ad_abtest_tag] = threshold_param_new
|
|
|
- robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
|
|
|
- 'gradient': round(gradient, 4), 'range': round(update_range, 4),
|
|
|
- 'i': int(b_i),
|
|
|
- 'paramOld': round(float(threshold_record.get(ad_abtest_tag)), 4),
|
|
|
- 'paramNew': round(threshold_param_new, 4)})
|
|
|
+ if target_uv is None:
|
|
|
+ continue
|
|
|
+ update_range = threshold_update_mapping.get(ab_test_group)
|
|
|
+ # 获取对应组的当前uv
|
|
|
+ current_uv = temp_df[temp_df['分组'] == ab_test_group]['广告uv'].values[0]
|
|
|
+ # 计算uv差值
|
|
|
+ uv_differ = current_uv - target_uv
|
|
|
+ if abs(uv_differ) < not_update:
|
|
|
+ continue
|
|
|
+ # 获取当前阈值参数
|
|
|
+ threshold_param_old = threshold_record_old[ab_test_group].get('group')
|
|
|
+ if uv_differ < 0:
|
|
|
+ # 当前uv < 目标uv,阈值按梯度调低
|
|
|
+ step = math.ceil((uv_differ * -1) / gradient)
|
|
|
+ step = max_update_step if step > max_update_step else step
|
|
|
+ threshold_param_new = float(threshold_param_old) - update_range * step
|
|
|
+ elif uv_differ > 0:
|
|
|
+ # 当前uv > 目标uv,阈值按梯度调高
|
|
|
+ step = math.ceil(uv_differ / gradient)
|
|
|
+ step = max_update_step if step > max_update_step else step
|
|
|
+ threshold_param_new = float(threshold_param_old) + update_range * step
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+ if threshold_param_new > 0:
|
|
|
+ log_.info(
|
|
|
+ {
|
|
|
+ 'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
|
|
|
+ 'targetUv': target_uv, 'currentUv': round(current_uv, 4),
|
|
|
+ 'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
|
|
|
+ 'range': round(update_range, 4),
|
|
|
+ 'paramOld': round(float(threshold_param_old), 4),
|
|
|
+ 'paramNew': round(threshold_param_new, 4)
|
|
|
+ }
|
|
|
+ )
|
|
|
+ threshold_record_new[ab_test_id][ab_test_group]['group'] = threshold_param_new
|
|
|
+ threshold_record_new[ab_test_id][ab_test_group]['mean_group'] = threshold_param_new
|
|
|
+ robot_msg_record.append(
|
|
|
+ {
|
|
|
+ 'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
|
|
|
+ 'targetUv': target_uv, 'currentUv': round(current_uv, 4),
|
|
|
+ 'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
|
|
|
+ 'range': round(update_range, 4),
|
|
|
+ 'paramOld': round(float(threshold_param_old), 4),
|
|
|
+ 'paramNew': round(threshold_param_new, 4)
|
|
|
+ }
|
|
|
+ )
|
|
|
return threshold_record_new, robot_msg_record
|
|
|
|
|
|
|
|
|
def update_threshold(threshold_record_old, threshold_record_new):
|
|
|
"""更新阈值"""
|
|
|
+ # 获取用户组列表
|
|
|
ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items()
|
|
|
for group in group_list]
|
|
|
ad_mid_group_list.append("mean_group")
|
|
|
ad_mid_group_list = list(set(ad_mid_group_list))
|
|
|
- for ad_abtest_tag, threshold_param_new in threshold_record_new.items():
|
|
|
- threshold_param_old = threshold_record_old.get(ad_abtest_tag)
|
|
|
- log_.info(f"ad_abtest_tag = {ad_abtest_tag}, "
|
|
|
- f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
|
|
|
- tag_list = ad_abtest_tag.split('-')
|
|
|
- for group_key in ad_mid_group_list:
|
|
|
- # 获取对应的阈值
|
|
|
- key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{tag_list[0]}:{tag_list[1]}:{group_key}"
|
|
|
- threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
|
|
|
- if threshold_old is None:
|
|
|
+ # 获取实验配置列表
|
|
|
+ ad_abtest_config_mapping = {}
|
|
|
+ abtest_id_list = []
|
|
|
+ for key, val in config_.AD_ABTEST_CONFIG.items():
|
|
|
+ abtest_id, abtest_config_tag = key.split('-')
|
|
|
+ if abtest_id in abtest_id_list:
|
|
|
+ ad_abtest_config_mapping[abtest_id].append((abtest_config_tag, val))
|
|
|
+ else:
|
|
|
+ abtest_id_list.append(abtest_id)
|
|
|
+ ad_abtest_config_mapping[abtest_id] = [(abtest_config_tag, val)]
|
|
|
+ log_.info(f"ad_abtest_config_mapping = {ad_abtest_config_mapping}")
|
|
|
+
|
|
|
+ # 计算新的阈值并更新
|
|
|
+ for abtest_id, threshold_param_mapping in threshold_record_new.items():
|
|
|
+ for abtest_group, threshold_param_new in threshold_param_mapping.items():
|
|
|
+ threshold_param_old = threshold_record_old[abtest_id].get(abtest_group)
|
|
|
+ if str(threshold_param_old) == str(threshold_param_new):
|
|
|
+ # print(abtest_id, abtest_group, threshold_param_old, threshold_param_new)
|
|
|
continue
|
|
|
- # 计算新的阈值
|
|
|
- threshold_new = float(threshold_old) / threshold_param_old * threshold_param_new
|
|
|
- log_.info(f"ad_abtest_tag = {ad_abtest_tag}, group_key = {group_key}, "
|
|
|
- f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
|
|
|
- # 更新redis
|
|
|
- redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new)
|
|
|
+ log_.info(f"abtest_id = {abtest_id}, abtest_group = {abtest_group}, "
|
|
|
+ f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
|
|
|
+ for abtest_config_tag, config_val in ad_abtest_config_mapping.get(abtest_id, []):
|
|
|
+ for group_key in ad_mid_group_list:
|
|
|
+ # 获取对应的阈值
|
|
|
+ key_name = \
|
|
|
+ f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
|
|
|
+ threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
|
|
|
+ if threshold_old is None:
|
|
|
+ continue
|
|
|
+ # 计算新的阈值
|
|
|
+ if group_key == 'mean_group':
|
|
|
+ threshold_new = \
|
|
|
+ float(threshold_old) / threshold_param_old['mean_group'] * threshold_param_new['mean_group']
|
|
|
+ else:
|
|
|
+ threshold_new = \
|
|
|
+ float(threshold_old) / threshold_param_old['group'] * threshold_param_new['group']
|
|
|
+
|
|
|
+ # 更新redis
|
|
|
+ redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new, expire_time=2 * 24 * 3600)
|
|
|
+ log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
|
|
|
+ f"abtest_group = {abtest_group}, group_key = {group_key}, "
|
|
|
+ f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
|
|
|
+
|
|
|
+ # 关怀模式实验阈值更新
|
|
|
+ care_model = config_val.get('care_model', None)
|
|
|
+ threshold_rate = config_val.get('threshold_rate', None)
|
|
|
+ if care_model is True:
|
|
|
+ care_model_key_name = \
|
|
|
+ f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
|
|
|
+ care_model_threshold_old = redis_helper.get_data_from_redis(key_name=care_model_key_name)
|
|
|
+ care_model_threshold_new = threshold_new * threshold_rate
|
|
|
+ redis_helper.set_data_to_redis(key_name=care_model_key_name,
|
|
|
+ value=care_model_threshold_new, expire_time=2 * 24 * 3600)
|
|
|
+ log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
|
|
|
+ f"abtest_group = {abtest_group}, group_key = {group_key}, "
|
|
|
+ f"care_model_threshold_old = {care_model_threshold_old}, "
|
|
|
+ f"care_model_threshold_new = {care_model_threshold_new}")
|
|
|
|
|
|
|
|
|
def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
|
|
@@ -150,7 +213,7 @@ def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_t
|
|
|
update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
|
|
|
# 更新阈值参数
|
|
|
redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
|
|
|
- value=str(threshold_record_new))
|
|
|
+ value=str(threshold_record_new), expire_time=2 * 24 * 3600)
|
|
|
return robot_msg_record
|
|
|
|
|
|
|