import ast
import datetime
import time
from threading import Timer

import xgboost as xgb
from xgboost.sklearn import XGBClassifier

from utils import RedisHelper, data_check
from config import set_config
from ad_feature_process import get_feature_data

redis_helper = RedisHelper()
config_, _ = set_config()
xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']

# TTL (seconds) for both the per-group threshold keys and the threshold-parameter
# record; previously spelled as both `48 * 3600` and `2 * 24 * 3600`.
THRESHOLD_EXPIRE_SECONDS = 2 * 24 * 3600

# Columns requested from the feature table. The first four are identifiers and are
# dropped before prediction (see `features[4:]` in threshold_update).
features = [
    'apptype',
    'subsessionid',
    'mid',
    'videoid',
    'mid_preview_count_30day',
    'mid_view_count_30day',
    'mid_view_count_pv_30day',
    'mid_play_count_30day',
    'mid_play_count_pv_30day',
    'mid_share_count_30day',
    'mid_share_count_pv_30day',
    'mid_return_count_30day',
    'mid_share_rate_30day',
    'mid_return_rate_30day',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]

# Feature columns cast to int after missing values have been filled with 0.
_TYPE_INT_COLUMNS = [
    'mid_preview_count_30day',
    'mid_view_count_30day',
    'mid_view_count_pv_30day',
    'mid_play_count_30day',
    'mid_play_count_pv_30day',
    'mid_share_count_30day',
    'mid_share_count_pv_30day',
    'mid_return_count_30day',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
]

