123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- import copy
- import datetime
- import traceback
- import math
- import numpy as np
- from threading import Timer
- import pandas as pd
- from my_utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu, request_get
- from my_config import set_config
- from log import Log
# Module-wide singletons shared by every function in this file.
config_, _ = set_config()  # only the config object is used; the second return value is ignored
log_ = Log()
redis_helper = RedisHelper()
# Columns read from the feature table: app type, experiment group, ad UV.
features = ['apptype', '分组', '广告uv']
def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
    """Compute new threshold parameters from the mean of metric ``b`` per experiment config.

    For every app type and every experiment config with a non-empty ad-code list, the mean of
    column ``b`` over those ad codes decides the adjustment:

    * mean < 0: the parameter rises by ``update_range * (floor(-mean / gradient) + 1)`` using
      ``up_threshold_update[config_name]``;
    * mean > 0.1: the parameter falls by ``update_range * (floor((mean - 0.1) / gradient) + 1)``
      using ``down_threshold_update[config_name]``;
    * otherwise (including a NaN mean) nothing changes.

    A computed value is only stored when it is strictly greater than 0.

    :param ad_abtest_abcode_config: {app_type: {'ab_test_id', 'ab_test_config',
        'up_threshold_update', 'down_threshold_update'}}
    :param feature_df: DataFrame with at least the columns 'apptype', 'adcode' and 'b'
    :param threshold_record: {"<ab_test_id>-<config_name>": float-convertible parameter}
    :return: (shallow copy of threshold_record with updated entries, list of change records)
    """
    changes = []
    updated_record = threshold_record.copy()
    for app_type, params in ad_abtest_abcode_config.items():
        app_df = feature_df[feature_df['apptype'] == app_type]
        test_id = params.get('ab_test_id')
        config_groups = params.get('ab_test_config')
        up_cfg = params.get('up_threshold_update')
        down_cfg = params.get('down_threshold_update')
        for config_name, ab_codes in config_groups.items():
            tag = f"{test_id}-{config_name}"
            if len(ab_codes) == 0:
                continue
            mean_b = app_df[app_df['adcode'].isin(ab_codes)]['b'].mean()
            if mean_b < 0:
                # Metric below zero: raise the parameter in gradient-sized steps.
                step_cfg = up_cfg[config_name]
                gradient = step_cfg.get('gradient')
                update_range = step_cfg.get('update_range')
                steps = (-mean_b) // gradient + 1
                new_param = float(threshold_record.get(tag)) + update_range * steps
            elif mean_b > 0.1:
                # Metric above 0.1: lower the parameter in gradient-sized steps.
                step_cfg = down_cfg[config_name]
                gradient = step_cfg.get('gradient')
                update_range = step_cfg.get('update_range')
                steps = (mean_b - 0.1) // gradient + 1
                new_param = float(threshold_record.get(tag)) - update_range * steps
            else:
                continue
            # Written as "not > 0" so that a NaN result is skipped, like non-positive ones.
            if not new_param > 0:
                continue
            updated_record[tag] = new_param
            changes.append({
                'appType': app_type,
                'abtestTag': tag,
                'gradient': round(gradient, 4),
                'range': round(update_range, 4),
                'i': int(steps),
                'paramOld': round(float(threshold_record.get(tag)), 4),
                'paramNew': round(new_param, 4),
            })
    return updated_record, changes
def _round_half_up(value):
    """Round a non-negative number to the nearest integer, rounding exact halves up.

    The built-in round() rounds halves to even (round(2.5) == 2), which does not match the
    documented "round half up" rule for update steps.
    """
    return int(math.floor(value + 0.5))


def _get_uv_update_step(uv_differ_abs, gradient, max_update_step):
    """Return how many ``update_range`` steps to move a parameter for an absolute UV difference.

    Inside the first gradient interval the ratio is rounded up (1 for any non-zero difference);
    beyond it the ratio is rounded half up. The result is capped at ``max_update_step``.
    """
    if uv_differ_abs < gradient:
        step = math.ceil(uv_differ_abs / gradient)
    else:
        step = _round_half_up(uv_differ_abs / gradient)
    return max_update_step if step > max_update_step else step


