import ast
import copy
import datetime
import traceback
import math
import numpy as np
from threading import Timer
import pandas as pd
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu, request_get
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Columns read from the hourly feature table: app type, A/B test group, ad UV.
features = [
    'apptype',
    '分组',
    '广告uv'
]

# Substituted for a zero threshold / zero param so ratio-based rescaling never divides by zero.
_ZERO_EPSILON = 10 ** (-5)
# Expiry (seconds) of the threshold keys and the threshold record written back to Redis.
_THRESHOLD_EXPIRE_SECONDS = 2 * 24 * 3600


def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
    """Compute new threshold params from the per-abcode metric ``b`` (legacy strategy).

    For every app type in ``ad_abtest_abcode_config`` and every config name in its
    ``ab_test_config``, the mean of column ``b`` over the rows whose ``adcode`` is in
    that config's ab-code list decides the adjustment of the param stored under
    ``"{ab_test_id}-{config_name}"``:

    * ``b_mean < 0``   -> raise by ``update_range * ((-b_mean) // gradient + 1)``
    * ``b_mean > 0.1`` -> lower by ``update_range * ((b_mean - 0.1) // gradient + 1)``
    * otherwise (including NaN when no rows match) -> unchanged.

    A new value is only stored when it is positive. Tags that have no current
    value in ``threshold_record`` are skipped.

    Returns:
        (threshold_record_new, robot_msg_record): a shallow copy of
        ``threshold_record`` with the updated params, and a list of dicts describing
        each applied change.
    """
    robot_msg_record = []
    threshold_record_new = threshold_record.copy()
    for app_type, config_params in ad_abtest_abcode_config.items():
        # Rows of this app type; ``b`` is averaged over them per ab-code list.
        temp_df = feature_df[feature_df['apptype'] == app_type]
        ab_test_id = config_params.get('ab_test_id')
        ab_test_config = config_params.get('ab_test_config')
        up_threshold_update = config_params.get('up_threshold_update')
        down_threshold_update = config_params.get('down_threshold_update')
        for config_name, ab_code_list in ab_test_config.items():
            if len(ab_code_list) == 0:
                continue
            ad_abtest_tag = f"{ab_test_id}-{config_name}"
            b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
            if b_mean < 0:
                # Raise the threshold in whole gradient steps.
                gradient = up_threshold_update[config_name].get('gradient')
                update_range = up_threshold_update[config_name].get('update_range')
                b_i = (b_mean * -1) // gradient + 1
                delta = update_range * b_i
            elif b_mean > 0.1:
                # Lower the threshold in whole gradient steps.
                gradient = down_threshold_update[config_name].get('gradient')
                update_range = down_threshold_update[config_name].get('update_range')
                b_i = (b_mean - 0.1) // gradient + 1
                delta = -(update_range * b_i)
            else:
                # Inside [0, 0.1] or NaN: nothing to adjust.
                continue

            param_old = threshold_record.get(ad_abtest_tag)
            if param_old is None:
                # No current value to adjust from; float(None) would otherwise crash the run.
                log_.info(f"threshold param not found, ad_abtest_tag = {ad_abtest_tag}")
                continue
            param_old = float(param_old)
            threshold_param_new = param_old + delta
            if threshold_param_new > 0:
                threshold_record_new[ad_abtest_tag] = threshold_param_new
                robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
                                         'gradient': round(gradient, 4), 'range': round(update_range, 4),
                                         'i': int(b_i),
                                         'paramOld': round(param_old, 4),
                                         'paramNew': round(threshold_param_new, 4)})
    return threshold_record_new, robot_msg_record


def _uv_update_step(uv_differ, gradient, max_update_step):
    """Return how many ``update_range`` increments a UV gap of ``uv_differ`` warrants.

    Inside the first gradient interval the ratio is rounded up, so any gap that
    passed the ``not_update`` check moves the threshold by at least one step.
    Beyond it, Python's ``round`` is used (ties go to even). The result is capped at
    ``max_update_step``. The caller must ensure ``uv_differ`` is non-zero and not NaN.
    """
    abs_differ = abs(uv_differ)
    ratio = abs_differ / gradient
    step = math.ceil(ratio) if abs_differ < gradient else round(ratio)
    return min(step, max_update_step)


