vov_xgboost_train.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. import concurrent.futures
  2. import json
  3. import logging
  4. from datetime import datetime, timedelta
  5. import pandas as pd
  6. import numpy as np
  7. import xgboost as xgb
  8. from client import ODPSClient
  9. from config import ConfigManager
  10. from helper import RedisHelper
  11. from util import feishu_inform_util
  12. odps_client = ODPSClient.ODPSClient()
  13. config_manager = ConfigManager.ConfigManager()
  14. features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
  15. '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01',
  16. '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012']
  17. column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01',
  18. '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母',
  19. '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
  20. '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
  21. '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
  22. '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
  23. # 创建一个logger
  24. logger = logging.getLogger("vov_xgboost_train.py")
  25. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  26. def get_partition_df(table, dt):
  27. logger.info(f"开始下载: {table} -- {dt} 的数据")
  28. download_session = odps_client.get_download_session(table, dt)
  29. logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
  30. with download_session.open_arrow_reader(0, download_session.count) as reader:
  31. # 将所有数据加载到 DataFrame 中
  32. df = pd.concat([batch.to_pandas() for batch in reader])
  33. logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
  34. return df
  35. def fetch_label_data(label_datetime: datetime):
  36. """
  37. 获取 label数据
  38. :return:
  39. """
  40. label_dt = label_datetime.strftime("%Y%m%d")
  41. logger.info(f"fetch_label_data.dt: {label_dt}")
  42. # 获取数据
  43. label_df = get_partition_df("alg_vid_vov_new", label_dt)
  44. extracted_data = [
  45. {
  46. 'vid': int(row['vid']),
  47. }
  48. for _, row in label_df.iterrows()
  49. ]
  50. # 构造新的 DataFrame
  51. applied_df = pd.DataFrame(extracted_data)
  52. # 添加 title 列
  53. applied_df['title'] = "title"
  54. return applied_df
  55. def fetch_view_rate_data(view_date: datetime):
  56. """
  57. 获取曝光数据
  58. :return:
  59. """
  60. view_rate_dt = view_date.strftime("%Y%m%d")
  61. logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}")
  62. try:
  63. # 获取数据
  64. view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
  65. extracted_data = [
  66. {
  67. 'vid': int(row['vid']),
  68. '分母': int(feature['1_vov0_分母']),
  69. '分子': feature['1_vov0_分子'],
  70. 'vov0': feature['1_vov0']
  71. }
  72. for _, row in view_rate_df.iterrows()
  73. if (feature := json.loads(row['feature']))
  74. ]
  75. # 构造新的 DataFrame
  76. applied_df = pd.DataFrame(extracted_data)
  77. # 计算曝光占比,矢量化操作
  78. view_sum = applied_df['分母'].sum()
  79. applied_df['曝光占比'] = applied_df['分母'] / view_sum
  80. return applied_df
  81. except Exception as e:
  82. return pd.DataFrame({
  83. "vid": [-1],
  84. "分母": [0],
  85. "分子": [0],
  86. "vov0": [0],
  87. "曝光占比": [0]
  88. })
  89. def fetch_feature_data_dt(dt: str, index):
  90. """
  91. 查询某一天的特征数据,方便做特征数据时并行处理
  92. :param dt:
  93. :param index:
  94. :return:
  95. """
  96. logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据")
  97. df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
  98. today_dist_view_pv = df['today_dist_view_pv'].astype(int)
  99. today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int)
  100. day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int)
  101. day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int)
  102. # all_return_to_dist_view_pv
  103. t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv
  104. t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv
  105. # all_vov
  106. t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  107. t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0)
  108. t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  109. t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0)
  110. t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  111. t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0)
  112. # 构造结果DataFrame
  113. result_df = pd.DataFrame({
  114. 'vid': df['videoid'],
  115. f'{index}_vov0': t_0_all_vov,
  116. f'{index}_vov0_分子': today_return_to_dist_view_pv,
  117. f'{index}_vov0_分母': today_dist_view_pv,
  118. f'{index}_vov01': t_1_all_vov,
  119. f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv,
  120. f'{index}_vov01_分母': today_dist_view_pv,
  121. f'{index}_vov012': t_2_all_vov,
  122. f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv,
  123. f'{index}_vov012_分母': today_dist_view_pv,
  124. })
  125. logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据")
  126. return result_df
  127. def fetch_feature_data(t_1_datetime: datetime):
  128. """
  129. 获取feature数据
  130. :return:
  131. """
  132. logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}")
  133. with concurrent.futures.ThreadPoolExecutor(5) as executor:
  134. t_1_feature_task = executor.submit(
  135. fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
  136. )
  137. t_2_feature_task = executor.submit(
  138. fetch_feature_data_dt, (t_1_datetime - timedelta(days=1)).strftime("%Y%m%d"), 2
  139. )
  140. t_3_feature_task = executor.submit(
  141. fetch_feature_data_dt, (t_1_datetime - timedelta(days=2)).strftime("%Y%m%d"), 3
  142. )
  143. t_4_feature_task = executor.submit(
  144. fetch_feature_data_dt, (t_1_datetime - timedelta(days=3)).strftime("%Y%m%d"), 4
  145. )
  146. t_5_feature_task = executor.submit(
  147. fetch_feature_data_dt, (t_1_datetime - timedelta(days=4)).strftime("%Y%m%d"), 5
  148. )
  149. t_1_feature = t_1_feature_task.result()
  150. t_2_feature = t_2_feature_task.result()
  151. t_3_feature = t_3_feature_task.result()
  152. t_4_feature = t_4_feature_task.result()
  153. t_5_feature = t_5_feature_task.result()
  154. t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]]
  155. t_2_feature = t_2_feature[
  156. ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"]
  157. ]
  158. return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature
  159. def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
  160. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  161. label_future = executor.submit(fetch_label_data, label_datetime)
  162. feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
  163. view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)
  164. label_apply_df = label_future.result()
  165. t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result()
  166. view_rate = view_rate_future.result()
  167. df = (pd.merge(label_apply_df, view_rate, on="vid", how='left')
  168. .merge(t_1_feature, on="vid", how='left')
  169. .merge(t_2_feature, on="vid", how='left')
  170. .merge(t_3_feature, on="vid", how='left')
  171. .merge(t_4_feature, on="vid", how='left')
  172. .merge(t_5_feature, on="vid", how='left')
  173. )
  174. df.fillna(0, inplace=True)
  175. df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
  176. for col in column_names:
  177. df[col] = pd.to_numeric(df[col], errors='coerce')
  178. df["12_change"] = df["1_vov0"] - df["2_vov0"]
  179. df["23_change"] = df["2_vov0"] - df["3_vov0"]
  180. df["34_change"] = df["3_vov0"] - df["4_vov0"]
  181. df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
  182. return df
  183. def xgb_multi_dt_data(t_1_label_dt: datetime):
  184. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  185. logger.info(f"VOV模型特征数据处理:t_1_label_future.label_datetime: {t_1_label_dt.strftime('%Y%m%d')}")
  186. t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_label_dt - timedelta(2), t_1_label_dt)
  187. t_2_label_dt = t_1_label_dt - timedelta(1)
  188. logger.info(f"VOV模型特征数据处理:t_2_label_future.label_datetime: {t_2_label_dt.strftime('%Y%m%d')}")
  189. t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_label_dt - timedelta(1), t_2_label_dt)
  190. t_3_label_dt = t_1_label_dt - timedelta(2)
  191. logger.info(f"VOV模型特征数据处理:t_3_label_future.label_datetime: {t_3_label_dt.strftime('%Y%m%d')}")
  192. t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_label_dt - timedelta(1), t_3_label_dt)
  193. t_1_label_df = t_1_label_future.result()
  194. t_2_label_df = t_2_label_future.result()
  195. t_3_label_df = t_3_label_future.result()
  196. return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True)