def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
    """Compute new threshold parameters from the gap between current and target ad UV.

    For every app type and experiment group whose target UV is not None:
    ``uv_differ = current_uv - target_uv``. If ``|uv_differ| <= not_update`` (or the difference is
    0/NaN) nothing changes. Otherwise the group's parameter moves by ``update_range * step``:
    up when the current UV is above target, down when it is below. The step is computed by
    ``_get_uv_update_step``. Results below 0 are clamped to 0. Both the 'group' and 'mean_group'
    entries of the group are set to the new value.

    :param ad_abtest_abcode_config: {app_type(int): {'ab_test_id', 'not_update', 'threshold_update', ...}}
    :param feature_df: DataFrame with columns 'apptype', '分组' (group) and '广告uv' (ad UV)
    :param threshold_record: {ab_test_id: {ab_test_group: {'group': ..., 'mean_group': ...}}};
        left unmodified (a deep copy is updated)
    :param ad_target_uv: {app_type: {ab_test_group: target uv or None}}
    :return: (updated deep copy of threshold_record, list of change records for the robot message)
    """
    robot_msg_record = []
    threshold_record_new = copy.deepcopy(threshold_record)
    for app_type, target_uv_mapping in ad_target_uv.items():
        temp_df = feature_df[feature_df['apptype'] == int(app_type)]
        update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
        ab_test_id = update_threshold_params.get('ab_test_id')
        not_update = update_threshold_params.get('not_update')
        threshold_update_mapping = update_threshold_params.get('threshold_update')
        threshold_record_old = threshold_record.get(ab_test_id)
        for ab_test_group, target_uv in target_uv_mapping.items():
            if target_uv is None:
                continue
            # gradient / max step are resolved per group (they may differ during special hours)
            gradient, max_update_step = get_ad_uv_update_config(app_type=app_type, ab_group=ab_test_group)
            update_range = threshold_update_mapping.get(ab_test_group)
            group_uv = temp_df[temp_df['分组'] == ab_test_group]['广告uv'].values
            if len(group_uv) == 0:
                # no UV row for this group in the fetched data
                continue
            current_uv = group_uv[0]
            uv_differ = current_uv - target_uv
            if abs(uv_differ) <= not_update:
                continue
            threshold_param_old = threshold_record_old[ab_test_group].get('group')
            # Skip a zero or NaN difference (neither comparison holds for NaN).
            if not (uv_differ > 0 or uv_differ < 0):
                continue
            delta = update_range * _get_uv_update_step(abs(uv_differ), gradient, max_update_step)
            if uv_differ > 0:
                # current uv above target: raise the parameter
                threshold_param_new = float(threshold_param_old) + delta
            else:
                # current uv below target: lower the parameter
                threshold_param_new = float(threshold_param_old) - delta
            if threshold_param_new <= 0:
                threshold_param_new = 0
            step = _get_uv_update_step(abs(uv_differ), gradient, max_update_step)
            update_msg = {
                'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
                'targetUv': target_uv, 'currentUv': round(current_uv, 4),
                'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
                'range': round(update_range, 4),
                'paramOld': round(float(threshold_param_old), 4),
                'paramNew': round(threshold_param_new, 4)
            }
            log_.info(update_msg)
            group_record = threshold_record_new[ab_test_id][ab_test_group]
            group_record['group'] = threshold_param_new
            group_record['mean_group'] = threshold_param_new
            robot_msg_record.append(update_msg)
    return threshold_record_new, robot_msg_record
def _rescale_threshold(threshold_old, param_old, param_new):
    """Return ``threshold_old`` scaled by ``param_new / param_old``.

    An old parameter equal to 0 is replaced by 10**(-5) so the division is defined.
    """
    divisor = 10 ** (-5) if param_old == 0 else param_old
    return float(threshold_old) / divisor * param_new


