Browse Source

feat:添加VOV XGB模型训练脚本

zhaohaipeng 9 months ago
parent
commit
0d6d89e35a
3 changed files with 281 additions and 71 deletions
  1. 2 1
      .gitignore
  2. 45 0
      XGB/check_data.py
  3. 234 70
      XGB/xgboost_train.py

+ 2 - 1
.gitignore

@@ -60,4 +60,5 @@ target/
 
 .idea
 
-XGB/new*
+XGB/new*
+XGB/data

+ 45 - 0
XGB/check_data.py

@@ -0,0 +1,45 @@
+import pandas as pd
+
+old_date_train = f"/Users/zhao/Downloads/20241010.csv"
+new_date_train = f"/Users/zhao/Desktop/Code/Python/model_monitor/XGB/data/all/20241010_train.csv"
+
+# 读取两个 CSV 文件
+old_df = pd.read_csv(old_date_train)
+new_df = pd.read_csv(new_date_train)
+
+if old_df.shape[0] != new_df.shape[0]:
+    print(f"新老训练数据集长度不一样 新数据集: {new_df.shape[0]}, 老数据集: {old_df.shape[0]}")
+
+old_df_col = old_df.columns
+new_df_col = new_df.columns
+if len(old_df_col) != len(new_df_col):
+    print(f"两个文件列数不一样 新文件: {new_df_col}, 老文件: {old_df_col}")
+
+for col in old_df_col:
+    if col not in new_df_col:
+        print(f"列 {col} 在老文件存在,新文件不存在")
+
+for col in new_df_col:
+    if col not in old_df_col:
+        print(f"列 {col} 在新文件存在,老文件不存在")
+
+old_df.set_index("vid", inplace=True)
+new_df.set_index("vid", inplace=True)
+
+old_dict = old_df.to_dict(orient="index")
+new_dict = new_df.to_dict(orient="index")
+
+for e in new_dict:
+    if e not in old_dict:
+        print(f"vid {e} 在新文件中存在,在老文件中不存在")
+    new_row = new_dict[e]
+    old_row = old_dict[e]
+    for col in new_df_col:
+        if col not in old_row:
+            print(f"vid {e} 的列 {col} 在老文件中不存在")
+            continue
+        if col in new_row:
+            print(f"vid {e} 的列 {col} 在新文件中不存在")
+            continue
+        if old_row[col] != new_row[col]:
+            print(f"vid {e} 列 {col} 的值在新老文件不一样, 新文件的值: {new_row[col]}, 老文件的值: {old_row[col]}")

+ 234 - 70
XGB/xgboost_train.py

@@ -21,11 +21,26 @@ column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0',
                 '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
                 '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
 
-# 配置日志格式和日志级别
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
 # 创建一个logger
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("xgboost_train.py")
+logger.setLevel(logging.INFO)  # 设置日志级别
+
+# 创建Handler用于输出到文件
+file_handler = logging.FileHandler('xgboost_train.log')
+file_handler.setLevel(logging.INFO)  # 设置日志级别为INFO
+
+# 创建Handler用于输出到控制台
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.INFO)  # 设置日志级别为INFO
+
+# 定义日志格式
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+file_handler.setFormatter(formatter)
+console_handler.setFormatter(formatter)
+
+# 将Handler添加到Logger
+logger.addHandler(file_handler)
+logger.addHandler(console_handler)
 
 
 def get_partition_df(table, dt):
@@ -37,89 +52,222 @@ def get_partition_df(table, dt):
         # 将所有数据加载到 DataFrame 中
         df = pd.concat([batch.to_pandas() for batch in reader])
 
-    logger.info(f"下载结束: {table} -- {dt} 的数据")
+    logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
     return df
 
 
-def fetch_label_data(label_dt):
+def fetch_label_data(label_datetime: datetime):
     """
     获取 label数据
     :return:
     """
+    label_dt = label_datetime.strftime("%Y%m%d")
     logger.info(f"fetch_label_data.dt: {label_dt}")
 
