import datetime
import traceback
import numpy as np
from threading import Timer
import pandas as pd
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Columns requested from the feature table by update_ad_abtest_threshold().
features = [
    'apptype',
    'adcode',
    'visit_uv_today',
    'visit_uv_yesterday',
    'b'
]


def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
    """Compute new threshold parameters from the mean of column 'b' per ab-test tag.

    For every app type in ``ad_abtest_abcode_config`` and every entry of its
    ``ab_test_config``, the mean of ``feature_df['b']`` over the rows whose
    ``adcode`` is in the entry's ab-code list decides the adjustment:

    * mean < 0    -> parameter is increased by ``threshold_update``
    * mean > 0.1  -> parameter is decreased by ``threshold_update``
    * otherwise   -> parameter is left unchanged

    An adjusted parameter is only stored when it is strictly positive.

    NOTE(review): 'b' presumably measures the change in active users (per the
    original docstring) -- confirm against the table definition.

    :param ad_abtest_abcode_config: mapping app_type -> dict with the keys
        'ab_test_id', 'ab_test_config' (config_name -> ab-code list) and
        'threshold_update' (step size).
    :param feature_df: DataFrame with at least 'apptype', 'adcode' and 'b'.
    :param threshold_record: mapping "<ab_test_id>-<config_name>" -> current
        threshold parameter (anything accepted by ``float()``).
    :return: tuple ``(threshold_record_new, robot_msg_record)``; the first is
        a copy of ``threshold_record`` with adjusted entries, the second a
        list of dicts describing each adjustment.
    """
    robot_msg_record = []
    threshold_record_new = threshold_record.copy()

    for app_type, config_params in ad_abtest_abcode_config.items():
        # Restrict the data to the current app before updating its parameters.
        temp_df = feature_df[feature_df['apptype'] == app_type]
        ab_test_id = config_params.get('ab_test_id')
        ab_test_config = config_params.get('ab_test_config')
        threshold_update = config_params.get('threshold_update')

        for config_name, ab_code_list in ab_test_config.items():
            ad_abtest_tag = f"{ab_test_id}-{config_name}"
            if not ab_code_list:
                continue

            # The mean of an empty selection is NaN; both comparisons below
            # are then False and the tag is left untouched.
            b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
            if b_mean < 0:
                raise_threshold = True
            elif b_mean > 0.1:
                raise_threshold = False
            else:
                continue

            param_raw = threshold_record.get(ad_abtest_tag)
            if param_raw is None:
                # Without a stored parameter there is nothing to adjust; skip
                # this tag instead of aborting the update for every other tag.
                log_.error(f"ad_abtest_tag = {ad_abtest_tag} has no threshold record, skip")
                continue
            param_old = float(param_raw)

            if raise_threshold:
                threshold_param_new = param_old + threshold_update
            else:
                threshold_param_new = param_old - threshold_update

            # Never store a non-positive parameter.
            if threshold_param_new > 0:
                threshold_record_new[ad_abtest_tag] = threshold_param_new
                robot_msg_record.append({'appType': app_type,
                                         'ad_abtest_tag': ad_abtest_tag,
                                         'param_old': param_old,
                                         'param_new': threshold_param_new})

    return threshold_record_new, robot_msg_record


