# vov_xgboost_train.py (15 KB)
  1. import concurrent.futures
  2. import json
  3. import logging
  4. from datetime import datetime, timedelta
  5. import pandas as pd
  6. import xgboost as xgb
  7. from client import ODPSClient
  8. from config import ConfigManager
  9. from helper import RedisHelper
  10. from util import feishu_inform_util
  # Shared ODPS client; get_partition_df() uses it to open download sessions.
  odps_client = ODPSClient.ODPSClient()
  # Project configuration: environment name, Redis credentials, Feishu webhook.
  config_manager = ConfigManager.ConfigManager()

  # Ratio columns grouped by window; the leading digit is the day index.
  _VOV0_COLS = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0']
  _VOV01_COLS = ['2_vov01', '3_vov01', '4_vov01', '5_vov01']
  _VOV012_COLS = ['3_vov012', '4_vov012', '5_vov012']

  # Model input columns, in the exact order they are fed to XGBoost.
  # NOTE(review): the vov01 / vov012 groups appear twice, so those seven
  # columns are duplicated in the feature matrix -- confirm this is intended.
  features_name = [
      *_VOV0_COLS,
      *_VOV01_COLS,
      *_VOV012_COLS,
      "12_change", "23_change", "34_change",
      *_VOV01_COLS,
      *_VOV012_COLS,
  ]

  # Columns that fetch_data() coerces to numeric before building features.
  column_names = [
      '曝光占比', 'vov0', '分子', '分母',
      *_VOV0_COLS,
      *_VOV01_COLS,
      *_VOV012_COLS,
      '1_vov0_分子', '1_vov0_分母',
      '2_vov0_分子', '2_vov0_分母',
      '3_vov0_分子', '3_vov0_分母',
      '4_vov0_分子', '4_vov0_分母',
      '5_vov0_分子', '5_vov0_分母',
      '2_vov01_分子', '2_vov01_分母',
      '3_vov01_分子', '3_vov01_分母',
      '4_vov01_分子', '4_vov01_分母',
      '5_vov01_分子', '5_vov01_分母',
      '3_vov012_分子', '3_vov012_分母',
      '4_vov012_分子', '4_vov012_分母',
      '5_vov012_分子', '5_vov012_分母',
  ]
  # Module logger: INFO and above go to both a local log file and the console.
  logger = logging.getLogger("vov_xgboost_train.py")
  logger.setLevel(logging.INFO)

  # Shared line layout: timestamp - logger name - level - message.
  formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

  # The messages logged by this module contain Chinese text, so the log file is
  # opened as UTF-8 explicitly instead of relying on the platform default.
  file_handler = logging.FileHandler('xgboost_train.log', encoding='utf-8')
  console_handler = logging.StreamHandler()

  # Both handlers share the same level and format.
  for _handler in (file_handler, console_handler):
      _handler.setLevel(logging.INFO)
      _handler.setFormatter(formatter)
      logger.addHandler(_handler)
  def get_partition_df(table, dt):
      """
      Download one partition of an ODPS table into a pandas DataFrame.

      :param table: name of the ODPS table to read
      :param dt: partition value handed to ``odps_client.get_download_session``
      :return: DataFrame holding every downloaded row, indexed 0..n-1
      :raises ValueError: if the reader yields no record batches
      """
      logger.info(f"开始下载: {table} -- {dt} 的数据")
      download_session = odps_client.get_download_session(table, dt)
      logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
      with download_session.open_arrow_reader(0, download_session.count) as reader:
          # Load every record batch of the partition into memory.
          batches = [batch.to_pandas() for batch in reader]
          if not batches:
              # pd.concat([]) fails with an opaque "No objects to concatenate";
              # raise the same exception type with a message naming the partition.
              raise ValueError(f"no data returned for table {table}, partition {dt}")
          # ignore_index gives the combined frame one clean 0..n-1 index; without
          # it the per-batch indexes are kept and row labels can repeat.
          df = pd.concat(batches, ignore_index=True)
      logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
      return df
  def fetch_label_data(label_datetime: datetime):
      """
      Load the video ids present in the ``alg_vid_vov_new`` partition of a day.

      Only the ``vid`` column is taken from the partition; a constant ``title``
      placeholder column is added.

      :param label_datetime: day whose ``%Y%m%d`` partition is read
      :return: DataFrame with an integer ``vid`` column and a ``title`` column
      """
      label_dt = label_datetime.strftime("%Y%m%d")
      logger.info(f"fetch_label_data.dt: {label_dt}")
      label_df = get_partition_df("alg_vid_vov_new", label_dt)
      # Vectorised cast instead of a per-row iterrows() loop. to_numpy() drops the
      # source index so the result gets a fresh 0..n-1 index, and the ``vid``
      # column exists even when the partition has no rows.
      applied_df = pd.DataFrame({'vid': label_df['vid'].astype('int64').to_numpy()})
      # Constant placeholder column.
      applied_df['title'] = "title"
      return applied_df
  def fetch_view_rate_data(view_date: datetime):
      """
      Load per-video exposure figures from the ``alg_vid_vov_new`` partition.

      Each row's ``feature`` column is parsed as JSON; rows whose parsed value is
      empty are skipped. ``曝光占比`` is each row's ``分母`` divided by the sum of
      ``分母`` over all rows.

      This is best-effort: on any failure the error is logged and a single
      placeholder row (vid=-1, all other values 0) is returned instead.

      :param view_date: day whose ``%Y%m%d`` partition is read
      :return: DataFrame with columns ``vid``, ``分母``, ``分子``, ``vov0``, ``曝光占比``
      """
      view_rate_dt = view_date.strftime("%Y%m%d")
      logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}")
      try:
          view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
          # Walk the two needed columns directly instead of materialising a
          # Series per row with iterrows().
          extracted_data = [
              {
                  'vid': int(vid),
                  '分母': int(feature['1_vov0_分母']),
                  '分子': feature['1_vov0_分子'],
                  'vov0': feature['1_vov0'],
              }
              for vid, raw_feature in zip(view_rate_df['vid'], view_rate_df['feature'])
              if (feature := json.loads(raw_feature))
          ]
          applied_df = pd.DataFrame(extracted_data)
          # Exposure share, computed as one vectorised division.
          view_sum = applied_df['分母'].sum()
          applied_df['曝光占比'] = applied_df['分母'] / view_sum
          return applied_df
      except Exception:
          # Keep the fallback, but record why it was taken: every label derived
          # from this placeholder is 0, so a silent failure would go unnoticed.
          logger.exception(
              f"fetch_view_rate_data failed for dt={view_rate_dt}, returning placeholder row"
          )
          return pd.DataFrame({
              "vid": [-1],
              "分母": [0],
              "分子": [0],
              "vov0": [0],
              "曝光占比": [0]
          })
  def fetch_feature_data_dt(dt: str, index):
      """
      Build the VOV features of one day from ``videoid_vov_base_data``.

      Kept as its own function so that several days can be fetched in parallel.

      :param dt: partition value to read
      :param index: prefix for the output column names (``1`` gives ``1_vov0``, ...)
      :return: DataFrame with ``vid`` plus, for each of vov0 / vov01 / vov012,
               the ratio, its numerator (``_分子``) and its denominator (``_分母``)
      """
      logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据")
      base = get_partition_df("videoid_vov_base_data", dt).fillna(0)

      views = base['today_dist_view_pv'].astype(int)
      returns_d0 = base['today_return_to_dist_view_pv'].astype(int)
      returns_d1 = base['day1_return_to_dist_view_pv'].astype(int)
      returns_d2 = base['day2_return_to_dist_view_pv'].astype(int)

      # Cumulative return views: same day, then plus day 1, then plus day 2.
      returns_d01 = returns_d0 + returns_d1
      returns_d012 = returns_d01 + returns_d2

      # Rows without views divide by 1 and are then forced to a ratio of 0.
      has_views = views > 0
      safe_views = views.where(has_views, 1)

      def ratio(numerator):
          return (numerator / safe_views).where(has_views, 0)

      result_df = pd.DataFrame({
          'vid': base['videoid'],
          f'{index}_vov0': ratio(returns_d0),
          f'{index}_vov0_分子': returns_d0,
          f'{index}_vov0_分母': views,
          f'{index}_vov01': ratio(returns_d01),
          f'{index}_vov01_分子': returns_d01,
          f'{index}_vov01_分母': views,
          f'{index}_vov012': ratio(returns_d012),
          f'{index}_vov012_分子': returns_d012,
          f'{index}_vov012_分母': views,
      })
      logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据")
      return result_df
  def fetch_feature_data(t_1_datetime: datetime):
      """
      Fetch five consecutive days of VOV features in parallel, newest first.

      Day ``k`` (1..5) is read from the partition ``k - 1`` days before
      ``t_1_datetime`` and its columns are prefixed with ``k``.

      :param t_1_datetime: newest feature day
      :return: tuple of five DataFrames (day 1 .. day 5); day 1 keeps only its
               vov0 columns, day 2 keeps its vov0 and vov01 columns, days 3-5
               are returned with all their columns
      """
      logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}")
      with concurrent.futures.ThreadPoolExecutor(5) as executor:
          pending = [
              executor.submit(
                  fetch_feature_data_dt,
                  (t_1_datetime - timedelta(days=days_back)).strftime("%Y%m%d"),
                  days_back + 1,
              )
              for days_back in range(5)
          ]
          day_1, day_2, day_3, day_4, day_5 = [task.result() for task in pending]

      # The two newest days only contribute the windows listed here.
      day_1_columns = ['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]
      day_2_columns = [
          'vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"
      ]
      return day_1[day_1_columns], day_2[day_2_columns], day_3, day_4, day_5
  def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
      """
      Build one modelling frame: video ids, exposure figures, features and label.

      The three inputs are fetched in parallel and left-joined on ``vid``,
      starting from the label frame. Missing or non-numeric values become 0.
      Rows are sorted by ``曝光占比`` in descending order.

      :param label_datetime: day passed to ``fetch_label_data``
      :param feature_start_datetime: newest feature day, passed to ``fetch_feature_data``
      :param view_rate_datetime: day passed to ``fetch_view_rate_data``
      :return: DataFrame with the merged columns plus ``12_change``,
               ``23_change``, ``34_change`` and the binary ``label``
      """
      with concurrent.futures.ThreadPoolExecutor(3) as executor:
          label_future = executor.submit(fetch_label_data, label_datetime)
          feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
          view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)
          label_apply_df = label_future.result()
          feature_frames = feature_future.result()
          view_rate = view_rate_future.result()

      df = pd.merge(label_apply_df, view_rate, on="vid", how='left')
      for feature_frame in feature_frames:
          df = df.merge(feature_frame, on="vid", how='left')

      # Coerce first, fill second: to_numeric(errors='coerce') turns unparsable
      # values into NaN, so filling before it would let those NaNs reach the
      # features and the label.
      for col in column_names:
          df[col] = pd.to_numeric(df[col], errors='coerce')
      df = df.fillna(0)
      df = df.sort_values(by=['曝光占比'], ascending=False)

      # Day-over-day differences of the vov0 ratio.
      df["12_change"] = df["1_vov0"] - df["2_vov0"]
      df["23_change"] = df["2_vov0"] - df["3_vov0"]
      df["34_change"] = df["3_vov0"] - df["4_vov0"]

      # Positive class: vov0 strictly above 0.25.
      df["label"] = (df["vov0"] > 0.25).astype("int64")
      return df
  def xgb_multi_dt_data(t_1_label_dt: datetime):
      """
      Assemble the training set from three consecutive label days.

      ``fetch_data`` is run in parallel for ``t_1_label_dt`` and the two days
      before it, and the three frames are concatenated.

      :param t_1_label_dt: newest label day
      :return: single DataFrame with a fresh index covering all three days
      """
      # Each entry: (days before t_1 for the label day,
      #              days between the label day and its feature start day).
      # NOTE(review): the newest day uses a 2-day gap while the other two use a
      # 1-day gap; reproduced exactly as found -- confirm this is intended.
      schedule = ((0, 2), (1, 1), (2, 1))

      pending = []
      with concurrent.futures.ThreadPoolExecutor(3) as executor:
          for position, (label_offset, feature_gap) in enumerate(schedule, start=1):
              label_dt = t_1_label_dt - timedelta(label_offset)
              logger.info(
                  f"VOV模型特征数据处理:t_{position}_label_future.label_datetime: {label_dt.strftime('%Y%m%d')}"
              )
              pending.append(
                  executor.submit(fetch_data, label_dt, label_dt - timedelta(feature_gap), label_dt)
              )
          frames = [task.result() for task in pending]
      return pd.concat(frames, ignore_index=True)
  def _safe_ratio(numerator, denominator):
      """Return ``numerator / denominator``, or NaN when the denominator is zero."""
      if denominator == 0:
          return float("nan")
      return numerator / denominator


  def _main():
      """
      Train the VOV classifier, score the evaluation day and publish low-VOV vids.

      Steps:
        1. Train an XGBoost binary classifier on the frames returned by
           ``xgb_multi_dt_data`` for the day three days before now.
        2. Score the frame returned by ``fetch_data`` for the day two days before now.
        3. Select videos by predicted probability, view volume and vov0 trend.
        4. Write a TSV report under ``./file`` and log accuracy, exposure impact
           and profit/loss figures for the selection.
        5. Add the selected vids to a dated Redis set and set its expiry.
      """
      import os  # local import: only needed to create the report directory

      # Read the clock once. Training takes a while, and separate now() calls
      # could land on different days if the run crosses midnight.
      now = datetime.now()

      logger.info("XGB模型训练")
      train_df = xgb_multi_dt_data(now - timedelta(days=3))
      trains_array = train_df[features_name].values
      trains_label_array = train_df['label'].values
      logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}")
      model = xgb.XGBClassifier(
          n_estimators=1000,
          learning_rate=0.01,
          max_depth=5,
          min_child_weight=1,
          gamma=0,
          subsample=0.8,
          colsample_bytree=0.8,
          objective='binary:logistic',
          nthread=8,
          scale_pos_weight=1,
          random_state=2024,
          seed=2024,
      )
      model.fit(trains_array, trains_label_array)

      logger.info("获取评测数据")
      start_label_datetime = now - timedelta(days=2)
      feature_start_datetime = start_label_datetime
      predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime)
      tests_array = predict_df[features_name].values
      # Probability of the positive class (label 1, i.e. vov0 > 0.25).
      predict_df["y_pred"] = model.predict_proba(tests_array)[:, 1]

      # Selected videos: low predicted probability, more than 50 in at least one
      # of the day 2/3/4 denominators, and vov0 did not rise by 0.1 or more from
      # day 2 to day 1.
      condition_choose = (
          (predict_df['y_pred'] <= 0.1) &
          (
              (predict_df['2_vov0_分母'] > 50) |
              (predict_df['3_vov0_分母'] > 50) |
              (predict_df['4_vov0_分母'] > 50)
          ) &
          (
              (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1)
          )
      )
      profit_threshold = 0.3
      # Selected videos whose vov0 is at or below the profit threshold.
      condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
      predict_df["condition_choose"] = condition_choose

      # Create the report directory if needed so the job does not fail after
      # training on a fresh checkout.
      os.makedirs("./file", exist_ok=True)
      predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
          "./file/new_" + (now - timedelta(days=1)).strftime("%Y%m%d"),
          sep="\t",
          index=False
      )

      choose_bad = condition_choose.sum()
      choose_bad_real_bad = condition_choose_real.sum()
      # NaN instead of a divide-by-zero warning when nothing was selected.
      acc = _safe_ratio(choose_bad_real_bad, choose_bad)
      logger.info(
          f"acc:{acc} "
          f"分子={choose_bad_real_bad} "
          f"分母={choose_bad} "
          f"总视频数={predict_df.shape[0]} "
          f"盈利计算标注vov0大于:{profit_threshold}"
      )

      # Exposure share covered by the selection, split by condition_choose_real.
      surface = predict_df.loc[condition_choose, '曝光占比'].sum()
      surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum()
      logger.info(
          f"总影响面:{round(surface, 6)} "
          f"盈利影响面:{round(surface_income, 6)} "
          f"亏损影响面:{round(surface - surface_income, 6)}"
      )

      # Per-video value: denominator times the distance of vov0 from the threshold.
      predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold)
      profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum()
      profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum()
      logger.info(
          f"总盈亏:{round(profit_loss_value, 1)} "
          f"纯盈利:{round(profit_value, 1)} "
          f"纯亏损:{round(profit_loss_value - profit_value, 1)} "
          f"盈利效率:{round(_safe_ratio(profit_loss_value, profit_value), 6)}"
      )

      filtered_vid = predict_df.loc[condition_choose, 'vid'].unique()

      # Publish the selected vids to Redis under a key dated with the run day.
      redis_key = f"redis:lower_vov_vid:{now.strftime('%Y%m%d')}"
      logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}")
      host, port, password = config_manager.get_algorithm_redis_info()
      alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password)
      for vid in filtered_vid.tolist():
          alg_redis.add_number_to_set(redis_key, vid)
      # 86400 is presumably seconds (one day) -- confirm against RedisHelper.
      alg_redis.set_expire(redis_key, 86400)
  if __name__ == '__main__':
      # Feishu card payload; the header defaults to the success title and the
      # markdown body is filled in below.
      card_json = {
          "config": {},
          "i18n_elements": {
              "zh_cn": [
                  {
                      "tag": "markdown",
                      "content": "",
                      "text_align": "left",
                      "text_size": "normal"
                  }
              ]
          },
          "i18n_header": {
              "zh_cn": {
                  "title": {
                      "tag": "plain_text",
                      "content": "XGB模型训练预测完成"
                  },
                  "template": "info"
              }
          }
      }
      try:
          # NOTE(review): the training entry point is commented out, so this
          # script currently sends the success card without training or
          # predicting anything -- confirm whether that is intentional.
          # _main()
          msg_text = f"\n- 所属项目: model_monitor" \
                     f"\n- 所属环境: {config_manager.get_env()}" \
                     f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤"
          card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
      except Exception as e:
          # The exception needs a %s placeholder: passing it as a bare extra
          # argument makes logging fail to format the record. logger.exception
          # also records the traceback.
          logger.exception("VOV过滤XGB模型训练异常: %s", e)
          msg_text = f"\n- 所属项目: rov-offline" \
                     f"\n- 告警名称: XGB模型训练失败" \
                     f"\n- 所属环境: {config_manager.get_env()}" \
                     f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤"
          # Switch the card to the failure header.
          card_json['i18n_header']['zh_cn']['template'] = "error"
          card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败"
          card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text

      # Send the card whether the run succeeded or failed.
      feishu_inform_util.send_card_msg_to_feishu(
          webhook=config_manager.get_vov_model_inform_feishu_webhook(),
          card_json=card_json
      )