-    def extract_label(row):
-        feature = json.loads(row['feature'])
-        return pd.Series({
-            'vid': row['vid'],
-            '分母': int(feature['1_vov0_分母']),
-            "分子": feature['1_vov0_分子'],
-            'vov0': feature['1_vov0']
+    # 获取数据
+    label_df = get_partition_df("alg_vid_vov_new", label_dt)
+    extracted_data = [
+        {
+            'vid': int(row['vid']),
+        }
+        for _, row in label_df.iterrows()
+    ]
+    # 构造新的 DataFrame
+    applied_df = pd.DataFrame(extracted_data)
+    # 添加 title 列
+    applied_df['title'] = "title"
+
+    return applied_df
+
+
+def fetch_view_rate_data(view_date: datetime):
+    """
+    获取曝光数据
+    :return:
+    """
+    view_rate_dt = view_date.strftime("%Y%m%d")
+    logger.info(f"fetch_view_rate_date.dt: {view_rate_dt}")
+    try:
+        # 获取数据
+        view_rate_df = get_partition_df("alg_vid_vov_new", view_rate_dt)
+        extracted_data = [
+            {
+                'vid': int(row['vid']),
+                '分母': int(feature['1_vov0_分母']),
+                '分子': feature['1_vov0_分子'],
+                'vov0': feature['1_vov0']
+            }
+            for _, row in view_rate_df.iterrows()
+            if (feature := json.loads(row['feature']))
+        ]
+        # 构造新的 DataFrame
+        applied_df = pd.DataFrame(extracted_data)
+        # 计算曝光占比,矢量化操作
+        view_sum = applied_df['分母'].sum()
+        applied_df['曝光占比'] = applied_df['分母'] / view_sum
+
+        return applied_df
+    except Exception as e:
+        return pd.DataFrame({
+            "vid": [-1],
+            "分母": [0],
+            "分子": [0],
+            "vov0": [0],
+            "曝光占比": [0]
         })
 
-    train_df = get_partition_df("alg_vid_vov_new", label_dt)
-    applied_df = train_df.apply(extract_label, axis=1)
 
-    # 计算曝光占比
-    view_sum = applied_df['分母'].sum()
-    applied_df['曝光占比'] = round(applied_df['分母'] / view_sum, 6)
-    return applied_df
+def fetch_feature_data_dt(dt: str, index):
+    """
+    查询某一天的特征数据,方便做特征数据时并行处理
+    :param dt:
+    :param index:
+    :return:
+    """
+
+    logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据")
+
+    df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
 
+    today_dist_view_pv = df['today_dist_view_pv'].astype(int)
+    today_return_to_dist_view_pv = df['today_return_to_dist_view_pv'].astype(int)
+    day1_return_to_dist_view_pv = df['day1_return_to_dist_view_pv'].astype(int)
+    day2_return_to_dist_view_pv = df['day2_return_to_dist_view_pv'].astype(int)
 
-def fetch_feature_data(feature_dt):
+    # all_return_to_dist_view_pv
+    t_1_all_return_to_dist_view_pv = today_return_to_dist_view_pv + day1_return_to_dist_view_pv
+    t_2_all_return_to_dist_view_pv = t_1_all_return_to_dist_view_pv + day2_return_to_dist_view_pv
+
+    # all_vov
+    t_0_all_vov = today_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
+    t_0_all_vov = t_0_all_vov.where(today_dist_view_pv > 0, 0)
+
+    t_1_all_vov = t_1_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
+    t_1_all_vov = t_1_all_vov.where(today_dist_view_pv > 0, 0)
+
+    t_2_all_vov = t_2_all_return_to_dist_view_pv / today_dist_view_pv.where(today_dist_view_pv > 0, 1)
+    t_2_all_vov = t_2_all_vov.where(today_dist_view_pv > 0, 0)
+
+    # 构造结果DataFrame
+    result_df = pd.DataFrame({
+        'vid': df['videoid'],
+
+        f'{index}_vov0': t_0_all_vov,
+        f'{index}_vov0_分子': today_return_to_dist_view_pv,
+        f'{index}_vov0_分母': today_dist_view_pv,
+
+        f'{index}_vov01': t_1_all_vov,
+        f'{index}_vov01_分子': t_1_all_return_to_dist_view_pv,
+        f'{index}_vov01_分母': today_dist_view_pv,
+
+        f'{index}_vov012': t_2_all_vov,
+        f'{index}_vov012_分子': t_2_all_return_to_dist_view_pv,
+        f'{index}_vov012_分母': today_dist_view_pv,
+    })
+    logger.info(f"完成处理 videoid_vov_base_data -- {dt} 的数据")
+
+    return result_df
+
+
+def fetch_feature_data(t_1_datetime: datetime):
     """
     获取feature数据
     :return:
     """
 
-    logger.info(f"fetch_feature_data.dt: {feature_dt}")
+    logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}")
+
+    with concurrent.futures.ThreadPoolExecutor(5) as executor:
+        t_1_feature_task = executor.submit(
+            fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
+        )
+        t_2_feature_task = executor.submit(
+            fetch_feature_data_dt, (t_1_datetime - timedelta(days=1)).strftime("%Y%m%d"), 2
+        )
+        t_3_feature_task = executor.submit(
+            fetch_feature_data_dt, (t_1_datetime - timedelta(days=2)).strftime("%Y%m%d"), 3
+        )
+        t_4_feature_task = executor.submit(
+            fetch_feature_data_dt, (t_1_datetime - timedelta(days=3)).strftime("%Y%m%d"), 4
+        )
+        t_5_feature_task = executor.submit(
+            fetch_feature_data_dt, (t_1_datetime - timedelta(days=4)).strftime("%Y%m%d"), 5
+        )
+
+        t_1_feature = t_1_feature_task.result()
+        t_2_feature = t_2_feature_task.result()
+        t_3_feature = t_3_feature_task.result()
+        t_4_feature = t_4_feature_task.result()
+        t_5_feature = t_5_feature_task.result()
+
+        t_1_feature = t_1_feature[['vid', "1_vov0", "1_vov0_分子", "1_vov0_分母"]]
+        t_2_feature = t_2_feature[
+            ['vid', "2_vov0", "2_vov0_分子", "2_vov0_分母", "2_vov01", "2_vov01_分子", "2_vov01_分母"]
+        ]
+
+    return t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature
+
+
+def fetch_data(label_datetime: datetime, feature_start_datetime: datetime, view_rate_datetime: datetime):
+    with concurrent.futures.ThreadPoolExecutor(3) as executor:
+        label_future = executor.submit(fetch_label_data, label_datetime)
+        feature_future = executor.submit(fetch_feature_data, feature_start_datetime)
+        view_rate_future = executor.submit(fetch_view_rate_data, view_rate_datetime)
 
-    def extract_feature(row):
-        feature = json.loads(row['feature'])
-        return pd.Series({
-            'vid': row['vid'],
-            **feature
-        })
+        label_apply_df = label_future.result()
+        t_1_feature, t_2_feature, t_3_feature, t_4_feature, t_5_feature = feature_future.result()
+        view_rate = view_rate_future.result()
 
-    feature_df = get_partition_df("alg_vid_vov_new", feature_dt)
-    return feature_df.apply(extract_feature, axis=1)
+        df = (pd.merge(label_apply_df, view_rate, on="vid", how='left')
+              .merge(t_1_feature, on="vid", how='left')
+              .merge(t_2_feature, on="vid", how='left')
+              .merge(t_3_feature, on="vid", how='left')
+              .merge(t_4_feature, on="vid", how='left')
+              .merge(t_5_feature, on="vid", how='left')
+              )
+        df.fillna(0, inplace=True)
+        df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
 
+        for col in column_names:
+            df[col] = pd.to_numeric(df[col], errors='coerce')
 
-def fetch_data(label_datetime: datetime):
-    label_dt = label_datetime.strftime("%Y%m%d")
-    feature_dt = (label_datetime - timedelta(days=1)).strftime("%Y%m%d")
+        df["12_change"] = df["1_vov0"] - df["2_vov0"]
+        df["23_change"] = df["2_vov0"] - df["3_vov0"]
+        df["34_change"] = df["3_vov0"] - df["4_vov0"]
 
-    with concurrent.futures.ThreadPoolExecutor(2) as executor:
-        label_future = executor.submit(fetch_label_data, label_dt)
-        feature_future = executor.submit(fetch_feature_data, feature_dt)
-        label_apply_df = label_future.result()
-        feature_apply_df = feature_future.result()
+        df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
+
+    return df
 
-    df = pd.merge(label_apply_df, feature_apply_df, on="vid", how='left')
-    df.fillna(0, inplace=True)
-    df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
 
-    for col in column_names:
-        df[col] = pd.to_numeric(df[col], errors='coerce')
+def xgb_multi_dt_data(t_1_label_dt: datetime):
+    with concurrent.futures.ThreadPoolExecutor(3) as executor:
+        logger.info(f"VOV模型特征数据处理:t_1_label_future.label_datetime: {t_1_label_dt.strftime('%Y%m%d')}")
+        t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_label_dt - timedelta(2), t_1_label_dt)
 
