import argparse
import gzip
import os.path
from typing import Tuple

import pandas as pd
from hdfs import InsecureClient

client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")

SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_model_attachment/score_calibration_file")
PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")


def parse_predict_line(line: str) -> Tuple[bool, dict]:
    """Parse one prediction line (4 tab-separated fields) into {label, cid, score}."""
    sp = line.replace("\n", "").split("\t")
    if len(sp) == 4:
        label = int(sp[0])
        cid = sp[3].split("_")[0]
        # The score field looks like "[p0,p1]"; keep the second probability
        score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
        return True, {
            "label": label,
            "cid": cid,
            "score": score
        }
    return False, {}


def read_predict_file(file_path: str) -> pd.DataFrame:
    result = []
    if file_path.startswith("/dw"):
        # HDFS directory: read every gzip-compressed part file under it
        if not file_path.endswith("/"):
            file_path += "/"
        for file in client.list(file_path):
            with client.read(file_path + file) as reader:
                with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
                    for line in gz_file.read().decode("utf-8").split("\n"):
                        b, d = parse_predict_line(line)
                        if b:
                            result.append(d)
    else:
        # Local plain-text file
        with open(file_path, "r") as f:
            for line in f.readlines():
                b, d = parse_predict_line(line)
                if b:
                    result.append(d)
    return pd.DataFrame(result)


def calibration_file_save(df: pd.DataFrame, file_path: str):
    if file_path.startswith("/dw"):
        # Save the full segment file to HDFS
        with client.write(file_path, encoding='utf-8', overwrite=True) as writer:
            writer.write(df.to_csv(sep="\t", index=False))
    else:
        df.to_csv(file_path, sep="\t", index=False)


def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
    """
    Save a local copy of the evaluation results for computing AUC.
    """
    d = {"old": old_df, "new": new_df}
    for key in d:
        df = d[key]
        if 'score' in df.columns:
            score_df = df[['label', "score"]]
            score_df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
        if 'score_2' in df.columns:
            score_2_df = d[key][['label', "score_2"]]
            score_2_df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)


def save_full_calibration_file(df: pd.DataFrame, segment_file_path: str):
    if segment_file_path.startswith("/dw"):
        # Save the full segment file to HDFS
        with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
            writer.write(df.to_csv(sep="\t", index=False))
    else:
        df.to_csv(segment_file_path, sep="\t", index=False)


def get_predict_calibration_file(df: pd.DataFrame, predict_basename: str) -> pd.DataFrame:
    """
    Compute the diff_rate of the model scores per cid.
    """
    agg_df = predict_df_agg(df)
    agg_df['diff_rate'] = (agg_df['score_avg'] / agg_df['true_ctcvr'] - 1).mask(agg_df['true_ctcvr'] == 0, 0)
    condition = 'view > 1000 and diff_rate >= 0.2'
    save_full_calibration_file(agg_df, f"{SEGMENT_BASE_PATH}/{predict_basename}.txt")
    calibration = agg_df.query(condition)
    return calibration


def get_predict_basename(predict_path: str) -> str:
    """
    Use the last component of the path as the file name associated with the model.
    """
    if predict_path.endswith("/"):
        predict_path = predict_path[:-1]
    return os.path.basename(predict_path)


def calc_calibration_score2(df: pd.DataFrame, calibration_df: pd.DataFrame) -> pd.DataFrame:
    calibration_df = calibration_df[['cid', 'diff_rate']]
    df = pd.merge(df, calibration_df, on='cid', how='left').fillna(0)
    df['score_2'] = df['score'] / (1 + df['diff_rate'])
    return df


def predict_df_agg(df: pd.DataFrame) -> pd.DataFrame:
    # Base aggregations
    agg_operations = {
        'view': ('cid', 'size'),
        'conv': ('label', 'sum'),
        'score_avg': ('score', lambda x: round(x.mean(), 6)),
    }
    # If the score_2 column exists, also aggregate it
    if "score_2" in df.columns:
        agg_operations['score_2_avg'] = ('score_2', lambda x: round(x.mean(), 6))
    grouped_df = df.groupby("cid").agg(**agg_operations).reset_index()
    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
    return grouped_df


def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
    old_df = read_predict_file(old_predict_path)
    new_df = read_predict_file(new_predict_path)

    old_calibration_df = get_predict_calibration_file(old_df, get_predict_basename(old_predict_path))
    old_df = calc_calibration_score2(old_df, old_calibration_df)
    new_calibration_df = get_predict_calibration_file(new_df, get_predict_basename(new_predict_path))
    new_df = calc_calibration_score2(new_df, new_calibration_df)

    # Save label, raw score and calibrated score locally for computing AUC and similar metrics
    predict_local_save_for_auc(old_df, new_df)
    # Save the new model's calibration file locally so it can be synced to OSS
    new_calibration_df[['cid', 'diff_rate']].to_csv(calibration_file, sep="\t", index=False, header=False)

    old_agg_df = predict_df_agg(old_df)
    new_agg_df = predict_df_agg(new_df)

    # Rename and filter columns
    old_agg_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
    new_agg_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
    old_group_df = old_agg_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
    new_group_df = new_agg_df[['cid', 'new_score_avg', 'new_score_2_avg']]
    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')

    # Relative difference between the raw model score and the true ctcvr
    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)

    # Relative difference between the calibrated model score and the true ctcvr
    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)

    # Sort by views (exposure) and write to a local file
    merged = merged.sort_values(by=['view'], ascending=False)
    merged = merged[[
        'cid', 'view', 'conv', 'true_ctcvr',
        'old_score_avg', 'new_score_avg', '(old-true)/true', '(new-true)/true',
        'old_score_2_avg', 'new_score_2_avg', '(old2-true)/true', '(new2-true)/true',
    ]]

    # Choose the output format based on the file extension
    if analyse_file.endswith(".csv"):
        merged.to_csv(analyse_file, index=False)
    else:
        with open(analyse_file, "w") as writer:
            writer.write(merged.to_string(index=False))
    print("0")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
    parser.add_argument("-op", "--old_predict_path", required=True, help="prediction results of the old model")
    parser.add_argument("-np", "--new_predict_path", required=True, help="prediction results of the new model")
    parser.add_argument("-af", "--analyse_file", required=True, help="path to save the final analysis results")
    parser.add_argument("-cf", "--calibration_file", required=True, help="path to save the segment (calibration) file used online")

    args = parser.parse_args()

    _main(
        old_predict_path=args.old_predict_path,
        new_predict_path=args.new_predict_path,
        calibration_file=args.calibration_file,
        analyse_file=args.analyse_file
    )
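
# Example invocation, shown only to illustrate how the arguments are wired together;
# the predict paths and output file names below are hypothetical placeholders, not
# paths taken from the original script. Substitute real HDFS or local paths.
#
#   python model_predict_analyse_20241101.py \
#       -op /dw/recommend/model/example_old_predict \
#       -np /dw/recommend/model/example_new_predict \
#       -cf ./new_model_calibration.txt \
#       -af ./predict_analyse.csv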