def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
    """Compute new threshold params by comparing each group's current ad UV with its target.

    For every app type / A/B group in ``ad_target_uv`` with a non-None target, the
    current UV is read from ``feature_df`` (columns ``apptype``, ``分组``,
    ``广告uv``). When ``|current - target| > not_update``, the group's ``group``
    param is moved by ``threshold_update[group] * step``:

    * lowered when the current UV is below target;
    * raised when it is above target.

    ``step`` comes from ``_uv_update_step``. The gradient and max step come from
    ``get_ad_uv_update_config``, which may use hour-specific values. Results at or
    below 0 are clamped to 0, and ``mean_group`` is set to the same value as ``group``.

    App types without config, groups without a current record or update range, and
    groups missing from ``feature_df`` are skipped.

    Args:
        ad_abtest_abcode_config: per-app-type auto-update config keyed by int app type
            (reads ``ab_test_id``, ``not_update``, ``threshold_update``).
        feature_df: hourly UV data with the columns listed in ``features``.
        threshold_record: current params,
            ``{ab_test_id: {group: {'group': x, 'mean_group': y, ...}}}``.
        ad_target_uv: ``{app_type: {group: target_uv or None}}``.

    Returns:
        (threshold_record_new, robot_msg_record): a deep-copied, updated record and a
        list of dicts describing every change (also written to the log).
    """
    robot_msg_record = []
    threshold_record_new = copy.deepcopy(threshold_record)
    for app_type, target_uv_mapping in ad_target_uv.items():
        update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
        if update_threshold_params is None:
            log_.info(f"no auto-update config for app_type = {app_type}, skip")
            continue
        ab_test_id = update_threshold_params.get('ab_test_id')
        not_update = update_threshold_params.get('not_update')
        threshold_update_mapping = update_threshold_params.get('threshold_update') or {}
        threshold_record_old = threshold_record.get(ab_test_id)
        if threshold_record_old is None:
            log_.info(f"threshold record not found, abtestid = {ab_test_id}, skip")
            continue

        temp_df = feature_df[feature_df['apptype'] == int(app_type)]
        for ab_test_group, target_uv in target_uv_mapping.items():
            if target_uv is None:
                continue
            update_range = threshold_update_mapping.get(ab_test_group)
            group_record = threshold_record_old.get(ab_test_group)
            if update_range is None or group_record is None:
                log_.info(f"missing threshold_update or threshold record, "
                          f"abtestid = {ab_test_id}, abTestGroup = {ab_test_group}, skip")
                continue

            # Current UV of this group; groups absent from this hour's data are skipped.
            group_uv = temp_df.loc[temp_df['分组'] == ab_test_group, '广告uv'].values
            if len(group_uv) == 0:
                continue
            current_uv = group_uv[0]

            uv_differ = current_uv - target_uv
            if abs(uv_differ) <= not_update:
                continue
            if uv_differ < 0:
                # Current UV below target: lower the threshold.
                direction = -1
            elif uv_differ > 0:
                # Current UV above target: raise the threshold.
                direction = 1
            else:
                # Zero (only possible when not_update < 0) or NaN.
                continue

            # NOTE(review): this reads config_.AD_ABTEST_ABCODE_CONFIG, not the
            # ad_abtest_abcode_config argument; timer_check passes the same object.
            gradient, max_update_step = get_ad_uv_update_config(app_type=app_type, ab_group=ab_test_group)
            step = _uv_update_step(uv_differ, gradient, max_update_step)

            threshold_param_old = group_record.get('group')
            threshold_param_new = float(threshold_param_old) + direction * update_range * step
            if threshold_param_new <= 0:
                threshold_param_new = 0

            update_info = {
                'appType': app_type,
                'abtestid': ab_test_id,
                'abTestGroup': ab_test_group,
                'targetUv': target_uv,
                'currentUv': round(current_uv, 4),
                'uvDiffer': round(uv_differ, 4),
                'gradient': round(gradient, 4),
                'step': step,
                'range': round(update_range, 4),
                'paramOld': round(float(threshold_param_old), 4),
                'paramNew': round(threshold_param_new, 4)
            }
            log_.info(update_info)
            threshold_record_new[ab_test_id][ab_test_group]['group'] = threshold_param_new
            threshold_record_new[ab_test_id][ab_test_group]['mean_group'] = threshold_param_new
            robot_msg_record.append(update_info)
    return threshold_record_new, robot_msg_record


