123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- import time
- import datetime
- import xgboost as xgb
- from xgboost.sklearn import XGBClassifier
- from threading import Timer
- from utils import RedisHelper, data_check
- from config import set_config
- from ad_feature_process import get_feature_data
# Module-wide Redis client, used by threshold_update() to read the threshold
# parameter record and to write the computed thresholds.
redis_helper = RedisHelper()

# set_config() returns a pair; only its first element is used in this module.
config_, _ = set_config()

# Settings of the 'xgb' entry of the ad-model A/B test configuration
# (abtest_id_mapping, threshold_record, threshold_key_prefix are read below).
xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
# Columns requested from the feature table, grouped by what they describe.
# NOTE: order matters. threshold_update() drops the leading identifier columns
# with features[4:], and the remaining order is the column order handed to the
# model. `features` must stay a list because it is used to index a DataFrame.

# Identifier columns (not fed to the model).
_ID_FEATURES = [
    'apptype',
    'subsessionid',
    'mid',
    'videoid',
]

# 30-day statistics keyed by `mid`.
_MID_FEATURES = [
    'mid_preview_count_30day',
    'mid_view_count_30day',
    'mid_view_count_pv_30day',
    'mid_play_count_30day',
    'mid_play_count_pv_30day',
    'mid_share_count_30day',
    'mid_share_count_pv_30day',
    'mid_return_count_30day',
    'mid_share_rate_30day',
    'mid_return_rate_30day',
]

# 30-day statistics keyed by video.
_VIDEO_FEATURES = [
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]

features = [*_ID_FEATURES, *_MID_FEATURES, *_VIDEO_FEATURES]
# Model loading: read the trained booster from disk first, then attach it to
# a scikit-learn style wrapper so that predict_proba() can be used.
booster = xgb.Booster()
booster.load_model('./data/ad_xgb.model')

model = XGBClassifier()
# The wrapper is not fitted here; the loaded booster is injected directly.
model._Booster = booster
def _predict_with_ad_status(feature_df, ad_status):
    """Return the second predict_proba column with ``ad_status`` forced to one value.

    A copy of ``feature_df`` is extended with a constant ``ad_status`` column
    and passed to the module-level ``model``; ``feature_df`` is not modified.
    """
    model_input = feature_df.copy()
    model_input['ad_status'] = ad_status
    # Column 1 of predict_proba (class 1 for a binary classifier).
    return model.predict_proba(model_input)[:, 1]


