import concurrent.futures
import json
import logging
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import xgboost as xgb

from client import ODPSClient
from config import ConfigManager
from helper import RedisHelper
from util import feishu_inform_util

odps_client = ODPSClient.ODPSClient()
config_manager = ConfigManager.ConfigManager()

features_name = [
    '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0',
    '2_vov01', '3_vov01', '4_vov01', '5_vov01',
    '3_vov012', '4_vov012', '5_vov012',
    '12_change', '23_change', '34_change',
]

column_names = [
    '曝光占比', 'vov0', '分子', '分母',
    '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0',
    '2_vov01', '3_vov01', '4_vov01', '5_vov01',
    '3_vov012', '4_vov012', '5_vov012',
    '1_vov0_分子', '1_vov0_分母', '2_vov0_分子', '2_vov0_分母',
    '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
    '5_vov0_分子', '5_vov0_分母',
    '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
    '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母',
    '3_vov012_分子', '3_vov012_分母', '4_vov012_分子', '4_vov012_分母',
    '5_vov012_分子', '5_vov012_分母',
]

# Module-level logger
logger = logging.getLogger("vov_xgboost_train.py")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def get_partition_df(table, dt):
    logger.info(f"Start download: {table} -- {dt}")
    download_session = odps_client.get_download_session(table, dt)
    logger.info(f"Table {table}, partition {dt}: {download_session.count} rows in total")
    with download_session.open_arrow_reader(0, download_session.count) as reader:
        # Load all arrow batches into a single DataFrame
        df = pd.concat([batch.to_pandas() for batch in reader])
    logger.info(f"Download finished: {table} -- {dt}, {df.shape[0]} rows")
    return df


def fetch_label_data(label_datetime: datetime):
    """
    Fetch the label data.
    :return:
    """
    label_dt = label_datetime.strftime("%Y%m%d")
    logger.info(f"fetch_label_data.dt: {label_dt}")

    label_df = get_partition_df("alg_vid_vov_new", label_dt)
    extracted_data = [
        {'vid': int(row['vid'])}
        for _, row in label_df.iterrows()
    ]
    # Build a new DataFrame with a placeholder title column
    applied_df = pd.DataFrame(extracted_data)
    applied_df['title'] = "title"
    return applied_df


def fetch_view_rate_data(view_date: datetime):
    """
    Fetch the exposure (view rate) data.
    :return:
    """
    view_rate_dt = view_date.strftime("%Y%m%d")
    logger.info(f"fetch_view_rate_data.dt: {view_rate_dt}")
    try:
        view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
        extracted_data = [
            {
                'vid': int(row['vid']),
                '分母': int(feature['1_vov0_分母']),
                '分子': feature['1_vov0_分子'],
                'vov0': feature['1_vov0'],
            }
            for _, row in view_rate_df.iterrows()
            if (feature := json.loads(row['feature']))
        ]
        # Build a new DataFrame and compute the exposure share (vectorized)
        applied_df = pd.DataFrame(extracted_data)
        view_sum = applied_df['分母'].sum()
        applied_df['曝光占比'] = applied_df['分母'] / view_sum
        return applied_df
    except Exception as e:
        logger.error("fetch_view_rate_data failed, falling back to a stub frame: %s", e)
        return pd.DataFrame({
            "vid": [-1],
            "分母": [0],
            "分子": [0],
            "vov0": [0],
            "曝光占比": [0],
        })


def fetch_feature_data_dt(dt: str, index):
    """
    Fetch the feature data of a single day, so per-day feature building can run in parallel.
    :param dt: partition date, formatted "%Y%m%d"
    :param index: day offset used as the prefix of the output column names
    :return:
    """
    logger.info(f"fetch_feature_data_dt.dt -- {dt}")
    df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
    today_dist_view_pv = df['today_dist_view_pv'].astype(int)
    today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int)
    day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int)
    day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int)
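
    # Cumulative returns over the day itself plus the following one and two days,
    # and the corresponding vov ratios (returns / distribution views). The
    # where() guard divides by 1 when today_dist_view_pv == 0 and then forces
    # those rows back to a vov of 0, avoiding division-by-zero artifacts.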
    t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv
    t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv

    t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
    t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0)
    t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
    t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0)
    t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
    t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0)

    # Assemble the result DataFrame; all three ratios share the same denominator
    result_df = pd.DataFrame({
        'vid': df['videoid'],
        f'{index}_vov0': t_0_all_vov,
        f'{index}_vov0_分子': today_return_to_dist_view_pv,
        f'{index}_vov0_分母': today_dist_view_pv,
        f'{index}_vov01': t_1_all_vov,
        f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv,
        f'{index}_vov01_分母': today_dist_view_pv,
        f'{index}_vov012': t_2_all_vov,
        f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv,
        f'{index}_vov012_分母': today_dist_view_pv,
    })
    logger.info(f"Finished processing videoid_vov_base_data -- {dt}")
    return result_df


def fetch_feature_data(t_1_datetime: datetime):
    """
    Fetch the feature data of the five days ending at t_1_datetime.
    :return:
    """
    with concurrent.futures.ThreadPoolExecutor(5) as executor:
        t_1_feature_task = executor.submit(
            fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
        )
        t_2_datetime = t_1_datetime - timedelta(days=1)
        t_2_feature_task = executor.submit(
            fetch_feature_data_dt, t_2_datetime.strftime("%Y%m%d"), 2
        )
        t_3_datetime = t_1_datetime - timedelta(days=2)
        t_3_feature_task = executor.submit(
            fetch_feature_data_dt, t_3_datetime.strftime("%Y%m%d"), 3
        )
        t_4_datetime = t_1_datetime - timedelta(days=3)
        t_4_feature_task = executor.submit(
            fetch_feature_data_dt, t_4_datetime.strftime("%Y%m%d"), 4
        )
        t_5_datetime = t_1_datetime - timedelta(days=4)
        t_5_feature_task = executor.submit(
            fetch_feature_data_dt, t_5_datetime.strftime("%Y%m%d"), 5
        )

        logger.info(
            f"fetch_feature_data:"
            f"\t t_1_feature_task.datetime: {t_1_datetime.strftime('%Y%m%d')}"
            f"\t t_2_feature_task.datetime: {t_2_datetime.strftime('%Y%m%d')}"
            f"\t t_3_feature_task.datetime: {t_3_datetime.strftime('%Y%m%d')}"
            f"\t t_4_feature_task.datetime: {t_4_datetime.strftime('%Y%m%d')}"
            f"\t t_5_feature_task.datetime: {t_5_datetime.strftime('%Y%m%d')}"
        )

        t_1_feature = t_1_feature_task.result()
        t_2_feature = t_2_feature_task.result()
        t_3_feature = t_3_feature_task.result()
        t_4_feature = t_4_feature_task.result()
        t_5_feature = t_5_feature_task.result()

        # Trim the two most recent days to the columns used downstream
        t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]]
        t_2_feature = t_2_feature[
            ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"]
        ]
        return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature


def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        label_future = executor.submit(fetch_label_data, label_datetime)
        feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
        view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)

        label_apply_df = label_future.result()
        t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result()
        view_rate = view_rate_future.result()

    df = (pd.merge(label_apply_df, view_rate, on="vid", how='left')
          .merge(t_1_feature, on="vid", how='left')
          .merge(t_2_feature, on="vid", how='left')
          .merge(t_3_feature, on="vid", how='left')
          .merge(t_4_feature, on="vid", how='left')
          .merge(t_5_feature, on="vid", how='left')
          )
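
    # Post-merge cleanup and derived features: fill missing stats with 0, coerce
    # every stat column to numeric, add the day-over-day vov deltas
    # (12/23/34_change), and label videos whose realized vov0 exceeds 0.25 as
    # positives.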
on="vid", how='left') ) df.fillna(0, inplace=True) df.sort_values(by=['曝光占比'], ascending=False, inplace=True) for col in column_names: df[col] = pd.to_numeric(df[col], errors='coerce') df["12_change"] = df["1_vov0"] - df["2_vov0"] df["23_change"] = df["2_vov0"] - df["3_vov0"] df["34_change"] = df["3_vov0"] - df["4_vov0"] df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0) return df def xgb_train_multi_dt_data(t_1_label_dt: datetime): """ XGB模型多天训练数据 :param t_1_label_dt: :return: """ with concurrent.futures.ThreadPoolExecutor(3) as executor: t_1_feature_dt = t_1_label_dt - timedelta(1) logger.info( f"VOV模型特征数据处理 --- t_1_label_future:" f"\t label_datetime: {t_1_label_dt.strftime('%Y%m%d')} " f"\t feature_datetime: {t_1_feature_dt.strftime('%Y%m%d')} " f"\t view_rate_datetime: {t_1_label_dt.strftime('%Y%m%d')} " ) t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_feature_dt, t_1_label_dt) t_2_label_dt = t_1_label_dt - timedelta(1) t_2_feature_dt = t_2_label_dt - timedelta(1) logger.info( f"VOV模型特征数据处理 --- t_2_label_future:" f"\t label_datetime: {t_2_label_dt.strftime('%Y%m%d')} " f"\t feature_datetime: {t_2_feature_dt.strftime('%Y%m%d')} " f"\t view_rate_datetime: {t_2_label_dt.strftime('%Y%m%d')} " ) t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_feature_dt, t_2_label_dt) t_3_label_dt = t_1_label_dt - timedelta(2) t_3_feature_dt = t_3_label_dt - timedelta(1) logger.info( f"VOV模型特征数据处理 --- t_3_label_future:" f"\t label_datetime: {t_3_label_dt.strftime('%Y%m%d')} " f"\t feature_datetime: {t_3_feature_dt.strftime('%Y%m%d')} " f"\t view_rate_datetime: {t_3_label_dt.strftime('%Y%m%d')} " ) t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_feature_dt, t_3_label_dt) t_1_label_df = t_1_label_future.result() t_2_label_df = t_2_label_future.result() t_3_label_df = t_3_label_future.result() return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True) def xgb_predict_dt_data(label_datetime: datetime): """ 获取预估数据 :param label_datetime: :return: """ feature_start_datetime = label_datetime logger.info( f"VOV模型预测数据处理 --- predict_df: " f"\t label_datetime: {label_datetime.strftime('%Y%m%d')} " f"\t feature_datetime: {feature_start_datetime.strftime('%Y%m%d')} " f"\t view_rate_datetime: {label_datetime.strftime('%Y%m%d')} " ) return fetch_data(label_datetime, feature_start_datetime, label_datetime) def _main(): logger.info(f"XGB模型训练") train_df = xgb_train_multi_dt_data((datetime.now() - timedelta(days=1))) trains_array = train_df[features_name].values trains_label_array = train_df['label'].values logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}") model = xgb.XGBClassifier( n_estimators=1000, learning_rate=0.01, max_depth=5, min_child_weight=1, gamma=0, subsample=0.8, colsample_bytree=0.8, objective='binary:logistic', nthread=8, scale_pos_weight=1, random_state=2024, seed=2024, ) model.fit(trains_array, trains_label_array) logger.info("获取评测数据") predict_df = xgb_predict_dt_data((datetime.now() - timedelta(days=1))) tests_array = predict_df[features_name].values y_pred = model.predict_proba(tests_array)[:, 1] predict_df["y_pred"] = y_pred condition_choose = ( (predict_df['y_pred'] <= 0.1) & ( (predict_df['2_vov0_分母'] > 50) | (predict_df['3_vov0_分母'] > 50) | (predict_df['4_vov0_分母'] > 50) ) & ( (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1) ) ) profit_threshold = 0.3 condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold) predict_df["condition_choose"] = condition_choose predict_df[["vid", 
"曝光占比", "vov0", "condition_choose"]].to_csv( f"{config_manager.project_home}/XGB/file/new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"), sep="\t", index=False ) choose_bad = condition_choose.sum() choose_bad_real_bad = condition_choose_real.sum() acc = choose_bad_real_bad / choose_bad logger.info( f"acc:{acc} " f"分子={choose_bad_real_bad} " f"分母={choose_bad} " f"总视频数={predict_df.shape[0]} " f"盈利计算标注vov0大于:{profit_threshold}" ) surface = predict_df.loc[condition_choose, '曝光占比'].sum() surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum() logger.info( f"总影响面:{round(surface, 6)} " f"盈利影响面:{round(surface_income, 6)} " f"亏损影响面:{round(surface - surface_income, 6)}" ) predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold) profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum() profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum() logger.info( f"总盈亏:{round(profit_loss_value, 1)} " f"纯盈利:{round(profit_value, 1)} " f"纯亏损:{round(profit_loss_value - profit_value, 1)} " f"盈利效率:{round(profit_loss_value / profit_value, 6)}" ) filtered_vid = predict_df.loc[condition_choose, 'vid'].unique() # 写入本地文件 np.savetxt( f"{config_manager.project_home}/XGB/file/filtered_vid_{datetime.now().strftime('%Y%m%d')}.csv", filtered_vid, fmt="%d", delimiter="," ) # 写入Redis redis_key = f"redis:lower_vov_vid:{datetime.now().strftime('%Y%m%d')}" logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}") host, port, password = config_manager.get_algorithm_redis_info() alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password) for vid in filtered_vid.tolist(): alg_redis.add_number_to_set(redis_key, vid) alg_redis.set_expire(redis_key, 86400) if __name__ == '__main__': card_json = { "config": {}, "i18n_elements": { "zh_cn": [ { "tag": "markdown", "content": "", "text_align": "left", "text_size": "normal" } ] }, "i18n_header": { "zh_cn": { "title": { "tag": "plain_text", "content": "XGB模型训练预测完成" }, "template": "turquoise" } } } try: _main() msg_text = f"\n- 所属项目: model_monitor" \ f"\n- 所属环境: {config_manager.get_env()}" \ f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤" card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text except Exception as e: logger.error("VOV过滤XGB模型训练异常: ", e) msg_text = f"\n- 所属项目: model_monitor" \ f"\n- 所属环境: {config_manager.get_env()}" \ f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤" card_json['i18n_header']['zh_cn']['template'] = "red" card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败" card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text # 发送通知 feishu_inform_util.send_card_msg_to_feishu( webhook=config_manager.get_vov_model_inform_feishu_webhook(), card_json=card_json )