def update_threshold(threshold_record_old, threshold_record_new):
    """Rescale the per-user-group thresholds stored in Redis to reflect changed params.

    For each (abtest_id, abtest_group) whose param entry changed, each existing key
    ``{KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{config_tag}:{abtest_group}:{group_key}``
    is multiplied by ``new_param / old_param``. The ``mean_group`` key uses the
    ``mean_group`` param; every other user group uses the ``group`` param.

    A zero stored threshold is first bumped to 1e-5, and a zero old param is replaced
    by 1e-5 as the divisor. Configs with ``care_model is True`` also get a care-mode
    threshold of ``threshold_new * threshold_rate``. Keys that do not exist in Redis
    are left alone.
    """
    # All configured user groups plus the "mean_group" aggregate, de-duplicated.
    ad_mid_group_list = [group for group_list in config_.AD_MID_GROUP.values() for group in group_list]
    ad_mid_group_list.append("mean_group")
    ad_mid_group_list = list(set(ad_mid_group_list))

    # abtest_id -> [(config_tag, config_val), ...], built from "abtestid-configtag" keys.
    # NOTE(review): abtest_id here is a str; lookups below use threshold_record keys
    # as-is, so this assumes those keys are strings too — confirm.
    ad_abtest_config_mapping = {}
    for key, val in config_.AD_ABTEST_CONFIG.items():
        abtest_id, abtest_config_tag = key.split('-')
        ad_abtest_config_mapping.setdefault(abtest_id, []).append((abtest_config_tag, val))
    log_.info(f"ad_abtest_config_mapping = {ad_abtest_config_mapping}")

    for abtest_id, threshold_param_mapping in threshold_record_new.items():
        for abtest_group, threshold_param_new in threshold_param_mapping.items():
            threshold_param_old = threshold_record_old[abtest_id].get(abtest_group)
            if str(threshold_param_old) == str(threshold_param_new):
                continue
            if threshold_param_old is None:
                # Nothing to rescale from; subscripting None below would abort the run.
                log_.info(f"threshold param old not found, abtest_id = {abtest_id}, "
                          f"abtest_group = {abtest_group}, skip")
                continue
            log_.info(f"abtest_id = {abtest_id}, abtest_group = {abtest_group}, "
                      f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")

            for abtest_config_tag, config_val in ad_abtest_config_mapping.get(abtest_id, []):
                care_model = config_val.get('care_model', None)
                threshold_rate = config_val.get('threshold_rate', None)
                for group_key in ad_mid_group_list:
                    key_name = \
                        f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
                    threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
                    if threshold_old is None:
                        continue
                    # A zero threshold would stay zero forever under ratio scaling; bump it.
                    if float(threshold_old) == 0:
                        threshold_old = float(threshold_old) + _ZERO_EPSILON

                    param_key = 'mean_group' if group_key == 'mean_group' else 'group'
                    param_old = threshold_param_old[param_key]
                    divisor = _ZERO_EPSILON if param_old == 0 else param_old
                    threshold_new = float(threshold_old) / divisor * threshold_param_new[param_key]

                    redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new,
                                                   expire_time=_THRESHOLD_EXPIRE_SECONDS)
                    log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
                              f"abtest_group = {abtest_group}, group_key = {group_key}, "
                              f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")

                    # Care-mode experiment thresholds follow the main threshold by a fixed rate.
                    if care_model is True:
                        care_model_key_name = \
                            f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
                        care_model_threshold_old = redis_helper.get_data_from_redis(key_name=care_model_key_name)
                        care_model_threshold_new = threshold_new * threshold_rate
                        redis_helper.set_data_to_redis(key_name=care_model_key_name,
                                                       value=care_model_threshold_new,
                                                       expire_time=_THRESHOLD_EXPIRE_SECONDS)
                        log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
                                  f"abtest_group = {abtest_group}, group_key = {group_key}, "
                                  f"care_model_threshold_old = {care_model_threshold_old}, "
                                  f"care_model_threshold_new = {care_model_threshold_new}")


