
feat: add model analysis scripts

zhaohaipeng 8 months ago
parent
commit 744bf59f1d
4 changed files with 359 additions and 2 deletions
  1. model/__init__.py (+0, -0)
  2. model/model_predict_analyse.py (+183, -0)
  3. model/segment_calibration_check.py (+172, -0)
  4. vov/data_download.py (+4, -2)

+ 0 - 0
model/__init__.py


+ 183 - 0
model/model_predict_analyse.py

@@ -0,0 +1,183 @@
+import argparse
+import gzip
+import os.path
+from collections import OrderedDict
+
+import pandas as pd
+from hdfs import InsecureClient
+
+client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
+
+SEGMENT_BASE_PATH = "/dw/recommend/model/36_score_calibration_file"
+
+
+def read_predict_from_local_txt(txt_file) -> list:
+    result = []
+    with open(txt_file, "r") as f:
+        for line in f.readlines():
+            sp = line.replace("\n", "").split("\t")
+            if len(sp) == 4:
+                label = int(sp[0])
+                cid = sp[3].split("_")[0]
+                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
+                result.append({
+                    "label": label,
+                    "cid": cid,
+                    "score": score
+                })
+    return result
+
+
+def read_predict_from_hdfs(hdfs_path: str) -> list:
+    if not hdfs_path.endswith("/"):
+        hdfs_path += "/"
+    result = []
+    for file in client.list(hdfs_path):
+        with client.read(hdfs_path + file) as reader:
+            with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
+                for line in gz_file.read().decode("utf-8").split("\n"):
+                    split = line.split("\t")
+                    if len(split) == 4:
+                        cid = split[3].split("_")[0]
+                        label = int(split[0])
+                        score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
+                        result.append({
+                            "cid": cid,
+                            "label": label,
+                            "score": score
+                        })
+
+    return result
+
+
+def _segment_v1(scores, step):
+    # Take a bin edge every len(scores)/step sorted scores; guard against a
+    # zero stride when there are fewer scores than requested segments.
+    stride = max(1, len(scores) // step)
+    bins = []
+    for i in range(0, len(scores), stride):
+        bins.append(0 if i == 0 else scores[i])
+    bins.append(1)
+    # Deduplicate edges while preserving order; pd.cut requires strictly increasing bins
+    return list(OrderedDict.fromkeys(bins))
+
+
+def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> "tuple[pd.DataFrame, pd.DataFrame]":
+    sorted_df = df.sort_values(by=['score'])
+    # Segment the evaluation scores
+    scores = sorted_df['score'].values
+
+    bins = _segment_v1(scores, step)
+
+    # Alternative: equal-frequency bucketing
+    # split_indices = np.array_split(np.arange(len(scores)), step)
+    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
+
+    sorted_df['score_segment'] = pd.cut(sorted_df['score'], bins=bins)
+
+    # Gap between the average model score and the true rate within each segment
+    group_df = sorted_df.groupby("score_segment", observed=True).agg(
+        segment_label_sum=('label', 'sum'),
+        segment_label_cnt=('label', 'count'),
+        segment_score_avg=('score', 'mean'),
+    ).reset_index()
+    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
+    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
+
+    # Save the complete segment file to HDFS
+    csv_data = group_df.to_csv(sep="\t", index=False)
+    with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
+        writer.write(csv_data)
+
+    # Keep only segments with a large enough diff and enough impressions
+    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
+    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
+    # Attach the matching segment's diff to every impression row
+    merged_df = pd.merge(sorted_df, filtered_df, on="score_segment", how="left")
+
+    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
+    return merged_df, filtered_df
+
+
+def read_and_calibration_predict(predict_path: str, step=100) -> "tuple[pd.DataFrame, pd.DataFrame]":
+    """
+    Read the prediction results and calibrate them.
+    """
+    # For local debugging:
+    # predicts = read_predict_from_local_txt(predict_path)
+    predicts = read_predict_from_hdfs(predict_path)
+    df = pd.DataFrame(predicts)
+
+    # Compute the per-segment diff_rate between the model score and the true ctcvr.
+    # Strip any trailing slash before taking the basename, which would otherwise be empty.
+    predict_basename = os.path.basename(predict_path.rstrip("/"))
+    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=step)
+
+    # Generate the calibrated score
+    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
+
+    # Per CID: true ctcvr and the average model score before and after calibration
+    grouped_df = df.groupby("cid").agg(
+        view=('cid', 'size'),
+        conv=('label', 'sum'),
+        score_avg=('score', lambda x: round(x.mean(), 6)),
+        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
+    ).reset_index()
+    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
+
+    return grouped_df, segment_df
+
+
+def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
+    old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
+    new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
+
+    # Save the segment file; only the finally used segment file is kept here, not every segment
+    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
+
+    # Rename columns and keep only the needed ones
+    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
+    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
+    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
+    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
+
+    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
+
+    # Diff between the raw model score and the true ctcvr
+    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+
+    # Diff between the calibrated model score and the true ctcvr
+    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+
+    # Sort by impressions and write to a local file
+    merged = merged.sort_values(by=['view'], ascending=False)
+    merged = merged[[
+        'cid', 'view', "conv", "true_ctcvr",
+        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
+        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
+    ]]
+
+    # Pick the output format based on the file extension
+    if analyse_file.endswith(".csv"):
+        merged.to_csv(analyse_file, index=False)
+    else:
+        with open(analyse_file, "w") as writer:
+            writer.write(merged.to_string(index=False))
+    print("0")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="model_predict_analyse.py")
+    parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
+    parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
+    parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
+    parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
+    args = parser.parse_args()
+
+    _main(
+        old_predict_path=args.old_predict_path,
+        new_predict_path=args.new_predict_path,
+        calibration_file=args.calibration_file,
+        analyse_file=args.analyse_file
+    )
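
A minimal driving sketch for the script above, assuming it is importable as part of the model package (model/__init__.py is added in this commit) and that the HDFS predict directories exist; every path below is a hypothetical placeholder:

    # Hypothetical example -- substitute real HDFS predict directories and local output paths
    from model.model_predict_analyse import _main

    _main(
        old_predict_path="/dw/recommend/model/predict/old_model",
        new_predict_path="/dw/recommend/model/predict/new_model",
        calibration_file="calibration_segments.txt",
        analyse_file="analyse_result.csv",
    )

The same four parameters are exposed on the command line through argparse (-op, -np, -af, -cf).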

+ 172 - 0
model/segment_calibration_check.py

@@ -0,0 +1,172 @@
+import logging
+
+import pandas as pd
+
+# client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
+
+SEGMENT_BASE_PATH = "/Users/zhao/Desktop/tzld/XGB/segment_csv"
+logger = logging.getLogger(__name__)
+
+
+def read_predict_from_local_txt(txt_file) -> list:
+    result = []
+    with open(txt_file, "r") as f:
+        for line in f.readlines():
+            sp = line.replace("\n", "").split("\t")
+            if len(sp) == 4:
+                label = int(sp[0])
+                cid = sp[3].split("_")[0]
+                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
+                result.append({
+                    "label": label,
+                    "cid": cid,
+                    "score": score
+                })
+    return result
+
+
+def read_predict_from_hdfs(hdfs_path: str) -> list:
+    if not hdfs_path.endswith("/"):
+        hdfs_path += "/"
+    result = []
+    # for file in client.list(hdfs_path):
+    #     with client.read(hdfs_path + file) as reader:
+    #         with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
+    #             for line in gz_file.read().decode("utf-8").split("\n"):
+    #                 split = line.split("\t")
+    #                 if len(split) != 4:
+    #                     continue
+    #                 cid = split[3].split("_")[0]
+    #                 label = int(split[0])
+    #                 score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
+    #
+    #                 result.append({
+    #                     "cid": cid,
+    #                     "label": label,
+    #                     "score": score
+    #                 })
+    #
+    return result
+
+
+def read_cpa() -> pd.DataFrame:
+    # Load the CPA target for each creative
+    cpa_target_df = pd.read_csv("/Users/zhao/Desktop/tzld/XGB/creative_package_target.csv")
+    cpa_target_df = cpa_target_df[['creative_id', 'cpa_target']]
+    cpa_target_df.rename(columns={'creative_id': 'cid'}, inplace=True)
+    cpa_target_df['cid'] = cpa_target_df['cid'].astype(str)
+
+    return cpa_target_df
+
+
+def read_calibration_file(filepath: str) -> list:
+    segment = []
+    with open(filepath, "r") as f:
+        for line in f.readlines():
+            s = line.split("\t")
+            if len(s) != 2:
+                continue
+
+            score = s[0].split(",")
+            min_score = float(score[0].replace("(", ""))
+            max_score = float(score[1].replace("]", ""))
+            alpha = float(s[1])
+            segment.append((min_score, max_score, alpha))
+    return segment
+
+
+def score_calibration(score, segment: list):
+    # Apply the diff rate of the first segment whose range contains the score
+    for min_score, max_score, alpha in segment:
+        if min_score <= score <= max_score:
+            return score * (1 + alpha)
+    return score
+
+
+def read_and_calibration_predict(predict_path: str, is_hdfs=True) -> pd.DataFrame:
+    """
+    Read the prediction results and calibrate them.
+    """
+    if is_hdfs:
+        # Read gzipped prediction files from HDFS
+        predicts = read_predict_from_hdfs(predict_path)
+    else:
+        predicts = read_predict_from_local_txt(predict_path)
+    df = pd.DataFrame(predicts)
+
+    # Attach each creative's CPA to every impression and compute CPM
+    cpa_target_df = read_cpa()
+    df = pd.merge(df, cpa_target_df, on='cid', how='left')
+    df['p_cpm'] = df['score'] * df['cpa_target'] * 1000
+    df['t_cpm'] = df['score'] * df['cpa_target'] * 1000 * df['label']
+
+    segment = read_calibration_file("/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt")
+    # Generate the calibrated score and the corresponding p_cpm and t_cpm
+    df['score_2'] = df['score'].apply(lambda x: score_calibration(x, segment))
+    df['p_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000
+    df['t_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000 * df['label']
+
+    # Per CID: true ctcvr and the average model score before and after calibration
+    grouped_df = df.groupby("cid").agg(
+        view=('cid', 'size'),
+        conv=('label', 'sum'),
+        score_avg=('score', lambda x: round(x.mean(), 6)),
+        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
+    ).reset_index()
+    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
+
+    return grouped_df
+
+
+def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
+    # NOTE: calibration_file is currently unused in this check script; the calibration path is hardcoded in read_and_calibration_predict
+    old_group_df = read_and_calibration_predict(old_predict_path, is_hdfs=False)
+    new_group_df = read_and_calibration_predict(new_predict_path, is_hdfs=False)
+
+    # Rename columns and keep only the needed ones
+    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
+    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
+    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
+    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
+
+    # Join the CPA targets
+    cpa_target_df = read_cpa()
+    merged = (pd.merge(old_group_df, new_group_df, on='cid', how='left')
+              .merge(cpa_target_df, on='cid', how='left'))
+
+    # Diff between the raw model score and the true ctcvr
+    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged['new_cpm'] = merged['new_score_avg'] * merged['cpa_target'] * 1000
+    merged['old_cpm'] = merged['old_score_avg'] * merged['cpa_target'] * 1000
+
+    # Diff between the calibrated model score and the true ctcvr
+    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged['new2_cpm'] = merged['new_score_2_avg'] * merged['cpa_target'] * 1000
+    merged['old2_cpm'] = merged['old_score_2_avg'] * merged['cpa_target'] * 1000
+
+    # Compute the true CPM
+    merged['true_cpm'] = merged['true_ctcvr'] * merged['cpa_target'] * 1000
+    # Sort by impressions and write to a local file
+    merged = merged.sort_values(by=['view'], ascending=False)
+    merged = merged[[
+        'cid', 'view', "conv", "true_ctcvr", 'true_cpm',
+        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true", "old_cpm", "new_cpm",
+        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true", "old2_cpm", "new2_cpm",
+    ]]
+
+    # Pick the output format based on the file extension
+    if analyse_file.endswith(".csv"):
+        merged.to_csv(analyse_file, index=False)
+    else:
+        with open(analyse_file, "w") as writer:
+            writer.write(merged.to_string(index=False))
+    print("0")
+
+
+if __name__ == '__main__':
+    _main(
+        f"/Users/zhao/Desktop/tzld/XGB/predict_file/20241103_351_1000_1028_1102.txt",
+        f"/Users/zhao/Desktop/tzld/XGB/predict_file/20241104_351_1000_1028_1102.txt",
+        "/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt",
+        f"/Users/zhao/Desktop/tzld/XGB/20241104.csv"
+    )
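
For reference, the calibration file parsed by read_calibration_file above holds one tab-separated segment per line: a pandas-style interval followed by its diff rate, e.g. "(0.0012,0.0031]\t-0.21". A minimal sketch of how score_calibration applies such segments, with interval values made up for illustration:

    # Segments in the shape produced by read_calibration_file: (min_score, max_score, alpha)
    segment = [(0.0, 0.0012, 0.35), (0.0012, 0.0031, -0.21)]

    # 0.0020 falls in the second segment, so it is scaled by (1 + alpha) = 0.79
    assert abs(score_calibration(0.0020, segment) - 0.0020 * 0.79) < 1e-9
    # A score outside every segment passes through unchanged
    assert score_calibration(0.5, segment) == 0.5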

+ 4 - 2
vov/data_download.py

@@ -1,12 +1,14 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import timedelta
 
 import pandas as pd
+from datetime import datetime
 
 from client import ODPSClient
 
 odps_client = ODPSClient.ODPSClient()
 
-dt_list = ['20241029']
+dt_list = ["20241030", "20241031", "20241101", "20241102"]
 hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
            "18", "19", "20", "21", "22", "23"]
 
@@ -17,7 +19,7 @@ VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
 def read_odps(dt: str, hh: str):
     # 读取SQL文件
     sql = ""
-    with open(f"{VOV_BASE_PATH}/sql/vov排序_552_562.sql", "r") as f:
+    with open(f"{VOV_BASE_PATH}/sql/vovh24_feature.sql", "r") as f:
         sql = f.read()
 
     real_sql = (sql.replace("${bizdate}", dt)