def update_threshold(threshold_record_old, threshold_record_new):
    """Rescale the thresholds stored in redis by the ratio new / old parameter.

    For every tag in ``threshold_record_new`` and every group listed in
    ``config_.AD_MID_GROUP``, the stored threshold is read, multiplied by
    ``param_new / param_old`` and written back. Keys that do not exist in
    redis are skipped.

    :param threshold_record_old: mapping tag -> previous threshold parameter.
    :param threshold_record_new: mapping tag -> new threshold parameter.
    """
    # De-duplicated list of all groups across every class of AD_MID_GROUP.
    ad_mid_group_list = list({
        group
        for group_list in config_.AD_MID_GROUP.values()
        for group in group_list
    })

    for ad_abtest_tag, threshold_param_new in threshold_record_new.items():
        threshold_param_old = threshold_record_old.get(ad_abtest_tag)
        log_.info(f"ad_abtest_tag = {ad_abtest_tag}, "
                  f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")

        if threshold_param_old is None:
            log_.error(f"ad_abtest_tag = {ad_abtest_tag} has no old threshold param, skip")
            continue
        # Coerce like get_threshold_record_new() does, so that parameters
        # stored as numeric strings do not break the arithmetic below.
        param_old = float(threshold_param_old)
        param_new = float(threshold_param_new)
        if param_old == 0:
            # A zero parameter cannot be used as the divisor of the ratio.
            log_.error(f"ad_abtest_tag = {ad_abtest_tag} has old threshold param 0, skip")
            continue

        tag_list = ad_abtest_tag.split('-')
        for group_key in ad_mid_group_list:
            # Fetch the threshold currently stored for this tag / group.
            key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{tag_list[0]}:{tag_list[1]}:{group_key}"
            threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
            if threshold_old is None:
                continue
            # Compute the rescaled threshold.
            threshold_new = float(threshold_old) / param_old * param_new
            log_.info(f"ad_abtest_tag = {ad_abtest_tag}, group_key = {group_key}, "
                      f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
            # Write it back to redis (done even when the value is unchanged).
            redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new)


def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
    """Run one full threshold update for the data partition ``dt``.

    Loads the current parameter record from redis, fetches the feature data,
    derives the new parameters, rescales the stored thresholds and finally
    persists the new parameter record.

    :param project: project passed through to ``get_feature_data``.
    :param table: table passed through to ``get_feature_data``.
    :param dt: partition string passed through to ``get_feature_data``.
    :param ad_abtest_abcode_config: see ``get_threshold_record_new``.
    :return: list of dicts describing every parameter that was adjusted.
    """
    # Load the current threshold parameters.
    threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
    # NOTE(review): eval() executes whatever is stored under this key. The
    # value is written below as str(dict), so ast.literal_eval would be a
    # safer parser if the record only ever contains plain literals -- confirm
    # before switching.
    threshold_record = eval(threshold_record)
    log_.info(f"threshold_record = {threshold_record}")

    # Fetch the feature data and normalise the column types used later on.
    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    feature_df['b'] = feature_df['b'].astype(float)

    # Derive the new threshold parameters.
    threshold_record_new, robot_msg_record = get_threshold_record_new(
        ad_abtest_abcode_config=ad_abtest_abcode_config,
        feature_df=feature_df,
        threshold_record=threshold_record,
    )
    log_.info(f"threshold_record_new = {threshold_record_new}")

    # Rescale the thresholds stored in redis.
    update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)

    # Persist the new parameter record.
    redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
                                   value=str(threshold_record_new))
    return robot_msg_record


def timer_check():
    """Check whether the previous hour's data is ready and run the update.

    * data available          -> run the update and report the result
    * no data, minute > 30    -> give up and report that data is missing
    * no data, minute <= 30   -> re-check in 60 seconds via ``Timer``

    Any exception is logged and reported to the Feishu robot.
    """
    try:
        ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
        project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
        table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')

        # Read the clock once so that hour and minute come from the same instant.
        now_date = datetime.datetime.today()
        now_min = now_date.minute
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
        # The partition to process is the previous hour.
        dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')

        # Check whether the data for this partition is available.
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            log_.info(f"data count = {data_count}")
            # Data is ready: run the update.
            robot_msg_record = update_ad_abtest_threshold(
                project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config)
            if robot_msg_record:
                robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
                msg = f"threshold_param_update: \n{robot_msg_record_text}\n"
            else:
                msg = "无需更新!\n"
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}"
            )
            log_.info("threshold update end!")
        elif now_min > 30:
            # Past the half hour: stop retrying and report the missing data.
            log_.info('threshold update data is None!')
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n"
            )
        else:
            # Data is not ready yet: check again in one minute.
            Timer(60, timer_check).start()

    except Exception as e:
        log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    timer_check()