# Feature columns cast to float after missing values have been filled with 0.
_TYPE_FLOAT_COLUMNS = [
    'mid_share_rate_30day',
    'mid_return_rate_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]

# Model loading: a raw Booster is read from disk and attached to an XGBClassifier
# wrapper so the sklearn-style predict_proba() can be used.
# NOTE(review): this relies on the private `_Booster` attribute; behaviour may differ
# across xgboost versions — confirm against the pinned version.
model = XGBClassifier()
booster = xgb.Booster()
booster.load_model('./data/ad_xgb.model')
model._Booster = booster


def _prepare_features(feature_df):
    """Fill missing values with 0 and cast feature columns to int/float in place.

    Args:
        feature_df: DataFrame returned by ``get_feature_data``.

    Returns:
        The same DataFrame, for convenience.
    """
    feature_df.fillna(0, inplace=True)
    for column_name in _TYPE_INT_COLUMNS:
        feature_df[column_name] = feature_df[column_name].astype(int)
    for column_name in _TYPE_FLOAT_COLUMNS:
        feature_df[column_name] = feature_df[column_name].astype(float)
    return feature_df


def _predict_ad_effect(predict_df):
    """Score every row with and without an ad and store the difference.

    The model is run twice on copies of ``predict_df``. The first run sets
    ``ad_status = 0`` (no ad) and the second sets ``ad_status = 1`` (ad shown).
    Three columns are added to ``predict_df`` in place:

    - ``y_0``: class-1 probability with ``ad_status = 0``.
    - ``y_1``: class-1 probability with ``ad_status = 1``.
    - ``res_predict``: ``y_0 - y_1``.

    NOTE(review): appending ``ad_status`` as the last column assumes the model was
    trained with that column order — confirm.

    Args:
        predict_df: DataFrame containing only model feature columns.

    Returns:
        ``predict_df`` with the three columns added.
    """
    no_ad_df = predict_df.copy()
    no_ad_df['ad_status'] = 0
    y_pred_proba_0 = model.predict_proba(no_ad_df)

    ad_df = predict_df.copy()
    ad_df['ad_status'] = 1
    y_pred_proba_1 = model.predict_proba(ad_df)

    # Column 1 of predict_proba is the probability of class 1.
    predict_df['y_0'] = y_pred_proba_0[:, 1]
    predict_df['y_1'] = y_pred_proba_1[:, 1]
    predict_df['res_predict'] = predict_df['y_0'] - predict_df['y_1']
    print(f"predict_df shape: {predict_df.shape}")
    return predict_df


def _load_threshold_record():
    """Read the threshold-parameter record from Redis and parse it into a dict.

    The record is written back by threshold_update as ``str(dict)``. Its shape is
    ``{abtest_id: {ab_code: param, ...}, ...}``.

    Raises:
        RuntimeError: if the record is missing or empty in Redis.
        ValueError / SyntaxError: if the stored value is not a Python literal.
    """
    key_name = xgb_config['threshold_record']
    raw_record = redis_helper.get_data_from_redis(key_name=key_name)
    # Presumably None / empty when the key is missing or has expired — confirm
    # against RedisHelper.
    if not raw_record:
        raise RuntimeError(f"threshold record '{key_name}' not found in redis")
    if isinstance(raw_record, bytes):
        raw_record = raw_record.decode('utf-8')
    # ast.literal_eval parses the stored dict repr without executing arbitrary code
    # (the previous eval() would run anything written to this Redis key).
    return ast.literal_eval(raw_record)


def threshold_update(project, table, dt, app_type):
    """Predict on one day's data and write per-experiment-group thresholds to Redis.

    Steps:

    1. Fetch the feature data for ``app_type`` and normalise it.
    2. Score it with ``ad_status = 0`` (no ad) and ``ad_status = 1`` (ad).
    3. Compute the per-row difference ``res_predict``.
    4. For each ``ab_code`` in the experiment's record, write
       ``mean(res_predict) * param`` to the key
       ``<threshold_key_prefix><abtest_id>:<ab_code>``.
    5. Write the threshold record back to Redis unchanged, which refreshes its TTL.

    If no feature rows are returned, nothing is written. Writing would otherwise
    store NaN thresholds (the mean of an empty column).

    Args:
        project: Source project name passed to ``get_feature_data``.
        table: Source table name passed to ``get_feature_data``.
        dt: Partition date string, ``'%Y%m%d'``.
        app_type: Key into ``xgb_config['abtest_id_mapping']``.
    """
    # 1. Fetch features.
    feature_initial_df = get_feature_data(project=project, table=table, features=features,
                                          dt=dt, app_type=app_type)
    if feature_initial_df.empty:
        print(f"app_type: {app_type} has no feature data for dt={dt}, threshold update skipped!")
        return
    _prepare_features(feature_initial_df)
    print(f"feature_initial_df shape: {feature_initial_df.shape}")

    # Keep only model inputs (drop apptype / subsessionid / mid / videoid).
    # An explicit copy avoids pandas' SettingWithCopy behaviour when columns are added.
    predict_df = feature_initial_df[features[4:]].copy()
    print(f"predict_df shape: {predict_df.shape}")

    # 2-4. Predict without / with an ad and take the difference.
    _predict_ad_effect(predict_df)

    # 5. Compute thresholds for the experiment mapped to this app type.
    abtest_id = xgb_config['abtest_id_mapping'][app_type]
    threshold_record = _load_threshold_record()
    record = threshold_record[abtest_id]

    # Thresholds are computed per experiment group.
    predict_mean = predict_df['res_predict'].mean()
    for ab_code, param in record.items():
        threshold = predict_mean * param
        print(f"{abtest_id}-{ab_code}: {threshold}")
        threshold_key = f"{xgb_config['threshold_key_prefix']}{abtest_id}:{ab_code}"
        redis_helper.set_data_to_redis(key_name=threshold_key, value=threshold,
                                       expire_time=THRESHOLD_EXPIRE_SECONDS)
    # Rewrite the (unchanged) record so its TTL is refreshed.
    redis_helper.set_data_to_redis(key_name=xgb_config['threshold_record'], value=str(threshold_record),
                                   expire_time=THRESHOLD_EXPIRE_SECONDS)
    print("update threshold finished!")


def timer_check():
    """Update thresholds from yesterday's partition, retrying every 60s until it exists.

    ``data_check`` is asked how many rows the partition ``dt = yesterday`` of
    ``loghubods.admodel_data_train`` has. If the count is positive,
    ``threshold_update`` runs for every app type in
    ``xgb_config['abtest_id_mapping']``. Otherwise a ``threading.Timer`` schedules
    another check in 60 seconds and this call returns immediately.
    """
    project = 'loghubods'
    table = 'admodel_data_train'
    dt = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d')
    # Check whether the data for this update is ready.
    data_count = data_check(project=project, table=table, dt=dt)
    if data_count > 0:
        print(f"data count = {data_count}")
        # Data is ready: run the update for each app type.
        for app_type in xgb_config['abtest_id_mapping']:
            print(f"app_type: {app_type} threshold update start...")
            threshold_update(project=project, table=table, dt=dt, app_type=app_type)
            print(f"app_type: {app_type} threshold update finished!")
        print("threshold update end!")
    else:
        # Data not ready yet: check again in 1 minute.
        Timer(60, timer_check).start()


if __name__ == '__main__':
    st_time = time.time()
    timer_check()
    # NOTE: if the data was not ready, timer_check() only scheduled a retry and
    # returned, so this measures the first check only.
    print(f"execute time: {time.time() - st_time}s")