-    df["12_change"] = df["1_vov0"] - df["2_vov0"]
-    df["23_change"] = df["2_vov0"] - df["3_vov0"]
-    df["34_change"] = df["3_vov0"] - df["4_vov0"]
+        t_2_label_dt = t_1_label_dt - timedelta(1)
+        logger.info(f"VOV模型特征数据处理:t_2_label_future.label_datetime: {t_2_label_dt.strftime('%Y%m%d')}")
+        t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_label_dt - timedelta(1), t_2_label_dt)
 
-    feature_array = df[features_name].values
-    df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
-    label_array = df["label"].values
+        t_3_label_dt = t_1_label_dt - timedelta(2)
+        logger.info(f"VOV模型特征数据处理:t_3_label_future.label_datetime: {t_3_label_dt.strftime('%Y%m%d')}")
+        t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_label_dt - timedelta(1), t_3_label_dt)
+        t_1_label_df = t_1_label_future.result()
+        t_2_label_df = t_2_label_future.result()
+        t_3_label_df = t_3_label_future.result()
 
-    return df, feature_array, label_array
+    return pd.concat([t_1_label_df, t_2_label_df, t_3_label_df], ignore_index=True)
 
 
 def _main():
     logger.info(f"XGB模型训练")
+    train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=2)))
+    trains_array = train_df[features_name].values
+    trains_label_array = train_df['label'].values
 
