@@ -0,0 +1,153 @@
+import time
+import datetime
+import ast
+import xgboost as xgb
+from xgboost.sklearn import XGBClassifier
+from threading import Timer
+from utils import RedisHelper, data_check
+from config import set_config
+from ad_feature_process import get_feature_data
+redis_helper = RedisHelper()
+config_, _ = set_config()
+xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
+
+features = [
+    'apptype',
+    'subsessionid',
+    'mid',
+    'videoid',
+    'mid_preview_count_30day',
+    'mid_view_count_30day',
+    'mid_view_count_pv_30day',
+    'mid_play_count_30day',
+    'mid_play_count_pv_30day',
+    'mid_share_count_30day',
+    'mid_share_count_pv_30day',
+    'mid_return_count_30day',
+    'mid_share_rate_30day',
+    'mid_return_rate_30day',
+    'video_preview_count_uv_30day',
+    'video_preview_count_pv_30day',
+    'video_view_count_uv_30day',
+    'video_view_count_pv_30day',
+    'video_play_count_uv_30day',
+    'video_play_count_pv_30day',
+    'video_share_count_uv_30day',
+    'video_share_count_pv_30day',
+    'video_return_count_30day',
+    'video_ctr_uv_30day',
+    'video_ctr_pv_30day',
+    'video_share_rate_uv_30day',
+    'video_share_rate_pv_30day',
+    'video_return_rate_30day',
+]
+
+# Load the trained XGBoost model: read the raw booster and attach it to the
+# sklearn wrapper so that predict_proba is available.
+model = XGBClassifier()
+booster = xgb.Booster()
+booster.load_model('./data/ad_xgb.model')
+model._Booster = booster
+
+
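Note on the model loading above: assigning to the private `_Booster` attribute works, but it depends on an internal of the sklearn wrapper. If the saved file is compatible with the wrapper's own loader (worth verifying for this particular artifact), the public API achieves the same result; a minimal sketch:

    model = XGBClassifier()
    model.load_model('./data/ad_xgb.model')  # public API; avoids touching the private _Booster attribute
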
+def threshold_update(project, table, dt, app_type):
+    # Use the previous day's data to run predictions and derive the thresholds.
+    # 1. Fetch the features
+    feature_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt, app_type=app_type)
+    # Fill missing values
+    feature_initial_df.fillna(0, inplace=True)
+    # Correct the data types
+    type_int_columns = [
+        'mid_preview_count_30day',
+        'mid_view_count_30day',
+        'mid_view_count_pv_30day',
+        'mid_play_count_30day',
+        'mid_play_count_pv_30day',
+        'mid_share_count_30day',
+        'mid_share_count_pv_30day',
+        'mid_return_count_30day',
+        'video_preview_count_uv_30day',
+        'video_preview_count_pv_30day',
+        'video_view_count_uv_30day',
+        'video_view_count_pv_30day',
+        'video_play_count_uv_30day',
+        'video_play_count_pv_30day',
+        'video_share_count_uv_30day',
+        'video_share_count_pv_30day',
+        'video_return_count_30day',
+    ]
+    for column_name in type_int_columns:
+        feature_initial_df[column_name] = feature_initial_df[column_name].astype(int)
+    type_float_columns = [
+        'mid_share_rate_30day',
+        'mid_return_rate_30day',
+        'video_ctr_uv_30day',
+        'video_ctr_pv_30day',
+        'video_share_rate_uv_30day',
+        'video_share_rate_pv_30day',
+        'video_return_rate_30day',
+    ]
+    for column_name in type_float_columns:
+        feature_initial_df[column_name] = feature_initial_df[column_name].astype(float)
+    print(f"feature_initial_df shape: {feature_initial_df.shape}")
+    # Keep only the model input columns (features[:4] are identifier columns);
+    # copy so the result columns added below do not trigger SettingWithCopy warnings.
+    predict_df = feature_initial_df[features[4:]].copy()
+
+    # 2. Predict with no ad shown (ad_status = 0)
+    predict_df_0 = predict_df.copy()
+    predict_df_0['ad_status'] = 0
+    y_pred_proba_0 = model.predict_proba(predict_df_0)
+    predict_df['y_0'] = [x[1] for x in y_pred_proba_0]  # positive-class probability
+    print(f"predict_df shape: {predict_df.shape}")
+
+    # 3. Predict with an ad shown (ad_status = 1)
+    # Select the model input columns again so the y_0 column added above is
+    # not passed to the model.
+    predict_df_1 = predict_df[features[4:]].copy()
+    predict_df_1['ad_status'] = 1
+    y_pred_proba_1 = model.predict_proba(predict_df_1)
+    predict_df['y_1'] = [x[1] for x in y_pred_proba_1]
+    print(f"predict_df shape: {predict_df.shape}")
+
+    # 4. Difference between the two predicted probabilities (no ad minus ad)
+    predict_df['res_predict'] = predict_df['y_0'] - predict_df['y_1']
+    print(f"predict_df shape: {predict_df.shape}")
+
+    # 5. Compute the thresholds
+    # Get the A/B test id for this app type
+    abtest_id_mapping = xgb_config['abtest_id_mapping']
+    abtest_id = abtest_id_mapping[app_type]
+    # Get the threshold parameter record (a dict literal stored in Redis)
+    threshold_record = redis_helper.get_data_from_redis(key_name=xgb_config['threshold_record'])
+    threshold_record = ast.literal_eval(threshold_record)
+    record = threshold_record[abtest_id]
+    # Compute one threshold per experiment group: threshold = mean(res_predict) * param
+    predict_mean = predict_df['res_predict'].mean()
+    for ab_code, param in record.items():
+        threshold = predict_mean * param
+        # Write the threshold to Redis with a 48-hour expiry
+        threshold_key = f"{xgb_config['threshold_key_prefix']}{abtest_id}:{ab_code}"
+        redis_helper.set_data_to_redis(key_name=threshold_key, value=threshold, expire_time=48 * 3600)
+    print("update threshold finished!")
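For reference, one plausible way a serving-side component could consume the thresholds written above (purely illustrative: the function `should_show_ad`, the single-row DataFrame input, and the comparison direction are assumptions, not part of this patch):

    def should_show_ad(feature_row_df, abtest_id, ab_code):
        # Read back the per-group threshold written by threshold_update.
        threshold_key = f"{xgb_config['threshold_key_prefix']}{abtest_id}:{ab_code}"
        threshold = float(redis_helper.get_data_from_redis(key_name=threshold_key))
        # Predicted positive-class probability without and with an ad for this request.
        p_no_ad = model.predict_proba(feature_row_df.assign(ad_status=0))[:, 1][0]
        p_with_ad = model.predict_proba(feature_row_df.assign(ad_status=1))[:, 1][0]
        # Assumed decision rule: show the ad only if the predicted drop stays
        # below the stored threshold.
        return (p_no_ad - p_with_ad) < threshold
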
+
+
+def timer_check():
+    project = 'loghubods'
+    table = 'admodel_data_train'
+    now_date = datetime.datetime.today()
+    dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
+    # Check whether the previous day's partition is ready
+    data_count = data_check(project=project, table=table, dt=dt)
+    if data_count > 0:
+        print(f"data count = {data_count}")
+        # Data is ready, update the thresholds for every app type
+        for app_type, _ in xgb_config['abtest_id_mapping'].items():
+            print(f"app_type: {app_type} threshold update start...")
+            threshold_update(project=project, table=table, dt=dt, app_type=app_type)
+            print(f"app_type: {app_type} threshold update finished!")
+        print("threshold update end!")
+    else:
+        # Data is not ready yet, check again in 1 minute
+        Timer(60, timer_check).start()
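Note that `threading.Timer(60, timer_check)` is a one-shot timer: each missed check schedules exactly one retry on a fresh non-daemon thread, so the process stays alive until the data arrives, but when the data is not yet ready the `execute time` line printed in `__main__` only reflects the first check, not the eventual update. If measuring the full run matters, a blocking poll is an alternative; a minimal sketch reusing the same helpers (illustrative only, `run_when_ready` is not part of this patch):

    def run_when_ready(project='loghubods', table='admodel_data_train', poll_seconds=60):
        dt = datetime.datetime.strftime(datetime.datetime.today() - datetime.timedelta(days=1), '%Y%m%d')
        # Block until the previous day's partition is ready, then update every app type.
        while data_check(project=project, table=table, dt=dt) <= 0:
            time.sleep(poll_seconds)
        for app_type in xgb_config['abtest_id_mapping']:
            threshold_update(project=project, table=table, dt=dt, app_type=app_type)
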
+
+
+if __name__ == '__main__':
+    st_time = time.time()
+    timer_check()
+    print(f"execute time: {time.time() - st_time}s")