vov_xgboost_train.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. import concurrent.futures
  2. import json
  3. import logging
  4. from datetime import datetime, timedelta
  5. import numpy as np
  6. import pandas as pd
  7. import xgboost as xgb
  8. from client import ODPSClient
  9. from config import ConfigManager
  10. from util import feishu_inform_util
# Module-level singletons shared by every function below.
# ODPS client used for all partition downloads in this module.
odps_client = ODPSClient.ODPSClient()
# Central configuration (project home path, environment, feishu webhooks).
config_manager = ConfigManager.ConfigManager()
  13. features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
  14. '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01',
  15. '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012']
  16. column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01',
  17. '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母',
  18. '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
  19. '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
  20. '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
  21. '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
  22. # 创建一个logger
  23. logger = logging.getLogger("vov_xgboost_train.py")
  24. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  25. def get_partition_df(table, dt):
  26. logger.info(f"开始下载: {table} -- {dt} 的数据")
  27. download_session = odps_client.get_download_session(table, dt)
  28. logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
  29. with download_session.open_arrow_reader(0, download_session.count) as reader:
  30. # 将所有数据加载到 DataFrame 中
  31. df = pd.concat([batch.to_pandas() for batch in reader])
  32. logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
  33. return df
  34. def fetch_label_data(label_datetime: datetime):
  35. """
  36. 获取 label数据
  37. :return:
  38. """
  39. label_dt = label_datetime.strftime("%Y%m%d")
  40. logger.info(f"fetch_label_data.dt: {label_dt}")
  41. # 获取数据
  42. label_df = get_partition_df("alg_vid_vov_new", label_dt)
  43. extracted_data = [
  44. {
  45. 'vid': int(row['vid']),
  46. }
  47. for _, row in label_df.iterrows()
  48. ]
  49. # 构造新的 DataFrame
  50. applied_df = pd.DataFrame(extracted_data)
  51. # 添加 title 列
  52. applied_df['title'] = "title"
  53. return applied_df
  54. def fetch_view_rate_data(view_date: datetime):
  55. """
  56. 获取曝光数据
  57. :return:
  58. """
  59. view_rate_dt = view_date.strftime("%Y%m%d")
  60. logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}")
  61. try:
  62. # 获取数据
  63. view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
  64. extracted_data = [
  65. {
  66. 'vid': int(row['vid']),
  67. '分母': int(feature['1_vov0_分母']),
  68. '分子': feature['1_vov0_分子'],
  69. 'vov0': feature['1_vov0']
  70. }
  71. for _, row in view_rate_df.iterrows()
  72. if (feature := json.loads(row['feature']))
  73. ]
  74. # 构造新的 DataFrame
  75. applied_df = pd.DataFrame(extracted_data)
  76. # 计算曝光占比,矢量化操作
  77. view_sum = applied_df['分母'].sum()
  78. applied_df['曝光占比'] = applied_df['分母'] / view_sum
  79. return applied_df
  80. except Exception as e:
  81. return pd.DataFrame({
  82. "vid": [-1],
  83. "分母": [0],
  84. "分子": [0],
  85. "vov0": [0],
  86. "曝光占比": [0]
  87. })