-    df, trains_array, trains_label_array = fetch_data((datetime.now() - timedelta(days=2)))
-    logger.info("特征获取完成,开始训练")
+    logger.info(f"特征获取完成,开始训练。 训练使用的数据量: {train_df.shape[0]}")
     model = xgb.XGBClassifier(
-        n_estimators=100,
+        n_estimators=1000,
         learning_rate=0.01,
         max_depth=5,
         min_child_weight=1,
@@ -132,22 +280,35 @@ def _main():
         random_state=2024,
         seed=2024,
     )
-    model.fit(trains_array, trains_label_array, verbose=True)
+    model.fit(trains_array, trains_label_array)
 
     logger.info("获取评测数据")
-    df_test, tests_array, _ = fetch_data(datetime.now() - timedelta(days=1))
-    y_pred = model.predict_proba(tests_array)[:, 1]
-    df_test["y_pred"] = y_pred
+    start_label_datetime = datetime.now() - timedelta(days=1)
+    feature_start_datetime = start_label_datetime - timedelta(1)
 
-    condition_choose = ((df_test['y_pred'] <= 0.2)
-                        # & ((df_test['1_vov0_分母'] > 50) | (df_test['2_vov0_分母'] > 50) | (df_test['3_vov0_分母'] > 50))
-                        & (df_test.index <= 10000)
-                        )
+    predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime)
+    tests_array = predict_df[features_name].values
+    y_pred = model.predict_proba(tests_array)[:, 1]
+    predict_df["y_pred"] = y_pred
+    condition_choose = (
+            (predict_df['y_pred'] <= 0.1) &
+            (
+                    (predict_df['4_vov0_分母'] > 50) |
+                    (predict_df['2_vov0_分母'] > 50) |
+                    (predict_df['3_vov0_分母'] > 50)
+            ) &
+            (
+                (predict_df['1_vov0'] - predict_df['2_vov0'] <= 0.1)
+            )
+    )
     profit_threshold = 0.3
-    condition_choose_real = condition_choose & (df_test['vov0'] <= profit_threshold)
-    df_test["condition_choose"] = condition_choose
-    df_test[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
-        "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"), sep="\t", index=False)
+    condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
+    predict_df["condition_choose"] = condition_choose
+    predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
+        "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
+        sep="\t",
+        index=False
+    )
 
     choose_bad = condition_choose.sum()
     choose_bad_real_bad = condition_choose_real.sum()
@@ -156,21 +317,21 @@ def _main():
         f"acc:{acc} "
         f"分子={choose_bad_real_bad} "
         f"分母={choose_bad} "
-        f"总视频数={df_test.size} "
+        f"总视频数={predict_df.shape[0]} "
         f"盈利计算标注vov0大于:{profit_threshold}"
     )
 
-    surface = df_test.loc[condition_choose, '曝光占比'].sum()
-    surface_income = df_test.loc[condition_choose_real, '曝光占比'].sum()
+    surface = predict_df.loc[condition_choose, '曝光占比'].sum()
+    surface_income = predict_df.loc[condition_choose_real, '曝光占比'].sum()
     logger.info(
         f"总影响面:{round(surface, 6)} "
         f"盈利影响面:{round(surface_income, 6)} "
         f"亏损影响面:{round(surface - surface_income, 6)}"
     )
 
-    df_test["profit_loss_value"] = df_test['分母'] * (df_test['vov0'] - profit_threshold)
-    profit_loss_value = df_test.loc[condition_choose, 'profit_loss_value'].sum()
-    profit_value = df_test.loc[condition_choose_real, 'profit_loss_value'].sum()
+    predict_df["profit_loss_value"] = predict_df['分母'] * (predict_df['vov0'] - profit_threshold)
+    profit_loss_value = predict_df.loc[condition_choose, 'profit_loss_value'].sum()
+    profit_value = predict_df.loc[condition_choose_real, 'profit_loss_value'].sum()
     logger.info(
         f"总盈亏:{round(profit_loss_value, 1)} "
         f"纯盈利:{round(profit_value, 1)} "
@@ -178,6 +339,9 @@ def _main():
         f"盈利效率:{round(profit_loss_value / profit_value, 6)}"
     )
 
+    filtered_vid = predict_df.loc[condition_choose_real, 'vid'].unique()
+    print(f"要过滤掉的视频ID为: {filtered_vid}")
+
 
 if __name__ == '__main__':
     try: