import concurrent.futures import json import logging import os import sys from datetime import datetime, timedelta import pandas as pd import xgboost as xgb print(sys.path) project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) from client import ODPSClient from config import ConfigManager from helper import RedisHelper from util import feishu_inform_util odps_client = ODPSClient.ODPSClient() config_manager = ConfigManager.ConfigManager() features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012'] column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母', '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母', '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母', '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母', '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母'] # 创建一个logger logger = logging.getLogger("vov_xgboost_train.py") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') def get_partition_df(table, dt): logger.info(f"开始下载: {table} -- {dt} 的数据") download_session = odps_client.get_download_session(table, dt) logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据") with download_session.open_arrow_reader(0, download_session.count) as reader: # 将所有数据加载到 DataFrame 中 df = pd.concat([batch.to_pandas() for batch in reader]) logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据") return df def fetch_label_data(label_datetime: datetime): """ 获取 label数据 :return: """ label_dt = label_datetime.strftime("%Y%m%d") logger.info(f"fetch_label_data.dt: {label_dt}") # 获取数据 label_df = get_partition_df("alg_vid_vov_new", label_dt) extracted_data = [ { 'vid': int(row['vid']), } for _, row in label_df.iterrows() ] # 构造新的 DataFrame applied_df = pd.DataFrame(extracted_data) # 添加 title 列 applied_df['title'] = "title" return applied_df def fetch_view_rate_data(view_date: datetime): """ 获取曝光数据 :return: """ view_rate_dt = view_date.strftime("%Y%m%d") logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}") try: # 获取数据 view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt) extracted_data = [ { 'vid': int(row['vid']), '分母': int(feature['1_vov0_分母']), '分子': feature['1_vov0_分子'], 'vov0': feature['1_vov0'] } for _, row in view_rate_df.iterrows() if (feature := json.loads(row['feature'])) ] # 构造新的 DataFrame applied_df = pd.DataFrame(extracted_data) # 计算曝光占比,矢量化操作 view_sum = applied_df['分母'].sum() applied_df['曝光占比'] = applied_df['分母'] / view_sum return applied_df except Exception as e: return pd.DataFrame({ "vid": [-1], "分母": [0], "分子": [0], "vov0": [0], "曝光占比": [0] }) def fetch_feature_data_dt(dt: str, index): """ 查询某一天的特征数据,方便做特征数据时并行处理 :param dt: :param index: :return: """ logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据") df = get_partition_df("videoid_vov_base_data", dt).fillna(0) today_dist_view_pv = df['today_dist_view_pv'].astype(int) today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int) day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int) day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int) # all_return_to_dist_view_pv t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv # all_vov t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1) t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0) t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1) t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0) t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1) t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0) # 构造结果DataFrame result_df = pd.DataFrame({ 'vid': df['videoid'], f'{index}_vov0': t_0_all_vov, f'{index}_vov0_分子': today_return_to_dist_view_pv, f'{index}_vov0_分母': today_dist_view_pv, f'{index}_vov01': t_1_all_vov, f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv, f'{index}_vov01_分母': today_dist_view_pv, f'{index}_vov012': t_2_all_vov, f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv, f'{index}_vov012_分母': today_dist_view_pv, }) logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据") return result_df def fetch_feature_data(t_1_datetime: datetime): """ 获取feature数据 :return: """ logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}") with concurrent.futures.ThreadPoolExecutor(5) as executor: t_1_feature_task = executor.submit( fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1 ) t_2_feature_task = executor.submit( fetch_feature_data_dt, (t_1_datetime - timedelta(days=1)).strftime("%Y%m%d"), 2 ) t_3_feature_task = executor.submit( fetch_feature_data_dt, (t_1_datetime - timedelta(days=2)).strftime("%Y%m%d"), 3 ) t_4_feature_task = executor.submit( fetch_feature_data_dt, (t_1_datetime - timedelta(days=3)).strftime("%Y%m%d"), 4 ) t_5_feature_task = executor.submit( fetch_feature_data_dt, (t_1_datetime - timedelta(days=4)).strftime("%Y%m%d"), 5 ) t_1_feature = t_1_feature_task.result() t_2_feature = t_2_feature_task.result() t_3_feature = t_3_feature_task.result() t_4_feature = t_4_feature_task.result() t_5_feature = t_5_feature_task.result() t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]] t_2_feature = t_2_feature[ ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"] ] return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime): with concurrent.futures.ThreadPoolExecutor(3) as executor: label_future = executor.submit(fetch_label_data, label_datetime) feature_future = executor.submit(fetch_feature_data, feature_start_datetime) view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime) label_apply_df = label_future.result() t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result() view_rate = view_rate_future.result() df = (pd.merge(label_apply_df, view_rate, on="vid", how='left') .merge(t_1_feature, on="vid", how='left') .merge(t_2_feature, on="vid", how='left') .merge(t_3_feature, on="vid", how='left') .merge(t_4_feature, on="vid", how='left') .merge(t_5_feature, on="vid", how='left') ) df.fillna(0, inplace=True) df.sort_values(by=['曝光占比'], ascending=False, inplace=True) for col in column_names: df[col] = pd.to_numeric(df[col], errors='coerce') df["12_change"] = df["1_vov0"] - df["2_vov0"] df["23_change"] = df["2_vov0"] - df["3_vov0"] df["34_change"] = df["3_vov0"] - df["4_vov0"] df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0) return df def xgb_multi_dt_data(t_1_label_dt: datetime): with concurrent.futures.ThreadPoolExecutor(3) as executor: logger.info(f"VOV模型特征数据处理:t_1_label_future.label_datetime: {t_1_label_dt.strftime('%Y%m%d')}") t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_label_dt - timedelta(2), t_1_label_dt) t_2_label_dt = t_1_label_dt - timedelta(1) logger.info(f"VOV模型特征数据处理:t_2_label_future.label_datetime: {t_2_label_dt.strftime('%Y%m%d')}") t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_label_dt - timedelta(1), t_2_label_dt) t_3_label_dt = t_1_label_dt - timedelta(2) logger.info(f"VOV模型特征数据处理:t_3_label_future.label_datetime: {t_3_label_dt.strftime('%Y%m%d')}") t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_label_dt - timedelta(1), t_3_label_dt) t_1_label_df = t_1_label_future.result() t_2_label_df = t_2_label_future.result() t_3_label_df = t_3_label_future.result() return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True) def _main(): logger.info(f"XGB模型训练") train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=3))) trains_array = train_df[features_name].values trains_label_array = train_df['label'].values logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}") model = xgb.XGBClassifier( n_estimators=1000, learning_rate=0.01, max_depth=5, min_child_weight=1, gamma=0, subsample=0.8, colsample_bytree=0.8, objective='binary:logistic', nthread=8, scale_pos_weight=1, random_state=2024, seed=2024, ) model.fit(trains_array, trains_label_array) logger.info("获取评测数据") start_label_datetime = datetime.now() - timedelta(days=2) feature_start_datetime = start_label_datetime predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime) tests_array = predict_df[features_name].values y_pred = model.predict_proba(tests_array)[:, 1] predict_df["y_pred"] = y_pred condition_choose = ( (predict_df['y_pred'] <= 0.1) & ( (predict_df['2_vov0_分母'] > 50) | (predict_df['3_vov0_分母'] > 50) | (predict_df['4_vov0_分母'] > 50) ) & ( (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1) ) ) profit_threshold = 0.3 condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold) predict_df["condition_choose"] = condition_choose predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv( "./file/new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"), sep="\t", index=False ) choose_bad = condition_choose.sum() choose_bad_real_bad = condition_choose_real.sum() acc = choose_bad_real_bad / choose_bad logger.info( f"acc:{acc} " f"分子={choose_bad_real_bad} " f"分母={choose_bad} " f"总视频数={predict_df.shape[0]} " f"盈利计算标注vov0大于:{profit_threshold}" ) surface = predict_df.loc[condition_choose, '曝光占比'].sum() surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum() logger.info( f"总影响面:{round(surface, 6)} " f"盈利影响面:{round(surface_income, 6)} " f"亏损影响面:{round(surface - surface_income, 6)}" ) predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold) profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum() profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum() logger.info( f"总盈亏:{round(profit_loss_value, 1)} " f"纯盈利:{round(profit_value, 1)} " f"纯亏损:{round(profit_loss_value - profit_value, 1)} " f"盈利效率:{round(profit_loss_value / profit_value, 6)}" ) filtered_vid = predict_df.loc[condition_choose, 'vid'].unique() # 写入Redis redis_key = f"redis:lower_vov_vid:{datetime.now().strftime('%Y%m%d')}" logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}") host, port, password = config_manager.get_algorithm_redis_info() alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password) for vid in filtered_vid.tolist(): alg_redis.add_number_to_set(redis_key, vid) alg_redis.set_expire(redis_key, 86400) if __name__ == '__main__': card_json = { "config": {}, "i18n_elements": { "zh_cn": [ { "tag": "markdown", "content": "", "text_align": "left", "text_size": "normal" } ] }, "i18n_header": { "zh_cn": { "title": { "tag": "plain_text", "content": "XGB模型训练预测完成" }, "template": "info" } } } try: # _main() msg_text = f"\n- 所属项目: model_monitor" \ f"\n- 所属环境: {config_manager.get_env()}" \ f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤" card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text except Exception as e: logger.error("VOV过滤XGB模型训练异常: ", e) msg_text = f"\n- 所属项目: rov-offline" \ f"\n- 告警名称: XGB模型训练失败" \ f"\n- 所属环境: {config_manager.get_env()}" \ f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤" card_json['i18n_header']['zh_cn']['template'] = "error" card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败" card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text # 发送通知 feishu_inform_util.send_card_msg_to_feishu( webhook=config_manager.get_vov_model_inform_feishu_webhook(), card_json=card_json )