def update_threshold(threshold_record_old, threshold_record_new):
    """Rescale the redis ad thresholds of every group whose parameter entry changed.

    For each (abtest_id, abtest_group) whose entry's string form differs between the two records,
    every existing redis key
    ``KEY_NAME_PREFIX_AD_THRESHOLD + "{abtest_id}:{config_tag}:{abtest_group}:{group_key}"`` is
    multiplied by new_param / old_param and written back with a 2-day expiry. The 'mean_group'
    parameter is used for the 'mean_group' key and the 'group' parameter otherwise. For configs
    whose ``care_model`` is True, a care-mode threshold (new threshold * ``threshold_rate``) is
    written as well.

    :param threshold_record_old: {abtest_id: {abtest_group: {'group': ..., 'mean_group': ...}}}
    :param threshold_record_new: same structure after the parameter update
    """
    # Every user-group key from config plus the aggregated 'mean_group', deduplicated.
    mid_groups = [group for group_list in config_.AD_MID_GROUP.values() for group in group_list]
    mid_groups.append("mean_group")
    mid_groups = list(set(mid_groups))

    # Experiment configs grouped by abtest id: {abtest_id: [(config_tag, config_val), ...]}
    ad_abtest_config_mapping = {}
    for config_key, config_val in config_.AD_ABTEST_CONFIG.items():
        abtest_id, abtest_config_tag = config_key.split('-')
        ad_abtest_config_mapping.setdefault(abtest_id, []).append((abtest_config_tag, config_val))
    log_.info(f"ad_abtest_config_mapping = {ad_abtest_config_mapping}")

    expire_seconds = 2 * 24 * 3600
    for abtest_id, param_by_group in threshold_record_new.items():
        for abtest_group, threshold_param_new in param_by_group.items():
            threshold_param_old = threshold_record_old[abtest_id].get(abtest_group)
            if str(threshold_param_old) == str(threshold_param_new):
                continue  # parameter unchanged: leave the redis thresholds alone
            log_.info(f"abtest_id = {abtest_id}, abtest_group = {abtest_group}, "
                      f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
            for abtest_config_tag, config_val in ad_abtest_config_mapping.get(abtest_id, []):
                for group_key in mid_groups:
                    key_suffix = f"{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
                    key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{key_suffix}"
                    threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
                    if threshold_old is None:
                        continue
                    if float(threshold_old) == 0:
                        # A zero threshold would stay zero under multiplicative rescaling.
                        threshold_old = float(threshold_old) + 10 ** (-5)
                    param_key = 'mean_group' if group_key == 'mean_group' else 'group'
                    threshold_new = _rescale_threshold(threshold_old,
                                                       threshold_param_old[param_key],
                                                       threshold_param_new[param_key])
                    redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new,
                                                   expire_time=expire_seconds)
                    log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
                              f"abtest_group = {abtest_group}, group_key = {group_key}, "
                              f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")

                    # Care-mode experiment: derive its threshold from the new one.
                    care_model = config_val.get('care_model', None)
                    threshold_rate = config_val.get('threshold_rate', None)
                    if care_model is not True:
                        continue
                    care_key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{key_suffix}"
                    care_threshold_old = redis_helper.get_data_from_redis(key_name=care_key_name)
                    care_threshold_new = threshold_new * threshold_rate
                    redis_helper.set_data_to_redis(key_name=care_key_name,
                                                   value=care_threshold_new, expire_time=expire_seconds)
                    log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
                              f"abtest_group = {abtest_group}, group_key = {group_key}, "
                              f"care_model_threshold_old = {care_threshold_old}, "
                              f"care_model_threshold_new = {care_threshold_new}")