def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
    """Run one threshold-update pass for partition ``dt`` and return the change records.

    Steps:
    1. Load the current param record from Redis.
    2. Fetch the UV features.
    3. Compute new params from the UV targets.
    4. Rescale the stored thresholds.
    5. Persist the new param record.

    Raises:
        ValueError: if the param record is missing from Redis or is not a valid literal.
    """
    threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
    if threshold_record is None:
        raise ValueError(f"threshold record not found in redis, "
                         f"key = {config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD}")
    if isinstance(threshold_record, bytes):
        threshold_record = threshold_record.decode('utf-8')
    # SECURITY: the record is the str() of a plain dict written back at the end of this
    # function. Parse it with ast.literal_eval rather than eval so that a tampered Redis
    # value cannot execute code.
    threshold_record = ast.literal_eval(threshold_record)
    log_.info(f"threshold_record = {threshold_record}")

    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    feature_df['广告uv'] = feature_df['广告uv'].astype(float)

    threshold_record_new, robot_msg_record = get_threshold_record_new_by_uv(
        ad_abtest_abcode_config=ad_abtest_abcode_config,
        feature_df=feature_df,
        threshold_record=threshold_record,
        ad_target_uv=ad_target_uv)
    log_.info(f"threshold_record_new = {threshold_record_new}")

    update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
    redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
                                   value=str(threshold_record_new),
                                   expire_time=_THRESHOLD_EXPIRE_SECONDS)
    return robot_msg_record


def get_ad_target_uv(now_h):
    """Fetch target UVs for app types whose auto-threshold switch is on in the admin backend.

    A group's target is multiplied by its ``update_param`` when ``now_h`` is in the
    group's ``update_hours`` (from ``target_uv_param`` in AD_ABTEST_ABCODE_CONFIG).
    Product ids without an entry in AD_ABTEST_ABCODE_CONFIG are logged and skipped.

    Returns:
        ``{productId: {abParam: uvTarget}}``. The dict is empty on request failure or
        when there is no content. A target may be None, and callers skip it.
    """
    ad_target_uv = {}
    result = request_get(request_url=config_.GET_AD_TARGET_UV_URL)
    if result is None or result.get('code') != 0:
        log_.info('获取管理后台广告目标uv值失败!')
        return ad_target_uv
    if not result.get('content'):
        return ad_target_uv

    for item in result['content']:
        app_type = item['productId']
        app_params = config_.AD_ABTEST_ABCODE_CONFIG.get(int(app_type))
        if app_params is None:
            # An unconfigured product used to crash the whole run here.
            log_.info(f"no auto-update config for app_type = {app_type}, skip")
            continue
        target_uv_param = app_params.get('target_uv_param') or {}

        target_uv_mapping = {}
        for uv_item in item.get('uvTargetDetails') or []:
            ab_group = uv_item['abParam']
            target_uv = uv_item['uvTarget']
            target_uv_param_group = target_uv_param.get(ab_group)
            if target_uv is not None and target_uv_param_group is not None:
                update_hours = target_uv_param_group.get('update_hours') or []
                if now_h in update_hours:
                    target_uv *= target_uv_param_group.get('update_param')
            target_uv_mapping[ab_group] = target_uv
        ad_target_uv[app_type] = target_uv_mapping
    return ad_target_uv


def get_ad_uv_update_config(app_type, ab_group, now_h=None):
    """Return ``(gradient, max_update_step)`` for one app type / A/B group.

    The app-level defaults from AD_ABTEST_ABCODE_CONFIG are used, unless the group's
    ``special_update_config`` lists the current hour in ``special_hours``. In that
    case ``special_gradient`` / ``special_max_update_step`` are used, and each falls
    back to the default if absent.

    Args:
        app_type: app type (int or int-like string).
        ab_group: A/B group name.
        now_h: hour to evaluate; defaults to the current local hour.
    """
    if now_h is None:
        now_h = datetime.datetime.now().hour
    update_threshold_params = config_.AD_ABTEST_ABCODE_CONFIG.get(int(app_type))
    gradient = update_threshold_params.get('gradient')
    max_update_step = update_threshold_params.get('max_update_step')

    target_uv_param = update_threshold_params.get('target_uv_param') or {}
    target_uv_param_group = target_uv_param.get(ab_group) or {}
    special_update_config = target_uv_param_group.get('special_update_config')
    if special_update_config is not None and now_h in (special_update_config.get('special_hours') or []):
        gradient = special_update_config.get('special_gradient', gradient)
        max_update_step = special_update_config.get('special_max_update_step', max_update_step)
    return gradient, max_update_step


def _send_robot_msg(msg_text):
    """Send ``msg_text`` through the Feishu robot configured for threshold auto-update."""
    robot_config = config_.FEISHU_ROBOT['ad_threshold_auto_update_robot']
    send_msg_to_feishu(
        webhook=robot_config.get('webhook'),
        key_word=robot_config.get('key_word'),
        msg_text=msg_text
    )


def timer_check():
    """Entry point: update thresholds from the previous hour's UV data, retrying until ready.

    No adjustment is made between 00:00 and 09:00, or when no app type has the
    auto-update switch on.

    If the previous hour's partition has data, the update runs and the result is
    reported to Feishu. Otherwise the check is rescheduled every 60 seconds via
    ``threading.Timer`` until minute 30; after that a "data not ready" alert is sent.
    Any exception is logged and reported to Feishu.
    """
    try:
        ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
        # Persist the auto-update params so they can be inspected alongside the thresholds.
        redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_PARAM_RECORD,
                                       value=str(ad_abtest_abcode_config), expire_time=24 * 3600)
        project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
        table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')

        # Read the clock once so the hour gate, the data partition and the retry cutoff
        # all refer to the same instant (separate reads could straddle an hour boundary).
        now = datetime.datetime.now()
        now_h = now.hour
        now_min = now.minute
        log_.info(f"now_date: {now.strftime('%Y%m%d%H')}")

        if 0 <= now_h < 9:
            log_.info(f"00:00 - 09:00 不做阈值参数调整")
            return

        ad_target_uv = get_ad_target_uv(now_h=now_h)
        log_.info(f"ad_target_uv: {ad_target_uv}")
        if len(ad_target_uv) == 0:
            return

        # The previous hour's partition must be ready before updating.
        dt = (now - datetime.timedelta(hours=1)).strftime('%Y%m%d%H')
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            log_.info(f"data count = {data_count}")
            robot_msg_record = update_ad_abtest_threshold(
                project=project, table=table, dt=dt,
                ad_abtest_abcode_config=ad_abtest_abcode_config, ad_target_uv=ad_target_uv)
            if robot_msg_record:
                robot_msg_record_text = "\n".join(str(item) for item in robot_msg_record)
                msg = f"threshold_param_update: \n{robot_msg_record_text}\n"
            else:
                msg = "无需更新!\n"
            _send_robot_msg(f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}")
            log_.info(f"threshold update end!")
        elif now_min > 30:
            log_.info('threshold update data is None!')
            _send_robot_msg(f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n")
        else:
            # Data not ready yet: check again in one minute.
            Timer(60, timer_check).start()
    except Exception as e:
        log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        _send_robot_msg(f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
                        f"exception: {e}\n"
                        f"traceback: {traceback.format_exc()}")


if __name__ == '__main__':
    timer_check()