| 
					
				 | 
			
			
				@@ -0,0 +1,140 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import datetime 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import traceback 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import numpy as np 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from threading import Timer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from config import set_config 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from log import Log 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+config_, _ = set_config() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+log_ = Log() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+redis_helper = RedisHelper() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+features = [ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    'apptype', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    'adcode', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    'visit_uv_today', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    'visit_uv_yesterday', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    'b' 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """根据活跃人数变化计算新的阈值参数""" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    threshold_record_new = threshold_record.copy() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for app_type, config_params in ad_abtest_abcode_config.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # 获取对应端的数据, 更新阈值参数 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # log_.info(f"app_type = {app_type}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        temp_df = feature_df[feature_df['apptype'] == app_type] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ab_test_id = config_params.get('ab_test_id') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ab_test_config = config_params.get('ab_test_config') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        threshold_update = config_params.get('threshold_update') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for config_name, ab_code_list in ab_test_config.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ad_abtest_tag = f"{ab_test_id}-{config_name}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # log_.info(f"ad_abtest_tag = {ad_abtest_tag}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if len(ab_code_list) > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if b_mean < 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + threshold_update 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                elif b_mean > 0.1: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + threshold_update 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                else: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    continue 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if threshold_param_new > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    threshold_record_new[ad_abtest_tag] = threshold_param_new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return threshold_record_new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def update_threshold(threshold_record_old, threshold_record_new): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """更新阈值""" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         for group in group_list] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ad_mid_group_list = list(set(ad_mid_group_list)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for ad_abtest_tag, threshold_param_new in threshold_record_new.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        threshold_param_old = threshold_record_old.get(ad_abtest_tag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log_.info(f"ad_abtest_tag = {ad_abtest_tag}, " 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                  f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tag_list = ad_abtest_tag.split('-') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for group_key in ad_mid_group_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 获取对应的阈值 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{tag_list[0]}:{tag_list[1]}:{group_key}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            threshold_old = redis_helper.get_data_from_redis(key_name=key_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if threshold_old is None: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                continue 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 计算新的阈值 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            threshold_new = float(threshold_old) / threshold_param_old * threshold_param_new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log_.info(f"ad_abtest_tag = {ad_abtest_tag}, group_key = {group_key}, " 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      f"threshold_old = {threshold_old}, threshold_new = {threshold_new}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 更新redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 获取当前阈值参数值 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    threshold_record = eval(threshold_record) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info(f"threshold_record = {threshold_record}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 获取uv数据 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    feature_df['apptype'] = feature_df['apptype'].astype(int) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    feature_df['b'] = feature_df['b'].astype(float) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 根据活跃人数变化计算新的阈值参数 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    threshold_record_new = get_threshold_record_new(ad_abtest_abcode_config=ad_abtest_abcode_config, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                    feature_df=feature_df, threshold_record=threshold_record) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info(f"threshold_record_new = {threshold_record_new}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 更新阈值 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 更新阈值参数 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                   value=str(threshold_record_new)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return threshold_record, threshold_record_new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def timer_check(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        now_date = datetime.datetime.today() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        now_min = datetime.datetime.now().minute 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # 查看当前更新的数据是否已准备好 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        data_count = data_check(project=project, table=table, dt=dt) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if data_count > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log_.info(f"data count = {data_count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 数据准备好,进行更新 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            threshold_record, threshold_record_new = update_ad_abtest_threshold( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            send_msg_to_feishu( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"threshold_param_old: {threshold_record}\n" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"threshold_param_new: {threshold_record_new}\n" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log_.info(f"threshold update end!") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        elif now_min > 30: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log_.info('threshold update data is None!') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            send_msg_to_feishu( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 数据没准备好,1分钟后重新检查 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Timer(60, timer_check).start() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception as e: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        send_msg_to_feishu( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                     f"exception: {e}\n" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                     f"traceback: {traceback.format_exc()}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+if __name__ == '__main__': 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    timer_check() 
			 |