xgboost_train.py

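"""
Train an XGBoost classifier that predicts whether a video's vov0 exceeds 0.25,
then use the scores to pick low-VOV videos for filtering.

Labels and features both come from the ODPS table `alg_vid_vov_new`: labels from
the label-day partition, features from the previous day's partition. Chinese
column names are kept as-is; glosses: '曝光占比' = exposure share,
'分子' = numerator, '分母' = denominator.
"""
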
import concurrent.futures
import json
import logging
from datetime import datetime, timedelta

import pandas as pd
import xgboost as xgb

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()

features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
                 '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01',
                 '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012']

column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01',
                '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母',
                '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
                '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
                '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
                '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']

# Configure log format and level
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Create a module-level logger
logger = logging.getLogger(__name__)

def get_partition_df(table, dt):
    """Download the `dt` partition of `table` from ODPS into a DataFrame."""
    logger.info(f"Start downloading data of {table} -- {dt}")
    download_session = odps_client.get_download_session(table, dt)
    logger.info(f"Partition {dt} of table {table} contains {download_session.count} rows")
    with download_session.open_arrow_reader(0, download_session.count) as reader:
        # Read every Arrow batch and concatenate them into a single DataFrame
        df = pd.concat([batch.to_pandas() for batch in reader])
    logger.info(f"Finished downloading data of {table} -- {dt}")
    return df

def fetch_label_data(label_dt):
    """
    Fetch the label data of partition `label_dt`.
    """
    logger.info(f"fetch_label_data.dt: {label_dt}")

    def extract_label(row):
        feature = json.loads(row['feature'])
        return pd.Series({
            'vid': row['vid'],
            '分母': int(feature['1_vov0_分母']),
            '分子': feature['1_vov0_分子'],
            'vov0': feature['1_vov0']
        })

    train_df = get_partition_df("alg_vid_vov_new", label_dt)
    applied_df = train_df.apply(extract_label, axis=1)
    # Exposure share: each vid's denominator relative to the day's total
    view_sum = applied_df['分母'].sum()
    applied_df['曝光占比'] = round(applied_df['分母'] / view_sum, 6)
    return applied_df

def fetch_feature_data(feature_dt):
    """
    Fetch the feature data of partition `feature_dt`.
    """
    logger.info(f"fetch_feature_data.dt: {feature_dt}")

    def extract_feature(row):
        feature = json.loads(row['feature'])
        return pd.Series({
            'vid': row['vid'],
            **feature
        })

    feature_df = get_partition_df("alg_vid_vov_new", feature_dt)
    return feature_df.apply(extract_feature, axis=1)
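

# fetch_data pairs labels from the label day (T) with features from the previous
# day (T-1), joined on vid, so every row combines yesterday's features with the
# label day's observed outcome.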
def fetch_data(label_datetime: datetime):
    label_dt = label_datetime.strftime("%Y%m%d")
    feature_dt = (label_datetime - timedelta(days=1)).strftime("%Y%m%d")
    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        label_future = executor.submit(fetch_label_data, label_dt)
        feature_future = executor.submit(fetch_feature_data, feature_dt)
        label_apply_df = label_future.result()
        feature_apply_df = feature_future.result()

    df = pd.merge(label_apply_df, feature_apply_df, on="vid", how='left')
    df.fillna(0, inplace=True)
    df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
    # Reset the index so that positional filters downstream (df_test.index <= 10000
    # in _main) follow the exposure-share ranking instead of the pre-sort row order.
    df.reset_index(drop=True, inplace=True)

    for col in column_names:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    df["12_change"] = df["1_vov0"] - df["2_vov0"]
    df["23_change"] = df["2_vov0"] - df["3_vov0"]
    df["34_change"] = df["3_vov0"] - df["4_vov0"]

    feature_array = df[features_name].values
    df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
    label_array = df["label"].values
    return df, feature_array, label_array
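

# Training / evaluation schedule in _main:
#   * train on the partition from two days ago (labels T-2, features T-3)
#   * score yesterday's partition (labels T-1, features T-2)
# Videos with y_pred <= 0.2 among the top ~10,000 rows by exposure share are
# selected for filtering; the logs then report how many of them truly have
# vov0 <= profit_threshold, plus the exposure and profit/loss impact.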
def _main():
    logger.info("XGB model training")
    df, trains_array, trains_label_array = fetch_data(datetime.now() - timedelta(days=2))
    logger.info("Features fetched, start training")
    model = xgb.XGBClassifier(
        n_estimators=100,
        learning_rate=0.01,
        max_depth=5,
        min_child_weight=1,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        objective='binary:logistic',
        nthread=8,
        scale_pos_weight=1,
        random_state=2024,
        seed=2024,
    )
    model.fit(trains_array, trains_label_array, verbose=True)

    logger.info("Fetching evaluation data")
    df_test, tests_array, _ = fetch_data(datetime.now() - timedelta(days=1))
    y_pred = model.predict_proba(tests_array)[:, 1]
    df_test["y_pred"] = y_pred

    # Candidate set: videos the model scores as low-VOV, limited to the head of the
    # exposure-share ranking.
    condition_choose = ((df_test['y_pred'] <= 0.2)
                        # & ((df_test['1_vov0_分母'] > 50) | (df_test['2_vov0_分母'] > 50) | (df_test['3_vov0_分母'] > 50))
                        & (df_test.index <= 10000)
                        )
    profit_threshold = 0.3
    # A selected video counts as a correct pick if its actual vov0 is at or below the threshold.
    condition_choose_real = condition_choose & (df_test['vov0'] <= profit_threshold)
    df_test["condition_choose"] = condition_choose
    df_test[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
        "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"), sep="\t", index=False)

    choose_bad = condition_choose.sum()
    choose_bad_real_bad = condition_choose_real.sum()
    acc = choose_bad_real_bad / choose_bad
    logger.info(
        f"acc:{acc} "
        f"numerator={choose_bad_real_bad} "
        f"denominator={choose_bad} "
        f"total videos={len(df_test)} "
        f"profit threshold on vov0: {profit_threshold}"
    )

    surface = df_test.loc[condition_choose, '曝光占比'].sum()
    surface_income = df_test.loc[condition_choose_real, '曝光占比'].sum()
    logger.info(
        f"total exposure impact: {round(surface, 6)} "
        f"profitable exposure impact: {round(surface_income, 6)} "
        f"loss-making exposure impact: {round(surface - surface_income, 6)}"
    )

    df_test["profit_loss_value"] = df_test['分母'] * (df_test['vov0'] - profit_threshold)
    profit_loss_value = df_test.loc[condition_choose, 'profit_loss_value'].sum()
    profit_value = df_test.loc[condition_choose_real, 'profit_loss_value'].sum()
    logger.info(
        f"total P&L: {round(profit_loss_value, 1)} "
        f"pure profit: {round(profit_value, 1)} "
        f"pure loss: {round(profit_loss_value - profit_value, 1)} "
        f"profit efficiency: {round(profit_loss_value / profit_value, 6)}"
    )

if __name__ == '__main__':
    try:
        _main()
    except Exception:
        logger.exception("VOV filtering XGB model training failed")