xgboost_train.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
import concurrent.futures
import json
import logging
from datetime import datetime, timedelta
import pandas as pd
import xgboost as xgb
from client import ODPSClient
# Shared ODPS client used for every partition download in this script.
odps_client = ODPSClient.ODPSClient()
  9. features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
  10. '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01',
  11. '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012']
  12. column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01',
  13. '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母',
  14. '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
  15. '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
  16. '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
  17. '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
  18. # 创建一个logger
  19. logger = logging.getLogger("xgboost_train.py")
  20. logger.setLevel(logging.INFO) # 设置日志级别
  21. # 创建Handler用于输出到文件
  22. file_handler = logging.FileHandler('xgboost_train.log')
  23. file_handler.setLevel(logging.INFO) # 设置日志级别为INFO
  24. # 创建Handler用于输出到控制台
  25. console_handler = logging.StreamHandler()
  26. console_handler.setLevel(logging.INFO) # 设置日志级别为INFO
  27. # 定义日志格式
  28. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  29. file_handler.setFormatter(formatter)
  30. console_handler.setFormatter(formatter)
  31. # 将Handler添加到Logger
  32. logger.addHandler(file_handler)
  33. logger.addHandler(console_handler)
  34. def get_partition_df(table, dt):
  35. logger.info(f"开始下载: {table} -- {dt} 的数据")
  36. download_session = odps_client.get_download_session(table, dt)
  37. logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
  38. with download_session.open_arrow_reader(0, download_session.count) as reader:
  39. # 将所有数据加载到 DataFrame 中
  40. df = pd.concat([batch.to_pandas() for batch in reader])
  41. logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
  42. return df
  43. def fetch_label_data(label_datetime: datetime):
  44. """
  45. 获取 label数据
  46. :return:
  47. """
  48. label_dt = label_datetime.strftime("%Y%m%d")
  49. logger.info(f"fetch_label_data.dt: {label_dt}")
  50. # 获取数据
  51. label_df = get_partition_df("alg_vid_vov_new", label_dt)
  52. extracted_data = [
  53. {
  54. 'vid': int(row['vid']),
  55. }
  56. for _, row in label_df.iterrows()
  57. ]
  58. # 构造新的 DataFrame
  59. applied_df = pd.DataFrame(extracted_data)
  60. # 添加 title 列
  61. applied_df['title'] = "title"
  62. return applied_df
  63. def fetch_view_rate_data(view_date: datetime):
  64. """
  65. 获取曝光数据
  66. :return:
  67. """
  68. view_rate_dt = view_date.strftime("%Y%m%d")
  69. logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}")
  70. try:
  71. # 获取数据
  72. view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
  73. extracted_data = [
  74. {
  75. 'vid': int(row['vid']),
  76. '分母': int(feature['1_vov0_分母']),
  77. '分子': feature['1_vov0_分子'],
  78. 'vov0': feature['1_vov0']
  79. }
  80. for _, row in view_rate_df.iterrows()
  81. if (feature := json.loads(row['feature']))
  82. ]
  83. # 构造新的 DataFrame
  84. applied_df = pd.DataFrame(extracted_data)
  85. # 计算曝光占比,矢量化操作
  86. view_sum = applied_df['分母'].sum()
  87. applied_df['曝光占比'] = applied_df['分母'] / view_sum
  88. return applied_df
  89. except Exception as e:
  90. return pd.DataFrame({
  91. "vid": [-1],
  92. "分母": [0],
  93. "分子": [0],
  94. "vov0": [0],
  95. "曝光占比": [0]
  96. })
  97. def fetch_feature_data_dt(dt: str, index):
  98. """
  99. 查询某一天的特征数据,方便做特征数据时并行处理
  100. :param dt:
  101. :param index:
  102. :return:
  103. """
  104. logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据")
  105. df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
  106. today_dist_view_pv = df['today_dist_view_pv'].astype(int)
  107. today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int)
  108. day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int)
  109. day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int)
  110. # all_return_to_dist_view_pv
  111. t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv
  112. t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv
  113. # all_vov
  114. t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  115. t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0)
  116. t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  117. t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0)
  118. t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  119. t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0)
  120. # 构造结果DataFrame
  121. result_df = pd.DataFrame({
  122. 'vid': df['videoid'],
  123. f'{index}_vov0': t_0_all_vov,
  124. f'{index}_vov0_分子': today_return_to_dist_view_pv,
  125. f'{index}_vov0_分母': today_dist_view_pv,
  126. f'{index}_vov01': t_1_all_vov,
  127. f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv,
  128. f'{index}_vov01_分母': today_dist_view_pv,
  129. f'{index}_vov012': t_2_all_vov,
  130. f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv,
  131. f'{index}_vov012_分母': today_dist_view_pv,
  132. })
  133. logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据")
  134. return result_df
  135. def fetch_feature_data(t_1_datetime: datetime):
  136. """
  137. 获取feature数据
  138. :return:
  139. """
  140. logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}")
  141. with concurrent.futures.ThreadPoolExecutor(5) as executor:
  142. t_1_feature_task = executor.submit(
  143. fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
  144. )
  145. t_2_feature_task = executor.submit(
  146. fetch_feature_data_dt, (t_1_datetime - timedelta(days=1)).strftime("%Y%m%d"), 2
  147. )
  148. t_3_feature_task = executor.submit(
  149. fetch_feature_data_dt, (t_1_datetime - timedelta(days=2)).strftime("%Y%m%d"), 3
  150. )
  151. t_4_feature_task = executor.submit(
  152. fetch_feature_data_dt, (t_1_datetime - timedelta(days=3)).strftime("%Y%m%d"), 4
  153. )
  154. t_5_feature_task = executor.submit(
  155. fetch_feature_data_dt, (t_1_datetime - timedelta(days=4)).strftime("%Y%m%d"), 5
  156. )
  157. t_1_feature = t_1_feature_task.result()
  158. t_2_feature = t_2_feature_task.result()
  159. t_3_feature = t_3_feature_task.result()
  160. t_4_feature = t_4_feature_task.result()
  161. t_5_feature = t_5_feature_task.result()
  162. t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]]
  163. t_2_feature = t_2_feature[
  164. ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"]
  165. ]
  166. return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature
  167. def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
  168. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  169. label_future = executor.submit(fetch_label_data, label_datetime)
  170. feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
  171. view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)
  172. label_apply_df = label_future.result()
  173. t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result()
  174. view_rate = view_rate_future.result()
  175. df = (pd.merge(label_apply_df, view_rate, on="vid", how='left')
  176. .merge(t_1_feature, on="vid", how='left')
  177. .merge(t_2_feature, on="vid", how='left')
  178. .merge(t_3_feature, on="vid", how='left')
  179. .merge(t_4_feature, on="vid", how='left')
  180. .merge(t_5_feature, on="vid", how='left')
  181. )
  182. df.fillna(0, inplace=True)
  183. df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
  184. for col in column_names:
  185. df[col] = pd.to_numeric(df[col], errors='coerce')
  186. df["12_change"] = df["1_vov0"] - df["2_vov0"]
  187. df["23_change"] = df["2_vov0"] - df["3_vov0"]
  188. df["34_change"] = df["3_vov0"] - df["4_vov0"]
  189. df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
  190. return df
  191. def xgb_multi_dt_data(t_1_label_dt: datetime):
  192. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  193. logger.info(f"VOV模型特征数据处理:t_1_label_future.label_datetime: {t_1_label_dt.strftime('%Y%m%d')}")
  194. t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_label_dt - timedelta(2), t_1_label_dt)
  195. t_2_label_dt = t_1_label_dt - timedelta(1)
  196. logger.info(f"VOV模型特征数据处理:t_2_label_future.label_datetime: {t_2_label_dt.strftime('%Y%m%d')}")
  197. t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_label_dt - timedelta(1), t_2_label_dt)
  198. t_3_label_dt = t_1_label_dt - timedelta(2)
  199. logger.info(f"VOV模型特征数据处理:t_3_label_future.label_datetime: {t_3_label_dt.strftime('%Y%m%d')}")
  200. t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_label_dt - timedelta(1), t_3_label_dt)
  201. t_1_label_df = t_1_label_future.result()
  202. t_2_label_df = t_2_label_future.result()
  203. t_3_label_df = t_3_label_future.result()
  204. return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True)
  205. def _main():
  206. logger.info(f"XGB模型训练")
  207. train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=2)))
  208. trains_array = train_df[features_name].values
  209. trains_label_array = train_df['label'].values
  210. logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}")
  211. model = xgb.XGBClassifier(
  212. n_estimators=1000,
  213. learning_rate=0.01,
  214. max_depth=5,
  215. min_child_weight=1,
  216. gamma=0,
  217. subsample=0.8,
  218. colsample_bytree=0.8,
  219. objective='binary:logistic',
  220. nthread=8,
  221. scale_pos_weight=1,
  222. random_state=2024,
  223. seed=2024,
  224. )
  225. model.fit(trains_array, trains_label_array)
  226. logger.info("获取评测数据")
  227. start_label_datetime = datetime.now() - timedelta(days=1)
  228. feature_start_datetime = start_label_datetime - timedelta(1)
  229. predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime)
  230. tests_array = predict_df[features_name].values
  231. y_pred = model.predict_proba(tests_array)[:, 1]
  232. predict_df["y_pred"] = y_pred
  233. condition_choose = (
  234. (predict_df['y_pred'] <= 0.1) &
  235. (
  236. (predict_df['4_vov0_分母'] > 50) |
  237. (predict_df['2_vov0_分母'] > 50) |
  238. (predict_df['3_vov0_分母'] > 50)
  239. ) &
  240. (
  241. (predict_df['1_vov0'] - predict_df['2_vov0'] <= 0.1)
  242. )
  243. )
  244. profit_threshold = 0.3
  245. condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
  246. predict_df["condition_choose"] = condition_choose
  247. predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
  248. "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
  249. sep="\t",
  250. index=False
  251. )
  252. choose_bad = condition_choose.sum()
  253. choose_bad_real_bad = condition_choose_real.sum()
  254. acc = choose_bad_real_bad / choose_bad
  255. logger.info(
  256. f"acc:{acc} "
  257. f"分子={choose_bad_real_bad} "
  258. f"分母={choose_bad} "
  259. f"总视频数={predict_df.shape[0]} "
  260. f"盈利计算标注vov0大于:{profit_threshold}"
  261. )
  262. surface = predict_df.loc[condition_choose, '曝光占比'].sum()
  263. surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum()
  264. logger.info(
  265. f"总影响面:{round(surface, 6)} "
  266. f"盈利影响面:{round(surface_income, 6)} "
  267. f"亏损影响面:{round(surface - surface_income, 6)}"
  268. )
  269. predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold)
  270. profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum()
  271. profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum()
  272. logger.info(
  273. f"总盈亏:{round(profit_loss_value, 1)} "
  274. f"纯盈利:{round(profit_value, 1)} "
  275. f"纯亏损:{round(profit_loss_value - profit_value, 1)} "
  276. f"盈利效率:{round(profit_loss_value / profit_value, 6)}"
  277. )
  278. filtered_vid = predict_df.loc[condition_choose_real, 'vid'].unique()
  279. print(f"要过滤掉的视频ID为: {filtered_vid}")
  280. if __name__ == '__main__':
  281. try:
  282. _main()
  283. except Exception as e:
  284. logger.error("VOV过滤XGB模型训练异常: ", e)