import concurrent.futures
import json
import logging
from datetime import datetime, timedelta

import pandas as pd
import xgboost as xgb

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()

# Feature columns fed to the model
features_name = [
    '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0',
    '2_vov01', '3_vov01', '4_vov01', '5_vov01',
    '3_vov012', '4_vov012', '5_vov012',
    '12_change', '23_change', '34_change',
]

# Columns coerced to numeric after the label/feature merge
column_names = [
    '曝光占比', 'vov0', '分子', '分母',
    '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0',
    '2_vov01', '3_vov01', '4_vov01', '5_vov01',
    '3_vov012', '4_vov012', '5_vov012',
    '1_vov0_分子', '1_vov0_分母', '2_vov0_分子', '2_vov0_分母',
    '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
    '5_vov0_分子', '5_vov0_分母',
    '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
    '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母',
    '3_vov012_分子', '3_vov012_分母', '4_vov012_分子', '4_vov012_分母',
    '5_vov012_分子', '5_vov012_分母',
]

# Configure the logging format and level
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_partition_df(table, dt):
    """Download one partition of an ODPS table into a DataFrame."""
    logger.info(f"Start downloading: {table} -- partition {dt}")
    download_session = odps_client.get_download_session(table, dt)
    logger.info(f"Table {table}, partition {dt}: {download_session.count} rows in total")
    with download_session.open_arrow_reader(0, download_session.count) as reader:
        # Read every Arrow batch and concatenate them into a single DataFrame
        df = pd.concat([batch.to_pandas() for batch in reader])
    logger.info(f"Finished downloading: {table} -- partition {dt}")
    return df


def fetch_label_data(label_dt):
    """Fetch label data for the given partition date."""
    logger.info(f"fetch_label_data.dt: {label_dt}")

    def extract_label(row):
        feature = json.loads(row['feature'])
        return pd.Series({
            'vid': row['vid'],
            '分母': int(feature['1_vov0_分母']),
            '分子': feature['1_vov0_分子'],
            'vov0': feature['1_vov0'],
        })

    train_df = get_partition_df("alg_vid_vov_new", label_dt)
    applied_df = train_df.apply(extract_label, axis=1)
    # Exposure share of each vid within the partition
    view_sum = applied_df['分母'].sum()
    applied_df['曝光占比'] = round(applied_df['分母'] / view_sum, 6)
    return applied_df


def fetch_feature_data(feature_dt):
    """Fetch feature data for the given partition date."""
    logger.info(f"fetch_feature_data.dt: {feature_dt}")

    def extract_feature(row):
        feature = json.loads(row['feature'])
        return pd.Series({
            'vid': row['vid'],
            **feature,
        })

    feature_df = get_partition_df("alg_vid_vov_new", feature_dt)
    return feature_df.apply(extract_feature, axis=1)


def fetch_data(label_datetime: datetime):
    label_dt = label_datetime.strftime("%Y%m%d")
    feature_dt = (label_datetime - timedelta(days=1)).strftime("%Y%m%d")

    # Labels come from label_dt, features from the previous day; download both concurrently
    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        label_future = executor.submit(fetch_label_data, label_dt)
        feature_future = executor.submit(fetch_feature_data, feature_dt)
        label_apply_df = label_future.result()
        feature_apply_df = feature_future.result()

    df = pd.merge(label_apply_df, feature_apply_df, on="vid", how='left')
    df.fillna(0, inplace=True)
    # Sort by exposure share and reset the index so that the positional index
    # reflects exposure rank (the evaluation filter below relies on it)
    df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
    df.reset_index(drop=True, inplace=True)

    for col in column_names:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Day-over-day vov0 deltas
    df["12_change"] = df["1_vov0"] - df["2_vov0"]
    df["23_change"] = df["2_vov0"] - df["3_vov0"]
    df["34_change"] = df["3_vov0"] - df["4_vov0"]

    feature_array = df[features_name].values
    # Binary target: vov0 above 0.25 counts as positive
    df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
    label_array = df["label"].values
    return df, feature_array, label_array


def _main():
    logger.info("XGB model training")
    df, trains_array, trains_label_array = fetch_data(datetime.now() - timedelta(days=2))
    logger.info("Features fetched, start training")
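    # Gradient-boosted tree classifier; shallow trees (max_depth=5), a low
    # learning rate and 0.8 row/column subsampling limit overfitting on the
    # small daily feature set.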
    model = xgb.XGBClassifier(
        n_estimators=100,
        learning_rate=0.01,
        max_depth=5,
        min_child_weight=1,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        objective='binary:logistic',
        n_jobs=8,
        scale_pos_weight=1,
        random_state=2024,
    )
    model.fit(trains_array, trains_label_array, verbose=True)

    logger.info("Fetching evaluation data")
    df_test, tests_array, _ = fetch_data(datetime.now() - timedelta(days=1))
    y_pred = model.predict_proba(tests_array)[:, 1]
    df_test["y_pred"] = y_pred

    # Candidate filter: low predicted score among the highest-exposure vids (index <= 10000)
    condition_choose = (
        (df_test['y_pred'] <= 0.2)
        # & ((df_test['1_vov0_分母'] > 50) | (df_test['2_vov0_分母'] > 50) | (df_test['3_vov0_分母'] > 50))
        & (df_test.index <= 10000)
    )
    profit_threshold = 0.3
    # Filtered vids whose true vov0 is indeed at or below the profit threshold
    condition_choose_real = condition_choose & (df_test['vov0'] <= profit_threshold)

    df_test["condition_choose"] = condition_choose
    df_test[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
        "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
        sep="\t", index=False)

    choose_bad = condition_choose.sum()
    choose_bad_real_bad = condition_choose_real.sum()
    acc = choose_bad_real_bad / choose_bad
    logger.info(
        f"acc:{acc} "
        f"numerator={choose_bad_real_bad} "
        f"denominator={choose_bad} "
        f"total videos={len(df_test)} "
        f"profit counted when vov0 exceeds: {profit_threshold}"
    )

    surface = df_test.loc[condition_choose, '曝光占比'].sum()
    surface_income = df_test.loc[condition_choose_real, '曝光占比'].sum()
    logger.info(
        f"total affected exposure: {round(surface, 6)} "
        f"profitable exposure: {round(surface_income, 6)} "
        f"loss-making exposure: {round(surface - surface_income, 6)}"
    )

    # Per-vid profit/loss value: exposure (分母) times the gap between vov0 and the threshold
    df_test["profit_loss_value"] = df_test['分母'] * (df_test['vov0'] - profit_threshold)
    profit_loss_value = df_test.loc[condition_choose, 'profit_loss_value'].sum()
    profit_value = df_test.loc[condition_choose_real, 'profit_loss_value'].sum()
    logger.info(
        f"total P&L: {round(profit_loss_value, 1)} "
        f"pure profit: {round(profit_value, 1)} "
        f"pure loss: {round(profit_loss_value - profit_value, 1)} "
        f"profit efficiency: {round(profit_loss_value / profit_value, 6)}"
    )


if __name__ == '__main__':
    try:
        _main()
    except Exception:
        logger.exception("VOV filtering XGB model training failed")
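# Assumed usage (file name is illustrative): schedule as a daily job, e.g.
#   python vov_xgb_train.py
# Each run trains on the partition from two days ago, evaluates on yesterday's
# partition, and writes the selected vids to a tab-separated file named
# new_<yyyymmdd> in the working directory.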