def threshold_update(project, table, dt, app_type):
    """Recompute the thresholds for one app type and write them to Redis.

    The features for ``dt`` are scored twice, once with ``ad_status`` = 0 and
    once with ``ad_status`` = 1. The mean of the per-row difference
    ``y_0 - y_1`` is multiplied by each experiment group's parameter and the
    result is stored under ``<threshold_key_prefix><abtest_id>:<ab_code>``
    with a 48 hour expiry.

    Args:
        project: passed through to ``get_feature_data``.
        table: passed through to ``get_feature_data``.
        dt: date partition passed through to ``get_feature_data``
            (``timer_check`` passes yesterday formatted as ``%Y%m%d``).
        app_type: key into ``xgb_config['abtest_id_mapping']``; also passed to
            ``get_feature_data``.

    Raises:
        KeyError: if ``app_type`` or the resolved experiment id is missing
            from the configuration / threshold record.
    """
    # 1. Fetch the features.
    feature_initial_df = get_feature_data(
        project=project, table=table, features=features, dt=dt, app_type=app_type
    )
    if feature_initial_df.empty:
        # The mean of an empty frame is NaN; writing that would replace the
        # stored thresholds with NaN, so leave them untouched instead.
        print(f"no feature data for app_type: {app_type}, dt: {dt}, skip threshold update")
        return

    # Fill missing values.
    feature_initial_df.fillna(0, inplace=True)

    # Normalise column dtypes.
    type_int_columns = [
        'mid_preview_count_30day',
        'mid_view_count_30day',
        'mid_view_count_pv_30day',
        'mid_play_count_30day',
        'mid_play_count_pv_30day',
        'mid_share_count_30day',
        'mid_share_count_pv_30day',
        'mid_return_count_30day',
        'video_preview_count_uv_30day',
        'video_preview_count_pv_30day',
        'video_view_count_uv_30day',
        'video_view_count_pv_30day',
        'video_play_count_uv_30day',
        'video_play_count_pv_30day',
        'video_share_count_uv_30day',
        'video_share_count_pv_30day',
        'video_return_count_30day',
    ]
    type_float_columns = [
        'mid_share_rate_30day',
        'mid_return_rate_30day',
        'video_ctr_uv_30day',
        'video_ctr_pv_30day',
        'video_share_rate_uv_30day',
        'video_share_rate_pv_30day',
        'video_return_rate_30day',
    ]
    dtype_mapping = {column_name: int for column_name in type_int_columns}
    dtype_mapping.update({column_name: float for column_name in type_float_columns})
    feature_initial_df = feature_initial_df.astype(dtype_mapping)
    print(f"feature_initial_df shape: {feature_initial_df.shape}")

    # Keep only the model input columns: the first four entries of `features`
    # (apptype, subsessionid, mid, videoid) are skipped.
    model_features = feature_initial_df[features[4:]]
    # Results go into an explicit copy so the source frame is never mutated
    # through a slice.
    predict_df = model_features.copy()

    # 2. Prediction with no ad shown.
    # Both predictions are built from the pristine `model_features`, so the
    # model sees exactly the same columns in both cases. Previously the second
    # input was copied after `y_0` had been added and carried it as a feature.
    predict_df['y_0'] = _predict_with_ad_status(model_features, ad_status=0)
    print(f"predict_df shape: {predict_df.shape}")

    # 3. Prediction with an ad shown.
    predict_df['y_1'] = _predict_with_ad_status(model_features, ad_status=1)
    print(f"predict_df shape: {predict_df.shape}")

    # 4. Difference between the two predictions.
    predict_df['res_predict'] = predict_df['y_0'] - predict_df['y_1']
    print(f"predict_df shape: {predict_df.shape}")

    # 5. Compute the thresholds.
    # Resolve the experiment id for this app type.
    abtest_id = xgb_config['abtest_id_mapping'][app_type]
    # Load the threshold parameter record.
    threshold_record = redis_helper.get_data_from_redis(key_name=xgb_config['threshold_record'])
    # NOTE(review): eval() executes whatever is stored under this Redis key.
    # If the payload is a plain literal, ast.literal_eval (or JSON) would be
    # the safe replacement; confirm the stored format before switching.
    threshold_record = eval(threshold_record)
    record = threshold_record[abtest_id]

    # One threshold per experiment group: mean difference scaled by its parameter.
    predict_mean = predict_df['res_predict'].mean()
    for ab_code, param in record.items():
        threshold = predict_mean * param
        # Write to Redis.
        threshold_key = f"{xgb_config['threshold_key_prefix']}{abtest_id}:{ab_code}"
        redis_helper.set_data_to_redis(key_name=threshold_key, value=threshold, expire_time=48 * 3600)
    print("update threshold finished!")
def timer_check():
    """Update the thresholds of every app type once yesterday's data exists.

    Calls ``data_check`` for yesterday's partition. When it reports a positive
    count, ``threshold_update`` runs for each key of
    ``xgb_config['abtest_id_mapping']``; otherwise this function is scheduled
    to run again in 60 seconds on a ``threading.Timer``.
    """
    project = 'loghubods'
    table = 'admodel_data_train'
    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    dt = yesterday.strftime('%Y%m%d')

    # Check whether the data for this run is ready.
    data_count = data_check(project=project, table=table, dt=dt)
    if not data_count > 0:
        # Data not ready yet: check again in one minute.
        Timer(60, timer_check).start()
        return

    print(f"data count = {data_count}")
    # Data is ready: update each configured app type in turn.
    for app_type in xgb_config['abtest_id_mapping']:
        print(f"app_type: {app_type} threshold update start...")
        threshold_update(project=project, table=table, dt=dt, app_type=app_type)
        print(f"app_type: {app_type} threshold update finished!")
    print("threshold update end!")
if __name__ == '__main__':
    # Time the initial check. When the data is not ready, timer_check() only
    # schedules a retry and returns, so this measures the first pass only.
    started_at = time.time()
    timer_check()
    elapsed = time.time() - started_at
    print(f"execute time: {elapsed}s")
|