# ad_xgboost_threshold_update.py

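"""
Daily job that updates the ad-serving thresholds.

It loads a pre-trained XGBoost model, scores the previous day's samples twice
(once with ad_status=0 and once with ad_status=1), takes the mean drop in the
predicted positive-class probability between the two runs, scales it by the
per-experiment-group parameter stored in Redis, and writes the resulting
thresholds back to Redis.
"""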
import time
import datetime
import xgboost as xgb
from xgboost.sklearn import XGBClassifier
from threading import Timer
from utils import RedisHelper, data_check
from config import set_config
from ad_feature_process import get_feature_data

redis_helper = RedisHelper()
config_, _ = set_config()
xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
features = [
    'apptype',
    'subsessionid',
    'mid',
    'videoid',
    'mid_preview_count_30day',
    'mid_view_count_30day',
    'mid_view_count_pv_30day',
    'mid_play_count_30day',
    'mid_play_count_pv_30day',
    'mid_share_count_30day',
    'mid_share_count_pv_30day',
    'mid_return_count_30day',
    'mid_share_rate_30day',
    'mid_return_rate_30day',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]
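# The first four entries are identifier columns (apptype, subsessionid, mid, videoid);
# only features[4:] are fed to the model further down.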
# Load the pre-trained booster and attach it to an sklearn-style classifier
model = XGBClassifier()
booster = xgb.Booster()
booster.load_model('./data/ad_xgb.model')
model._Booster = booster
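# Note: assigning the Booster to the private _Booster attribute relies on
# XGBClassifier internals; it makes the sklearn-style predict_proba calls below
# work, assuming the saved model was trained with a binary objective (column 1
# of predict_proba is then the positive-class probability).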
def threshold_update(project, table, dt, app_type):
    # Predict on the previous day's data and derive the per-group thresholds
    # 1. Fetch the feature data
    feature_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt, app_type=app_type)
    # Fill missing values
    feature_initial_df.fillna(0, inplace=True)
    # Cast columns to the expected data types
    type_int_columns = [
        'mid_preview_count_30day',
        'mid_view_count_30day',
        'mid_view_count_pv_30day',
        'mid_play_count_30day',
        'mid_play_count_pv_30day',
        'mid_share_count_30day',
        'mid_share_count_pv_30day',
        'mid_return_count_30day',
        'video_preview_count_uv_30day',
        'video_preview_count_pv_30day',
        'video_view_count_uv_30day',
        'video_view_count_pv_30day',
        'video_play_count_uv_30day',
        'video_play_count_pv_30day',
        'video_share_count_uv_30day',
        'video_share_count_pv_30day',
        'video_return_count_30day',
    ]
    for column_name in type_int_columns:
        feature_initial_df[column_name] = feature_initial_df[column_name].astype(int)
    type_float_columns = [
        'mid_share_rate_30day',
        'mid_return_rate_30day',
        'video_ctr_uv_30day',
        'video_ctr_pv_30day',
        'video_share_rate_uv_30day',
        'video_share_rate_pv_30day',
        'video_return_rate_30day',
    ]
    for column_name in type_float_columns:
        feature_initial_df[column_name] = feature_initial_df[column_name].astype(float)
    print(f"feature_initial_df shape: {feature_initial_df.shape}")
    # Keep only the model input columns (drop the identifier columns); copy so the
    # per-sample scores below can be assigned without chained-assignment warnings
    predict_df = feature_initial_df[features[4:]].copy()
    print(f"predict_df shape: {predict_df.shape}")
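    # The model is expected to take features[4:] plus 'ad_status' as input;
    # 'ad_status' is appended below with a fixed value of 0 or 1 for the two
    # counterfactual scoring runs.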
    # 2. Predict assuming no ad is shown
    predict_df_0 = predict_df.copy()
    predict_df_0['ad_status'] = 0
    y_pred_proba_0 = model.predict_proba(predict_df_0)
    # 3. Predict assuming an ad is shown
    predict_df_1 = predict_df.copy()
    predict_df_1['ad_status'] = 1
    y_pred_proba_1 = model.predict_proba(predict_df_1)
    # Keep the positive-class probability of each sample
    predict_df['y_0'] = [x[1] for x in y_pred_proba_0]
    print(f"predict_df shape: {predict_df.shape}")
    predict_df['y_1'] = [x[1] for x in y_pred_proba_1]
    print(f"predict_df shape: {predict_df.shape}")
    # 4. Difference: the predicted probability drop caused by showing the ad
    predict_df['res_predict'] = predict_df['y_0'] - predict_df['y_1']
    print(f"predict_df shape: {predict_df.shape}")
    # 5. Compute the thresholds
    # Map the app_type to its experiment (abtest) id
    abtest_id_mapping = xgb_config['abtest_id_mapping']
    abtest_id = abtest_id_mapping[app_type]
    # Fetch the recorded threshold parameters (a stringified dict) from Redis
    threshold_record = redis_helper.get_data_from_redis(key_name=xgb_config['threshold_record'])
    threshold_record = eval(threshold_record)
    record = threshold_record[abtest_id]
    # Compute one threshold per experiment group
    predict_mean = predict_df['res_predict'].mean()
    for ab_code, param in record.items():
        threshold = predict_mean * param
        print(f"{abtest_id}-{ab_code}: {threshold}")
        # Write the threshold to Redis
        threshold_key = f"{xgb_config['threshold_key_prefix']}{abtest_id}:{ab_code}"
        redis_helper.set_data_to_redis(key_name=threshold_key, value=threshold, expire_time=48 * 3600)
    # Re-write the parameter record so its expiry is refreshed
    redis_helper.set_data_to_redis(key_name=xgb_config['threshold_record'],
                                   value=str(threshold_record),
                                   expire_time=2 * 24 * 3600)
    print("update threshold finished!")
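# A minimal sketch of the Redis layout assumed by threshold_update() above
# (the ids and numbers are made-up examples, not production values):
#   xgb_config['threshold_record']  -> "{'ab0001': {'a': 0.8, 'b': 1.0}}"
#       i.e. a stringified dict of {abtest_id: {ab_code: param}}
#   f"{xgb_config['threshold_key_prefix']}ab0001:a" -> predict_mean * 0.8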
def timer_check():
    project = 'loghubods'
    table = 'admodel_data_train'
    now_date = datetime.datetime.today()
    dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
    # Check whether yesterday's partition is ready
    data_count = data_check(project=project, table=table, dt=dt)
    if data_count > 0:
        print(f"data count = {data_count}")
        # Data is ready: update the thresholds for every app_type
        for app_type, _ in xgb_config['abtest_id_mapping'].items():
            print(f"app_type: {app_type} threshold update start...")
            threshold_update(project=project, table=table, dt=dt, app_type=app_type)
            print(f"app_type: {app_type} threshold update finished!")
        print("threshold update end!")
    else:
        # Data is not ready: check again in 1 minute
        Timer(60, timer_check).start()
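# Note: threading.Timer schedules the retry on a separate, non-daemon thread,
# so the process keeps running until the data check eventually succeeds, even
# though timer_check() itself returns immediately whenever the data is not ready.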
if __name__ == '__main__':
    st_time = time.time()
    timer_check()
    print(f"execute time: {time.time() - st_time}s")
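# Hedged sketch of how a downstream consumer might read one of these thresholds
# back, assuming it uses the same RedisHelper; the actual serving-side logic is
# not part of this file:
#
#   raw = redis_helper.get_data_from_redis(
#       key_name=f"{xgb_config['threshold_key_prefix']}{abtest_id}:{ab_code}")
#   threshold = float(raw)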