def _main():
    """
    Train the XGB VOV model, score the evaluation day, select low-VOV videos
    by rule, report metrics, and write the selected vids to a local file and
    a Redis set (24h TTL).
    """
    logger.info(f"XGB模型训练")
    # Training data: three label days anchored 3 days ago (see xgb_multi_dt_data).
    train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=3)))
    trains_array = train_df[features_name].values
    trains_label_array = train_df['label'].values
    logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}")
    model = xgb.XGBClassifier(
        n_estimators=1000,
        learning_rate=0.01,
        max_depth=5,
        min_child_weight=1,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        objective='binary:logistic',
        nthread=8,
        scale_pos_weight=1,
        # Both seeds fixed for reproducibility.
        random_state=2024,
        seed=2024,
    )
    model.fit(trains_array, trains_label_array)
    logger.info("获取评测数据")
    # Evaluation day: 2 days ago; the same date is used for labels, features
    # and exposure.
    start_label_datetime = datetime.now() - timedelta(days=2)
    feature_start_datetime = start_label_datetime
    predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime)
    tests_array = predict_df[features_name].values
    # Probability of the positive class (label==1, i.e. vov0 > 0.25).
    y_pred = model.predict_proba(tests_array)[:, 1]
    predict_df["y_pred"] = y_pred
    # Rule-based selection: low predicted probability, enough exposure on at
    # least one of days 2-4, and day-1 vov not spiking above day-2.
    condition_choose = (
        (predict_df['y_pred'] <= 0.1) &
        (
            (predict_df['2_vov0_分母'] > 50) |
            (predict_df['3_vov0_分母'] > 50) |
            (predict_df['4_vov0_分母'] > 50)
        ) &
        (
            (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1)
        )
    )
    profit_threshold = 0.3
    # "Real" hits: selected vids whose actual vov0 stayed at/below the threshold.
    condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
    predict_df["condition_choose"] = condition_choose
    # NOTE(review): this report file is stamped with yesterday's date while the
    # filtered_vid file below uses today's — confirm the mismatch is intended.
    predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
        f"{config_manager.project_home}/XGB/file/new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
        sep="\t",
        index=False
    )
    choose_bad = condition_choose.sum()
    choose_bad_real_bad = condition_choose_real.sum()
    # NOTE(review): if no vid is selected, choose_bad == 0 and this division
    # yields inf/nan (numpy semantics) rather than raising — confirm acceptable.
    acc = choose_bad_real_bad / choose_bad
    logger.info(
        f"acc:{acc} "
        f"分子={choose_bad_real_bad} "
        f"分母={choose_bad} "
        f"总视频数={predict_df.shape[0]} "
        f"盈利计算标注vov0大于:{profit_threshold}"
    )
    # Impact surface: total exposure share of selected vids vs. correctly
    # selected ones.
    surface = predict_df.loc[condition_choose, '曝光占比'].sum()
    surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum()
    logger.info(
        f"总影响面:{round(surface, 6)} "
        f"盈利影响面:{round(surface_income, 6)} "
        f"亏损影响面:{round(surface - surface_income, 6)}"
    )
    # Profit/loss proxy: exposure-weighted distance of vov0 from the threshold.
    predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold)
    profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum()
    profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum()
    logger.info(
        f"总盈亏:{round(profit_loss_value, 1)} "
        f"纯盈利:{round(profit_value, 1)} "
        f"纯亏损:{round(profit_loss_value - profit_value, 1)} "
        f"盈利效率:{round(profit_loss_value / profit_value, 6)}"
    )
    filtered_vid = predict_df.loc[condition_choose, 'vid'].unique()
    # Persist the selected vids locally as a one-column CSV of ints.
    np.savetxt(
        f"{config_manager.project_home}/XGB/file/filtered_vid_{datetime.now().strftime('%Y%m%d')}.csv",
        filtered_vid,
        fmt="%d",
        delimiter=","
    )
    # Publish the vid set to Redis for the online filter, with a 24h expiry.
    redis_key = f"redis:lower_vov_vid:{datetime.now().strftime('%Y%m%d')}"
    logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}")
    host, port, password = config_manager.get_algorithm_redis_info()
    alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password)
    for vid in filtered_vid.tolist():
        alg_redis.add_number_to_set(redis_key, vid)
    alg_redis.set_expire(redis_key, 86400)
  286. if __name__ == '__main__':
  287. card_json = {
  288. "config": {},
  289. "i18n_elements": {
  290. "zh_cn": [
  291. {
  292. "tag": "markdown",
  293. "content": "",
  294. "text_align": "left",
  295. "text_size": "normal"
  296. }
  297. ]
  298. },
  299. "i18n_header": {
  300. "zh_cn": {
  301. "title": {
  302. "tag": "plain_text",
  303. "content": "XGB模型训练预测完成"
  304. },
  305. "template": "info"
  306. }
  307. }
  308. }
  309. try:
  310. _main()
  311. msg_text = f"\n- 所属项目: model_monitor" \
  312. f"\n- 所属环境: {config_manager.get_env()}" \
  313. f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤"
  314. card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
  315. except Exception as e:
  316. logger.error("VOV过滤XGB模型训练异常: ", e)
  317. msg_text = f"\n- 所属项目: rov-offline" \
  318. f"\n- 告警名称: XGB模型训练失败" \
  319. f"\n- 所属环境: {config_manager.get_env()}" \
  320. f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤"
  321. card_json['i18n_header']['zh_cn']['template'] = "error"
  322. card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败"
  323. card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
  324. # 发送通知
  325. feishu_inform_util.send_card_msg_to_feishu(
  326. webhook=config_manager.get_vov_model_inform_feishu_webhook(),
  327. card_json=card_json
  328. )