# vov_xgboost_train.py
  1. import concurrent.futures
  2. import json
  3. import logging
  4. from datetime import datetime, timedelta
  5. import pandas as pd
  6. import xgboost as xgb
  7. from client import ODPSClient
  8. from config import ConfigManager
  9. from helper import RedisHelper
  10. from util import feishu_inform_util
  11. odps_client = ODPSClient.ODPSClient()
  12. config_manager = ConfigManager.ConfigManager()
  13. features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
  14. '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01',
  15. '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012']
  16. column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01',
  17. '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母',
  18. '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
  19. '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
  20. '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
  21. '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
  22. # 创建一个logger
  23. logger = logging.getLogger("vov_xgboost_train.py")
  24. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  25. def get_partition_df(table, dt):
  26. logger.info(f"开始下载: {table} -- {dt} 的数据")
  27. download_session = odps_client.get_download_session(table, dt)
  28. logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
  29. with download_session.open_arrow_reader(0, download_session.count) as reader:
  30. # 将所有数据加载到 DataFrame 中
  31. df = pd.concat([batch.to_pandas() for batch in reader])
  32. logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
  33. return df
  34. def fetch_label_data(label_datetime: datetime):
  35. """
  36. 获取 label数据
  37. :return:
  38. """
  39. label_dt = label_datetime.strftime("%Y%m%d")
  40. logger.info(f"fetch_label_data.dt: {label_dt}")
  41. # 获取数据
  42. label_df = get_partition_df("alg_vid_vov_new", label_dt)
  43. extracted_data = [
  44. {
  45. 'vid': int(row['vid']),
  46. }
  47. for _, row in label_df.iterrows()
  48. ]
  49. # 构造新的 DataFrame
  50. applied_df = pd.DataFrame(extracted_data)
  51. # 添加 title 列
  52. applied_df['title'] = "title"
  53. return applied_df
  54. def fetch_view_rate_data(view_date: datetime):
  55. """
  56. 获取曝光数据
  57. :return:
  58. """
  59. view_rate_dt = view_date.strftime("%Y%m%d")
  60. logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}")
  61. try:
  62. # 获取数据
  63. view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
  64. extracted_data = [
  65. {
  66. 'vid': int(row['vid']),
  67. '分母': int(feature['1_vov0_分母']),
  68. '分子': feature['1_vov0_分子'],
  69. 'vov0': feature['1_vov0']
  70. }
  71. for _, row in view_rate_df.iterrows()
  72. if (feature := json.loads(row['feature']))
  73. ]
  74. # 构造新的 DataFrame
  75. applied_df = pd.DataFrame(extracted_data)
  76. # 计算曝光占比,矢量化操作
  77. view_sum = applied_df['分母'].sum()
  78. applied_df['曝光占比'] = applied_df['分母'] / view_sum
  79. return applied_df
  80. except Exception as e:
  81. return pd.DataFrame({
  82. "vid": [-1],
  83. "分母": [0],
  84. "分子": [0],
  85. "vov0": [0],
  86. "曝光占比": [0]
  87. })
  88. def fetch_feature_data_dt(dt: str, index):
  89. """
  90. 查询某一天的特征数据,方便做特征数据时并行处理
  91. :param dt:
  92. :param index:
  93. :return:
  94. """
  95. logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据")
  96. df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
  97. today_dist_view_pv = df['today_dist_view_pv'].astype(int)
  98. today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int)
  99. day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int)
  100. day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int)
  101. # all_return_to_dist_view_pv
  102. t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv
  103. t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv
  104. # all_vov
  105. t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  106. t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0)
  107. t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  108. t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0)
  109. t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
  110. t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0)
  111. # 构造结果DataFrame
  112. result_df = pd.DataFrame({
  113. 'vid': df['videoid'],
  114. f'{index}_vov0': t_0_all_vov,
  115. f'{index}_vov0_分子': today_return_to_dist_view_pv,
  116. f'{index}_vov0_分母': today_dist_view_pv,
  117. f'{index}_vov01': t_1_all_vov,
  118. f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv,
  119. f'{index}_vov01_分母': today_dist_view_pv,
  120. f'{index}_vov012': t_2_all_vov,
  121. f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv,
  122. f'{index}_vov012_分母': today_dist_view_pv,
  123. })
  124. logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据")
  125. return result_df
  126. def fetch_feature_data(t_1_datetime: datetime):
  127. """
  128. 获取feature数据
  129. :return:
  130. """
  131. logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}")
  132. with concurrent.futures.ThreadPoolExecutor(5) as executor:
  133. t_1_feature_task = executor.submit(
  134. fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
  135. )
  136. t_2_feature_task = executor.submit(
  137. fetch_feature_data_dt, (t_1_datetime - timedelta(days=1)).strftime("%Y%m%d"), 2
  138. )
  139. t_3_feature_task = executor.submit(
  140. fetch_feature_data_dt, (t_1_datetime - timedelta(days=2)).strftime("%Y%m%d"), 3
  141. )
  142. t_4_feature_task = executor.submit(
  143. fetch_feature_data_dt, (t_1_datetime - timedelta(days=3)).strftime("%Y%m%d"), 4
  144. )
  145. t_5_feature_task = executor.submit(
  146. fetch_feature_data_dt, (t_1_datetime - timedelta(days=4)).strftime("%Y%m%d"), 5
  147. )
  148. t_1_feature = t_1_feature_task.result()
  149. t_2_feature = t_2_feature_task.result()
  150. t_3_feature = t_3_feature_task.result()
  151. t_4_feature = t_4_feature_task.result()
  152. t_5_feature = t_5_feature_task.result()
  153. t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]]
  154. t_2_feature = t_2_feature[
  155. ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"]
  156. ]
  157. return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature
  158. def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
  159. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  160. label_future = executor.submit(fetch_label_data, label_datetime)
  161. feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
  162. view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)
  163. label_apply_df = label_future.result()
  164. t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result()
  165. view_rate = view_rate_future.result()
  166. df = (pd.merge(label_apply_df, view_rate, on="vid", how='left')
  167. .merge(t_1_feature, on="vid", how='left')
  168. .merge(t_2_feature, on="vid", how='left')
  169. .merge(t_3_feature, on="vid", how='left')
  170. .merge(t_4_feature, on="vid", how='left')
  171. .merge(t_5_feature, on="vid", how='left')
  172. )
  173. df.fillna(0, inplace=True)
  174. df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
  175. for col in column_names:
  176. df[col] = pd.to_numeric(df[col], errors='coerce')
  177. df["12_change"] = df["1_vov0"] - df["2_vov0"]
  178. df["23_change"] = df["2_vov0"] - df["3_vov0"]
  179. df["34_change"] = df["3_vov0"] - df["4_vov0"]
  180. df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
  181. return df
  182. def xgb_multi_dt_data(t_1_label_dt: datetime):
  183. with concurrent.futures.ThreadPoolExecutor(3) as executor:
  184. logger.info(f"VOV模型特征数据处理:t_1_label_future.label_datetime: {t_1_label_dt.strftime('%Y%m%d')}")
  185. t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_label_dt - timedelta(2), t_1_label_dt)
  186. t_2_label_dt = t_1_label_dt - timedelta(1)
  187. logger.info(f"VOV模型特征数据处理:t_2_label_future.label_datetime: {t_2_label_dt.strftime('%Y%m%d')}")
  188. t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_label_dt - timedelta(1), t_2_label_dt)
  189. t_3_label_dt = t_1_label_dt - timedelta(2)
  190. logger.info(f"VOV模型特征数据处理:t_3_label_future.label_datetime: {t_3_label_dt.strftime('%Y%m%d')}")
  191. t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_label_dt - timedelta(1), t_3_label_dt)
  192. t_1_label_df = t_1_label_future.result()
  193. t_2_label_df = t_2_label_future.result()
  194. t_3_label_df = t_3_label_future.result()
  195. return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True)