def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
    """Run one threshold-update pass for data hour ``dt`` and return the change records.

    1. Load the current threshold-parameter record from redis.
    2. Fetch the ad-UV features for ``dt``.
    3. Compute new parameters from the target UVs.
    4. Rescale the redis thresholds, then store the new record (2-day expiry).

    :param project: passed through to get_feature_data
    :param table: passed through to get_feature_data
    :param dt: data hour string (timer_check passes it formatted as '%Y%m%d%H')
    :param ad_abtest_abcode_config: per-app auto-update config (see get_threshold_record_new_by_uv)
    :param ad_target_uv: {app_type: {ab_group: target uv}} as returned by get_ad_target_uv
    :return: list of change records for the robot message
    :raises ValueError: if the threshold-parameter record is missing from redis
    """
    record_key = config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD
    threshold_record_text = redis_helper.get_data_from_redis(key_name=record_key)
    if threshold_record_text is None:
        # The record is written with a 2-day expiry; fail with a clear message rather than
        # the opaque TypeError that eval(None) would raise.
        raise ValueError(f"threshold record {record_key} not found in redis")
    # NOTE(review): eval() executes whatever is stored under this redis key. The record is written
    # below as str(dict); ast.literal_eval would be safer, but only if no stored value relies on
    # evaluated names (e.g. numpy scalar reprs) -- confirm before switching.
    threshold_record = eval(threshold_record_text)
    log_.info(f"threshold_record = {threshold_record}")

    # Fetch the UV data and normalise column types.
    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    feature_df['广告uv'] = feature_df['广告uv'].astype(float)

    # Compute new threshold parameters from the ad-UV gap.
    threshold_record_new, robot_msg_record = get_threshold_record_new_by_uv(
        ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df,
        threshold_record=threshold_record, ad_target_uv=ad_target_uv)
    log_.info(f"threshold_record_new = {threshold_record_new}")

    # Rescale the thresholds, then persist the new parameter record.
    update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
    redis_helper.set_data_to_redis(key_name=record_key,
                                   value=str(threshold_record_new), expire_time=2 * 24 * 3600)
    return robot_msg_record
def get_ad_target_uv(now_h):
    """Fetch target ad UVs for products whose auto-threshold switch is on in the admin backend.

    A group listed in its app's ``target_uv_param`` has its target multiplied by that group's
    ``update_param`` when ``now_h`` is in its ``update_hours``. None targets are passed through
    unchanged (get_threshold_record_new_by_uv skips them). Products without an entry in
    ``config_.AD_ABTEST_ABCODE_CONFIG`` are logged and skipped.

    :param now_h: current hour, used to decide whether a group's multiplier applies
    :return: {productId: {abParam: target uv or None}}; empty on request failure or empty content
    """
    ad_target_uv = {}
    result = request_get(request_url=config_.GET_AD_TARGET_UV_URL)
    if result is None or result['code'] != 0:
        log_.info('获取管理后台广告目标uv值失败!')
        return ad_target_uv
    if not result['content']:
        return ad_target_uv
    for item in result['content']:
        app_type = item['productId']
        app_config = config_.AD_ABTEST_ABCODE_CONFIG.get(int(app_type))
        if app_config is None:
            # Previously an AttributeError on None aborted the whole run for every product.
            log_.info(f"app_type = {app_type} has no AD_ABTEST_ABCODE_CONFIG entry, skipped")
            continue
        target_uv_param = app_config.get('target_uv_param', {})
        target_uv_mapping = {}
        for uv_item in item['uvTargetDetails']:
            ab_group = uv_item['abParam']
            target_uv = uv_item['uvTarget']
            group_param = target_uv_param.get(ab_group, None)
            # Only scale real targets: None * update_param would raise a TypeError.
            if target_uv is not None and group_param is not None:
                if now_h in group_param.get('update_hours', []):
                    target_uv *= group_param.get('update_param')
            target_uv_mapping[ab_group] = target_uv
        ad_target_uv[app_type] = target_uv_mapping
    return ad_target_uv
