|
@@ -5,7 +5,7 @@ from threading import Timer
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
-from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
|
|
|
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu, request_get
|
|
|
from config import set_config
|
|
|
from log import Log
|
|
|
config_, _ = set_config()
|
|
@@ -14,10 +14,8 @@ redis_helper = RedisHelper()
|
|
|
|
|
|
features = [
|
|
|
'apptype',
|
|
|
- 'adcode',
|
|
|
- 'visit_uv_today',
|
|
|
- 'visit_uv_yesterday',
|
|
|
- 'b'
|
|
|
+ '分组',
|
|
|
+ '广告uv'
|
|
|
]
|
|
|
|
|
|
|
|
@@ -62,6 +60,53 @@ def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_reco
|
|
|
return threshold_record_new, robot_msg_record
|
|
|
|
|
|
|
|
|
+def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
|
|
|
+ """根据广告uv计算新的阈值参数"""
|
|
|
+ robot_msg_record = []
|
|
|
+ threshold_record_new = threshold_record.copy()
|
|
|
+ for app_type, target_uv_mapping in ad_target_uv.items():
|
|
|
+ update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
|
|
|
+ ab_test_id = update_threshold_params.get('ab_test_id')
|
|
|
+ temp_df = feature_df[feature_df['apptype'] == int(app_type)]
|
|
|
+ for ab_test_group, target_uv in target_uv_mapping.items():
|
|
|
+ threshold_update = update_threshold_params.get('threshold_update')
|
|
|
+
|
|
|
+ for app_type, config_params in ad_abtest_abcode_config.items():
|
|
|
+ # 获取对应端的数据, 更新阈值参数
|
|
|
+ # log_.info(f"app_type = {app_type}")
|
|
|
+ temp_df = feature_df[feature_df['apptype'] == app_type]
|
|
|
+ ab_test_id = config_params.get('ab_test_id')
|
|
|
+ threshold_update = config_params.get('threshold_update')
|
|
|
+
|
|
|
+ for config_name, ab_code_list in ab_test_config.items():
|
|
|
+ ad_abtest_tag = f"{ab_test_id}-{config_name}"
|
|
|
+ # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
|
|
|
+ if len(ab_code_list) > 0:
|
|
|
+ b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
|
|
|
+ if b_mean < 0:
|
|
|
+ # 阈值按梯度调高
|
|
|
+ gradient = up_threshold_update[config_name].get('gradient')
|
|
|
+ update_range = up_threshold_update[config_name].get('update_range')
|
|
|
+ b_i = (b_mean * -1) // gradient + 1
|
|
|
+ threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + update_range * b_i
|
|
|
+ elif b_mean > 0.1:
|
|
|
+ # 阈值按梯度调低
|
|
|
+ gradient = down_threshold_update[config_name].get('gradient')
|
|
|
+ update_range = down_threshold_update[config_name].get('update_range')
|
|
|
+ b_i = (b_mean - 0.1) // gradient + 1
|
|
|
+ threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - update_range * b_i
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+ if threshold_param_new > 0:
|
|
|
+ threshold_record_new[ad_abtest_tag] = threshold_param_new
|
|
|
+ robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
|
|
|
+ 'gradient': round(gradient, 4), 'range': round(update_range, 4),
|
|
|
+ 'i': int(b_i),
|
|
|
+ 'paramOld': round(float(threshold_record.get(ad_abtest_tag)), 4),
|
|
|
+ 'paramNew': round(threshold_param_new, 4)})
|
|
|
+ return threshold_record_new, robot_msg_record
|
|
|
+
|
|
|
+
|
|
|
def update_threshold(threshold_record_old, threshold_record_new):
|
|
|
"""更新阈值"""
|
|
|
ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items()
|
|
@@ -87,7 +132,7 @@ def update_threshold(threshold_record_old, threshold_record_new):
|
|
|
redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new)
|
|
|
|
|
|
|
|
|
-def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
|
|
|
+def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
|
|
|
# 获取当前阈值参数值
|
|
|
threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
|
|
|
threshold_record = eval(threshold_record)
|
|
@@ -95,10 +140,11 @@ def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
|
|
|
# 获取uv数据
|
|
|
feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
|
|
|
feature_df['apptype'] = feature_df['apptype'].astype(int)
|
|
|
- feature_df['b'] = feature_df['b'].astype(float)
|
|
|
- # 根据活跃人数变化计算新的阈值参数
|
|
|
- threshold_record_new, robot_msg_record = get_threshold_record_new(
|
|
|
- ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df, threshold_record=threshold_record)
|
|
|
+ feature_df['广告uv'] = feature_df['广告uv'].astype(float)
|
|
|
+ # 根据广告uv变化计算新的阈值参数
|
|
|
+ threshold_record_new, robot_msg_record = get_threshold_record_new_by_uv(
|
|
|
+ ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df,
|
|
|
+ threshold_record=threshold_record, ad_target_uv=ad_target_uv)
|
|
|
log_.info(f"threshold_record_new = {threshold_record_new}")
|
|
|
# 更新阈值
|
|
|
update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
|
|
@@ -108,23 +154,54 @@ def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
|
|
|
return robot_msg_record
|
|
|
|
|
|
|
|
|
+def get_ad_target_uv():
|
|
|
+ """获取管理后台开启自动调整阈值开关的目标uv值"""
|
|
|
+ ad_target_uv = {}
|
|
|
+ result = request_get(request_url=config_.GET_AD_TARGET_UV_URL)
|
|
|
+ if result is None:
|
|
|
+ log_.info('获取管理后台广告目标uv值失败!')
|
|
|
+ return ad_target_uv
|
|
|
+ if result['code'] != 0:
|
|
|
+ log_.info('获取管理后台广告目标uv值失败!')
|
|
|
+ return ad_target_uv
|
|
|
+ if not result['content']:
|
|
|
+ return ad_target_uv
|
|
|
+ for item in result['content']:
|
|
|
+ app_type = item['productId']
|
|
|
+ target_uv_mapping = {}
|
|
|
+ for uv_item in item['uvTargetDetails']:
|
|
|
+ ab_group = uv_item['abParam']
|
|
|
+ target_uv = uv_item['uvTarget']
|
|
|
+ target_uv_mapping[ab_group] = target_uv
|
|
|
+ ad_target_uv[app_type] = target_uv_mapping
|
|
|
+ return ad_target_uv
|
|
|
+
|
|
|
+
|
|
|
def timer_check():
|
|
|
try:
|
|
|
+ # 获取自动调整阈值参数
|
|
|
ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
|
|
|
project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
|
|
|
table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')
|
|
|
now_date = datetime.datetime.today()
|
|
|
now_min = datetime.datetime.now().minute
|
|
|
log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
|
- dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
|
|
|
+
|
|
|
+ # 管理后台获取开启自动调整阈值开关的目标uv值
|
|
|
+ ad_target_uv = get_ad_target_uv()
|
|
|
+ log_.info(f"ad_target_uv: {ad_target_uv}")
|
|
|
+ if len(ad_target_uv) == 0:
|
|
|
+ return
|
|
|
|
|
|
# 查看当前更新的数据是否已准备好
|
|
|
+ dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
|
|
|
data_count = data_check(project=project, table=table, dt=dt)
|
|
|
if data_count > 0:
|
|
|
log_.info(f"data count = {data_count}")
|
|
|
# 数据准备好,进行更新
|
|
|
robot_msg_record = update_ad_abtest_threshold(
|
|
|
- project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config)
|
|
|
+ project=project, table=table, dt=dt,
|
|
|
+ ad_abtest_abcode_config=ad_abtest_abcode_config, ad_target_uv=ad_target_uv)
|
|
|
if len(robot_msg_record) > 0:
|
|
|
robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
|
|
|
msg = f"threshold_param_update: \n{robot_msg_record_text.replace(', ', ', ')}\n"
|