|  | @@ -2,6 +2,9 @@ import datetime
 | 
											
												
													
														|  |  import traceback
 |  |  import traceback
 | 
											
												
													
														|  |  import numpy as np
 |  |  import numpy as np
 | 
											
												
													
														|  |  from threading import Timer
 |  |  from threading import Timer
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +import pandas as pd
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 |  |  from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 | 
											
												
													
														|  |  from config import set_config
 |  |  from config import set_config
 | 
											
												
													
														|  |  from log import Log
 |  |  from log import Log
 | 
											
										
											
												
													
														|  | @@ -20,6 +23,7 @@ features = [
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
 |  |  def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
 | 
											
												
													
														|  |      """根据活跃人数变化计算新的阈值参数"""
 |  |      """根据活跃人数变化计算新的阈值参数"""
 | 
											
												
													
														|  | 
 |  | +    robot_msg_record = []
 | 
											
												
													
														|  |      threshold_record_new = threshold_record.copy()
 |  |      threshold_record_new = threshold_record.copy()
 | 
											
												
													
														|  |      for app_type, config_params in ad_abtest_abcode_config.items():
 |  |      for app_type, config_params in ad_abtest_abcode_config.items():
 | 
											
												
													
														|  |          # 获取对应端的数据, 更新阈值参数
 |  |          # 获取对应端的数据, 更新阈值参数
 | 
											
										
											
												
													
														|  | @@ -34,14 +38,19 @@ def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_reco
 | 
											
												
													
														|  |              if len(ab_code_list) > 0:
 |  |              if len(ab_code_list) > 0:
 | 
											
												
													
														|  |                  b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
 |  |                  b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
 | 
											
												
													
														|  |                  if b_mean < 0:
 |  |                  if b_mean < 0:
 | 
											
												
													
														|  | 
 |  | +                    # 阈值调高
 | 
											
												
													
														|  |                      threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + threshold_update
 |  |                      threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + threshold_update
 | 
											
												
													
														|  |                  elif b_mean > 0.1:
 |  |                  elif b_mean > 0.1:
 | 
											
												
													
														|  | -                    threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + threshold_update
 |  | 
 | 
											
												
													
														|  | 
 |  | +                    # 阈值调低
 | 
											
												
													
														|  | 
 |  | +                    threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - threshold_update
 | 
											
												
													
														|  |                  else:
 |  |                  else:
 | 
											
												
													
														|  |                      continue
 |  |                      continue
 | 
											
												
													
														|  |                  if threshold_param_new > 0:
 |  |                  if threshold_param_new > 0:
 | 
											
												
													
														|  |                      threshold_record_new[ad_abtest_tag] = threshold_param_new
 |  |                      threshold_record_new[ad_abtest_tag] = threshold_param_new
 | 
											
												
													
														|  | -    return threshold_record_new
 |  | 
 | 
											
												
													
														|  | 
 |  | +                    robot_msg_record.append({'appType': app_type, 'ad_abtest_tag': ad_abtest_tag,
 | 
											
												
													
														|  | 
 |  | +                                             'param_old': float(threshold_record.get(ad_abtest_tag)),
 | 
											
												
													
														|  | 
 |  | +                                             'param_new': threshold_param_new})
 | 
											
												
													
														|  | 
 |  | +    return threshold_record_new, robot_msg_record
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  def update_threshold(threshold_record_old, threshold_record_new):
 |  |  def update_threshold(threshold_record_old, threshold_record_new):
 | 
											
										
											
												
													
														|  | @@ -78,15 +87,15 @@ def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
 | 
											
												
													
														|  |      feature_df['apptype'] = feature_df['apptype'].astype(int)
 |  |      feature_df['apptype'] = feature_df['apptype'].astype(int)
 | 
											
												
													
														|  |      feature_df['b'] = feature_df['b'].astype(float)
 |  |      feature_df['b'] = feature_df['b'].astype(float)
 | 
											
												
													
														|  |      # 根据活跃人数变化计算新的阈值参数
 |  |      # 根据活跃人数变化计算新的阈值参数
 | 
											
												
													
														|  | -    threshold_record_new = get_threshold_record_new(ad_abtest_abcode_config=ad_abtest_abcode_config,
 |  | 
 | 
											
												
													
														|  | -                                                    feature_df=feature_df, threshold_record=threshold_record)
 |  | 
 | 
											
												
													
														|  | 
 |  | +    threshold_record_new, robot_msg_record = get_threshold_record_new(
 | 
											
												
													
														|  | 
 |  | +        ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df, threshold_record=threshold_record)
 | 
											
												
													
														|  |      log_.info(f"threshold_record_new = {threshold_record_new}")
 |  |      log_.info(f"threshold_record_new = {threshold_record_new}")
 | 
											
												
													
														|  |      # 更新阈值
 |  |      # 更新阈值
 | 
											
												
													
														|  |      update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
 |  |      update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
 | 
											
												
													
														|  |      # 更新阈值参数
 |  |      # 更新阈值参数
 | 
											
												
													
														|  |      redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
 |  |      redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
 | 
											
												
													
														|  |                                     value=str(threshold_record_new))
 |  |                                     value=str(threshold_record_new))
 | 
											
												
													
														|  | -    return threshold_record, threshold_record_new
 |  | 
 | 
											
												
													
														|  | 
 |  | +    return robot_msg_record
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  def timer_check():
 |  |  def timer_check():
 | 
											
										
											
												
													
														|  | @@ -104,14 +113,18 @@ def timer_check():
 | 
											
												
													
														|  |          if data_count > 0:
 |  |          if data_count > 0:
 | 
											
												
													
														|  |              log_.info(f"data count = {data_count}")
 |  |              log_.info(f"data count = {data_count}")
 | 
											
												
													
														|  |              # 数据准备好,进行更新
 |  |              # 数据准备好,进行更新
 | 
											
												
													
														|  | -            threshold_record, threshold_record_new = update_ad_abtest_threshold(
 |  | 
 | 
											
												
													
														|  | 
 |  | +            robot_msg_record = update_ad_abtest_threshold(
 | 
											
												
													
														|  |                  project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config)
 |  |                  project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config)
 | 
											
												
													
														|  | 
 |  | +            if len(robot_msg_record) > 0:
 | 
											
												
													
														|  | 
 |  | +                robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
 | 
											
												
													
														|  | 
 |  | +                msg = f"threshold_param_update: \n{robot_msg_record_text}\n"
 | 
											
												
													
														|  | 
 |  | +            else:
 | 
											
												
													
														|  | 
 |  | +                msg = "无需更新!\n"
 | 
											
												
													
														|  |              send_msg_to_feishu(
 |  |              send_msg_to_feishu(
 | 
											
												
													
														|  |                  webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
 |  |                  webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
 | 
											
												
													
														|  |                  key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
 |  |                  key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
 | 
											
												
													
														|  | -                msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n"
 |  | 
 | 
											
												
													
														|  | -                         f"threshold_param_old: {threshold_record}\n"
 |  | 
 | 
											
												
													
														|  | -                         f"threshold_param_new: {threshold_record_new}\n"
 |  | 
 | 
											
												
													
														|  | 
 |  | +                msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}"
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |              )
 |  |              )
 | 
											
												
													
														|  |              log_.info(f"threshold update end!")
 |  |              log_.info(f"threshold update end!")
 | 
											
												
													
														|  |          elif now_min > 30:
 |  |          elif now_min > 30:
 |