def fetch_feature_data_dt(dt: str, index) -> pd.DataFrame:
    """
    Build one day's feature columns, so multi-day feature loading can run in parallel.

    :param dt: partition date string, e.g. "20240101"
    :param index: prefix for the output column names (1 = most recent day ... 5)
    :return: DataFrame keyed by 'vid' with `{index}_vov0/vov01/vov012` ratio
             columns plus their numerator (分子) / denominator (分母) columns
    """
    logger.info(f"fetch_feature_data_dt.dt -- {dt} 的数据")
    # Missing values would break the integer casts below, so fill with 0 first.
    df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
    today_dist_view_pv = df['today_dist_view_pv'].astype(int)
    today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int)
    day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int)
    day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int)
    # Cumulative returns: day 0+1, then day 0+1+2.
    t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv
    t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv
    # vov = returns / today's distributed views. The where(...) pair first
    # substitutes 1 into the denominator to avoid division by zero, then
    # forces the ratio itself to 0 wherever the real denominator was 0.
    t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
    t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0)
    t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
    t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0)
    t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
    t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0)
    # NOTE(review): all three 分母 columns reuse today_dist_view_pv — presumably
    # intentional (the denominator is always day-0 distributed views, only the
    # returns accumulate); confirm with the data owner.
    result_df = pd.DataFrame({
        'vid': df['videoid'],
        f'{index}_vov0': t_0_all_vov,
        f'{index}_vov0_分子': today_return_to_dist_view_pv,
        f'{index}_vov0_分母': today_dist_view_pv,
        f'{index}_vov01': t_1_all_vov,
        f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv,
        f'{index}_vov01_分母': today_dist_view_pv,
        f'{index}_vov012': t_2_all_vov,
        f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv,
        f'{index}_vov012_分母': today_dist_view_pv,
    })
    logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据")
    return result_df
  126. def fetch_feature_data(t_1_datetime: datetime):
  127. """
  128. 获取feature数据
  129. :return:
  130. """
  131. with concurrent.futures.ThreadPoolExecutor(5) as executor:
  132. t_1_feature_task = executor.submit(
  133. fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
  134. )
  135. t_2_datetime = t_1_datetime - timedelta(days=1)
  136. t_2_feature_task = executor.submit(
  137. fetch_feature_data_dt, t_2_datetime.strftime("%Y%m%d"), 2
  138. )
  139. t_3_datetime = t_1_datetime - timedelta(days=2)
  140. t_3_feature_task = executor.submit(
  141. fetch_feature_data_dt, t_3_datetime.strftime("%Y%m%d"), 3
  142. )
  143. t_4_datetime = t_1_datetime - timedelta(days=3)
  144. t_4_feature_task = executor.submit(
  145. fetch_feature_data_dt, t_4_datetime.strftime("%Y%m%d"), 4
  146. )
  147. t_5_datetime = t_1_datetime - timedelta(days=4)
  148. t_5_feature_task = executor.submit(
  149. fetch_feature_data_dt, t_5_datetime.strftime("%Y%m%d"), 5
  150. )
  151. logger.info(
  152. f"fetch_feature_data:"
  153. f"\t t_1_feature_task.datetime: {t_1_datetime.strftime('%Y%m%d')}"
  154. f"\t t_2_feature_task.datetime: {t_2_datetime.strftime('%Y%m%d')}"
  155. f"\t t_3_feature_task.datetime: {t_3_datetime.strftime('%Y%m%d')}"
  156. f"\t t_4_feature_task.datetime: {t_4_datetime.strftime('%Y%m%d')}"
  157. f"\t t_5_feature_task.datetime: {t_5_datetime.strftime('%Y%m%d')}"
  158. )
  159. t_1_feature = t_1_feature_task.result()
  160. t_2_feature = t_2_feature_task.result()
  161. t_3_feature = t_3_feature_task.result()
  162. t_4_feature = t_4_feature_task.result()
  163. t_5_feature = t_5_feature_task.result()
  164. t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]]
  165. t_2_feature = t_2_feature[
  166. ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"]
  167. ]
  168. return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature
  169. def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
  170. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  171. label_future = executor.submit(fetch_label_data, label_datetime)
  172. feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
  173. view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)
  174. label_apply_df = label_future.result()
  175. t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result()
  176. view_rate = view_rate_future.result()
  177. df = (pd.merge(label_apply_df, view_rate, on="vid", how='left')
  178. .merge(t_1_feature, on="vid", how='left')
  179. .merge(t_2_feature, on="vid", how='left')
  180. .merge(t_3_feature, on="vid", how='left')
  181. .merge(t_4_feature, on="vid", how='left')
  182. .merge(t_5_feature, on="vid", how='left')
  183. )
  184. df.fillna(0, inplace=True)
  185. df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
  186. for col in column_names:
  187. df[col] = pd.to_numeric(df[col], errors='coerce')
  188. df["12_change"] = df["1_vov0"] - df["2_vov0"]
  189. df["23_change"] = df["2_vov0"] - df["3_vov0"]
  190. df["34_change"] = df["3_vov0"] - df["4_vov0"]
  191. df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
  192. return df
  193. def xgb_train_multi_dt_data(t_1_label_dt: datetime):
  194. """
  195. XGB模型多天训练数据
  196. :param t_1_label_dt:
  197. :return:
  198. """
  199. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  200. t_1_feature_dt = t_1_label_dt - timedelta(2)
  201. logger.info(
  202. f"VOV模型特征数据处理 --- t_1_label_future:"
  203. f"\t label_datetime: {t_1_label_dt.strftime('%Y%m%d')} "
  204. f"\t feature_datetime: {t_1_feature_dt.strftime('%Y%m%d')} "
  205. f"\t view_rate_datetime: {t_1_label_dt.strftime('%Y%m%d')} "
  206. )
  207. t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_feature_dt, t_1_label_dt)
  208. t_2_label_dt = t_1_label_dt - timedelta(1)
  209. t_2_feature_dt = t_2_label_dt - timedelta(1)
  210. logger.info(
  211. f"VOV模型特征数据处理 --- t_2_label_future:"
  212. f"\t label_datetime: {t_2_label_dt.strftime('%Y%m%d')} "
  213. f"\t feature_datetime: {t_2_feature_dt.strftime('%Y%m%d')} "
  214. f"\t view_rate_datetime: {t_2_label_dt.strftime('%Y%m%d')} "
  215. )
  216. t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_feature_dt, t_2_label_dt)
  217. t_3_label_dt = t_1_label_dt - timedelta(2)
  218. t_3_feature_dt = t_3_label_dt - timedelta(1)
  219. logger.info(
  220. f"VOV模型特征数据处理 --- t_3_label_future:"
  221. f"\t label_datetime: {t_3_label_dt.strftime('%Y%m%d')} "
  222. f"\t feature_datetime: {t_3_feature_dt.strftime('%Y%m%d')} "
  223. f"\t view_rate_datetime: {t_3_label_dt.strftime('%Y%m%d')} "
  224. )
  225. t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_feature_dt, t_3_label_dt)
  226. t_1_label_df = t_1_label_future.result()
  227. t_2_label_df = t_2_label_future.result()
  228. t_3_label_df = t_3_label_future.result()
  229. return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True)
  230. def xgb_predict_dt_data(label_datetime: datetime):
  231. """
  232. 获取预估数据
  233. :param label_datetime:
  234. :return:
  235. """
  236. feature_start_datetime = label_datetime
  237. view_rate_datetime = label_datetime + timedelta(2)
  238. logger.info(
  239. f"VOV模型预测数据处理 --- predict_df: "
  240. f"\t label_datetime: {label_datetime.strftime('%Y%m%d')} "
  241. f"\t feature_datetime: {feature_start_datetime.strftime('%Y%m%d')} "
  242. f"\t view_rate_datetime: {view_rate_datetime.strftime('%Y%m%d')} "
  243. )
  244. return fetch_data(label_datetime, feature_start_datetime, view_rate_datetime)
  245. def _main():
  246. logger.info(f"XGB模型训练")
  247. train_df = xgb_train_multi_dt_data((datetime.now() - timedelta(days=4)))
  248. trains_array = train_df[features_name].values
  249. trains_label_array = train_df['label'].values
  250. logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}")
  251. model = xgb.XGBClassifier(
  252. n_estimators=1000,
  253. learning_rate=0.01,
  254. max_depth=5,
  255. min_child_weight=1,
  256. gamma=0,
  257. subsample=0.8,
  258. colsample_bytree=0.8,
  259. objective='binary:logistic',
  260. nthread=8,
  261. scale_pos_weight=1,
  262. random_state=2024,
  263. seed=2024,
  264. )
  265. model.fit(trains_array, trains_label_array)
  266. logger.info("获取评测数据")
  267. predict_df = xgb_predict_dt_data((datetime.now() - timedelta(days=3)))
  268. tests_array = predict_df[features_name].values
  269. y_pred = model.predict_proba(tests_array)[:, 1]
  270. predict_df["y_pred"] = y_pred
  271. condition_choose = (
  272. (predict_df['y_pred'] <= 0.1) &
  273. (
  274. (predict_df['2_vov0_分母'] > 50) |
  275. (predict_df['3_vov0_分母'] > 50) |
  276. (predict_df['4_vov0_分母'] > 50)
  277. ) &
  278. (
  279. (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1)
  280. )
  281. )
  282. profit_threshold = 0.3
  283. condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
  284. predict_df["condition_choose"] = condition_choose
  285. predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
  286. f"{config_manager.project_home}/XGB/file/new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
  287. sep="\t",
  288. index=False
  289. )
  290. choose_bad = condition_choose.sum()
  291. choose_bad_real_bad = condition_choose_real.sum()
  292. acc = choose_bad_real_bad / choose_bad
  293. logger.info(
  294. f"acc:{acc} "
  295. f"分子={choose_bad_real_bad} "
  296. f"分母={choose_bad} "
  297. f"总视频数={predict_df.shape[0]} "
  298. f"盈利计算标注vov0大于:{profit_threshold}"
  299. )
  300. surface = predict_df.loc[condition_choose, '曝光占比'].sum()
  301. surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum()
  302. logger.info(
  303. f"总影响面:{round(surface, 6)} "
  304. f"盈利影响面:{round(surface_income, 6)} "
  305. f"亏损影响面:{round(surface - surface_income, 6)}"
  306. )
  307. predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold)
  308. profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum()
  309. profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum()
  310. logger.info(
  311. f"总盈亏:{round(profit_loss_value, 1)} "
  312. f"纯盈利:{round(profit_value, 1)} "
  313. f"纯亏损:{round(profit_loss_value - profit_value, 1)} "
  314. f"盈利效率:{round(profit_loss_value / profit_value, 6)}"
  315. )
  316. filtered_vid = predict_df.loc[condition_choose, 'vid'].unique()
  317. # 写入本地文件
  318. np.savetxt(
  319. f"{config_manager.project_home}/XGB/file/filtered_vid_{datetime.now().strftime('%Y%m%d')}.csv",
  320. filtered_vid,
  321. fmt="%d",
  322. delimiter=","
  323. )
  324. # 写入Redis
  325. # redis_key = f"redis:lower_vov_vid:{datetime.now().strftime('%Y%m%d')}"
  326. #
  327. # logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}")
  328. # host, port, password = config_manager.get_algorithm_redis_info()
  329. # alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password)
  330. # for vid in filtered_vid.tolist():
  331. # alg_redis.add_number_to_set(redis_key, vid)
  332. #
  333. # alg_redis.set_expire(redis_key, 86400)
  334. if __name__ == '__main__':
  335. card_json = {
  336. "config": {},
  337. "i18n_elements": {
  338. "zh_cn": [
  339. {
  340. "tag": "markdown",
  341. "content": "",
  342. "text_align": "left",
  343. "text_size": "normal"
  344. }
  345. ]
  346. },
  347. "i18n_header": {
  348. "zh_cn": {
  349. "title": {
  350. "tag": "plain_text",
  351. "content": "XGB模型训练预测完成"
  352. },
  353. "template": "turquoise"
  354. }
  355. }
  356. }
  357. try:
  358. _main()
  359. msg_text = f"\n- 所属项目: model_monitor" \
  360. f"\n- 所属环境: {config_manager.get_env()}" \
  361. f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤"
  362. card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
  363. except Exception as e:
  364. logger.error("VOV过滤XGB模型训练异常: ", e)
  365. msg_text = f"\n- 所属项目: model_monitor" \
  366. f"\n- 所属环境: {config_manager.get_env()}" \
  367. f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤"
  368. card_json['i18n_header']['zh_cn']['template'] = "red"
  369. card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败"
  370. card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
  371. if config_manager.get_env() == "pro":
  372. # 发送通知
  373. feishu_inform_util.send_card_msg_to_feishu(
  374. webhook=config_manager.get_vov_model_inform_feishu_webhook(),
  375. card_json=card_json
  376. )