def get_ad_uv_update_config(app_type, ab_group, now_h=None):
    """Return ``(gradient, max_update_step)`` used to adjust ``ab_group``'s threshold parameter.

    Defaults come from ``config_.AD_ABTEST_ABCODE_CONFIG[int(app_type)]``. If the group's
    ``target_uv_param`` entry has a ``special_update_config`` whose ``special_hours`` contain the
    hour, its ``special_gradient`` / ``special_max_update_step`` are used instead; a missing special
    key falls back to the default rather than returning None.

    :param app_type: app type (converted with int())
    :param ab_group: experiment group name
    :param now_h: hour to evaluate; defaults to the current local hour. Pass it explicitly to keep
        one run consistent with the hour used elsewhere.
    :return: (gradient, max_update_step)
    """
    if now_h is None:
        now_h = datetime.datetime.now().hour
    update_threshold_params = config_.AD_ABTEST_ABCODE_CONFIG.get(int(app_type))
    gradient = update_threshold_params.get('gradient')
    max_update_step = update_threshold_params.get('max_update_step')
    target_uv_param_group = update_threshold_params.get('target_uv_param', {}).get(ab_group, None)
    if target_uv_param_group is None:
        return gradient, max_update_step
    special_update_config = target_uv_param_group.get('special_update_config', None)
    if special_update_config is not None and now_h in special_update_config.get('special_hours', []):
        gradient = special_update_config.get('special_gradient', gradient)
        max_update_step = special_update_config.get('special_max_update_step', max_update_step)
    return gradient, max_update_step
def _send_robot_msg(msg_text):
    """Send ``msg_text`` through the ad-threshold auto-update Feishu robot."""
    robot_config = config_.FEISHU_ROBOT['ad_threshold_auto_update_robot']
    send_msg_to_feishu(
        webhook=robot_config.get('webhook'),
        key_word=robot_config.get('key_word'),
        msg_text=msg_text
    )


def timer_check():
    """Update ad thresholds once the previous hour's data is available.

    - Stores the current auto-update config in redis (1-day expiry).
    - Does nothing between 00:00 and 09:00, or when no product has a target UV.
    - If data for the previous hour exists, runs the update and reports the result to Feishu.
    - Otherwise, while the minute is <= 30, re-checks in 60 seconds via threading.Timer; after
      that it reports to Feishu that the data is not ready.

    Any exception is logged and reported to Feishu.
    """
    try:
        ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
        # Keep a copy of the auto-update config in redis.
        redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_PARAM_RECORD,
                                       value=str(ad_abtest_abcode_config),
                                       expire_time=24 * 3600)
        project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
        table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')
        # Read the clock once so the logged hour, the hour checks and the minute check agree
        # (separate now() calls could straddle an hour boundary).
        now = datetime.datetime.now()
        now_h = now.hour
        now_min = now.minute
        log_.info(f"now_date: {datetime.datetime.strftime(now, '%Y%m%d%H')}")
        # No parameter adjustment between 00:00 and 09:00.
        if 0 <= now_h < 9:
            log_.info("00:00 - 09:00 不做阈值参数调整")
            return
        # Target UVs of products with the auto-adjust switch turned on.
        ad_target_uv = get_ad_target_uv(now_h=now_h)
        log_.info(f"ad_target_uv: {ad_target_uv}")
        if len(ad_target_uv) == 0:
            return
        # Is the previous hour's data ready?
        dt = datetime.datetime.strftime(now - datetime.timedelta(hours=1), '%Y%m%d%H')
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            log_.info(f"data count = {data_count}")
            robot_msg_record = update_ad_abtest_threshold(
                project=project, table=table, dt=dt,
                ad_abtest_abcode_config=ad_abtest_abcode_config, ad_target_uv=ad_target_uv)
            if len(robot_msg_record) > 0:
                robot_msg_record_text = "\n".join(str(item) for item in robot_msg_record)
                msg = f"threshold_param_update: \n{robot_msg_record_text}\n"
            else:
                msg = "无需更新!\n"
            _send_robot_msg(f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}")
            log_.info("threshold update end!")
        elif now_min > 30:
            log_.info('threshold update data is None!')
            _send_robot_msg(f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n")
        else:
            # Data not ready yet: check again in one minute.
            Timer(60, timer_check).start()
    except Exception as e:
        error_trace = traceback.format_exc()
        log_.error(f"阈值更新失败, exception: {e}, traceback: {error_trace}")
        _send_robot_msg(
            f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
            f"exception: {e}\n"
            f"traceback: {error_trace}"
        )
if __name__ == "__main__":
    # Script entry point: run one check (it may reschedule itself while waiting for data).
    timer_check()
|