def _main() -> None:
    """
    Train the VOV XGBoost classifier on the last three label days, score the
    most recent evaluation day, log precision/coverage/profit metrics, dump
    the chosen vids to a TSV report, and push the filtered vid set to Redis
    (1-day TTL) for low-VOV video filtering.
    """
    logger.info(f"XGB模型训练")
    # Training data: three label days ending at t-3.
    train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=3)))
    trains_array = train_df[features_name].values
    trains_label_array = train_df['label'].values
    logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}")
    model = xgb.XGBClassifier(
        n_estimators=1000,
        learning_rate=0.01,
        max_depth=5,
        min_child_weight=1,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        objective='binary:logistic',
        nthread=8,
        scale_pos_weight=1,
        random_state=2024,
        seed=2024,
    )
    model.fit(trains_array, trains_label_array)
    logger.info("获取评测数据")
    # Evaluation data: single day at t-2 (label, feature and view-rate dates coincide).
    start_label_datetime = datetime.now() - timedelta(days=2)
    feature_start_datetime = start_label_datetime
    predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime)
    tests_array = predict_df[features_name].values
    # Probability of the positive class (label == 1, i.e. vov0 > 0.25).
    y_pred = model.predict_proba(tests_array)[:, 1]
    predict_df["y_pred"] = y_pred
    # Filter rule: model is confident the video is low-VOV (<= 0.1), it has
    # enough exposure on at least one recent day (denominator > 50), and its
    # vov0 is not trending sharply upward (t-1 minus t-2 below 0.1).
    condition_choose = (
        (predict_df['y_pred'] <= 0.1) &
        (
            (predict_df['2_vov0_分母'] > 50) |
            (predict_df['3_vov0_分母'] > 50) |
            (predict_df['4_vov0_分母'] > 50)
        ) &
        (
            (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1)
        )
    )
    profit_threshold = 0.3
    # "Real" hits: chosen videos whose actual vov0 is at or below the profit threshold.
    condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
    predict_df["condition_choose"] = condition_choose
    # Daily report of all vids with their exposure share and filter decision.
    predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
        "./file/new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
        sep="\t",
        index=False
    )
    choose_bad = condition_choose.sum()
    choose_bad_real_bad = condition_choose_real.sum()
    # NOTE(review): if no video matches the filter, choose_bad is 0 and this
    # division yields nan/inf under numpy semantics — the metric log below
    # would then be meaningless; confirm whether that case can occur.
    acc = choose_bad_real_bad / choose_bad
    logger.info(
        f"acc:{acc} "
        f"分子={choose_bad_real_bad} "
        f"分母={choose_bad} "
        f"总视频数={predict_df.shape[0]} "
        f"盈利计算标注vov0大于:{profit_threshold}"
    )
    # Impact surface: total exposure share of chosen vids vs. correctly chosen vids.
    surface = predict_df.loc[condition_choose, '曝光占比'].sum()
    surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum()
    logger.info(
        f"总影响面:{round(surface, 6)} "
        f"盈利影响面:{round(surface_income, 6)} "
        f"亏损影响面:{round(surface - surface_income, 6)}"
    )
    # Profit/loss proxy: exposure volume times (vov0 - threshold); negative is good to filter.
    predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold)
    profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum()
    profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum()
    logger.info(
        f"总盈亏:{round(profit_loss_value, 1)} "
        f"纯盈利:{round(profit_value, 1)} "
        f"纯亏损:{round(profit_loss_value - profit_value, 1)} "
        f"盈利效率:{round(profit_loss_value / profit_value, 6)}"
    )
    filtered_vid = predict_df.loc[condition_choose, 'vid'].unique()
    # Write the filtered vid set to Redis (keyed by today's date).
    redis_key = f"redis:lower_vov_vid:{datetime.now().strftime('%Y%m%d')}"
    logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}")
    host, port, password = config_manager.get_algorithm_redis_info()
    alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password)
    for vid in filtered_vid.tolist():
        alg_redis.add_number_to_set(redis_key, vid)
    # Expire after one day so stale filters do not linger.
    alg_redis.set_expire(redis_key, 86400)
  278. if __name__ == '__main__':
  279. card_json = {
  280. "config": {},
  281. "i18n_elements": {
  282. "zh_cn": [
  283. {
  284. "tag": "markdown",
  285. "content": "",
  286. "text_align": "left",
  287. "text_size": "normal"
  288. }
  289. ]
  290. },
  291. "i18n_header": {
  292. "zh_cn": {
  293. "title": {
  294. "tag": "plain_text",
  295. "content": "XGB模型训练预测完成"
  296. },
  297. "template": "info"
  298. }
  299. }
  300. }
  301. try:
  302. _main()
  303. msg_text = f"\n- 所属项目: model_monitor" \
  304. f"\n- 所属环境: {config_manager.get_env()}" \
  305. f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤"
  306. card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
  307. except Exception as e:
  308. logger.error("VOV过滤XGB模型训练异常: ", e)
  309. msg_text = f"\n- 所属项目: rov-offline" \
  310. f"\n- 告警名称: XGB模型训练失败" \
  311. f"\n- 所属环境: {config_manager.get_env()}" \
  312. f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤"
  313. card_json['i18n_header']['zh_cn']['template'] = "error"
  314. card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败"
  315. card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
  316. # 发送通知
  317. feishu_inform_util.send_card_msg_to_feishu(
  318. webhook=config_manager.get_vov_model_inform_feishu_webhook(),
  319. card_json=card_json
  320. )