import logging

import pandas as pd

# client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")

SEGMENT_BASE_PATH = "/Users/zhao/Desktop/tzld/XGB/segment_csv"
# Default location of the creative -> CPA target table read by read_cpa().
CPA_TARGET_PATH = "/Users/zhao/Desktop/tzld/XGB/creative_package_target.csv"
# Default calibration segments file used by read_and_calibration_predict().
CALIBRATION_FILE_PATH = "/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt"

logger = logging.getLogger(__name__)

# Column layout of the parsed prediction records; passed explicitly so the
# DataFrame schema does not depend on the records themselves.
_PREDICT_COLUMNS = ["label", "cid", "score"]
# Columns returned by read_and_calibration_predict().
_GROUPED_COLUMNS = ["cid", "view", "conv", "score_avg", "score_2_avg", "true_ctcvr"]


def _parse_predict_line(line: str):
    """Parse one tab-separated prediction line into a record dict.

    The line must have exactly 4 tab-separated fields:
    ``label``, an unused field, a bracketed probability list such as
    ``[0.8,0.2]`` and an id of the form ``<cid>_<suffix>``.
    The score is the second element of the probability list.

    :return: ``{"label": int, "cid": str, "score": float}``, or ``None`` when the
        line does not have exactly 4 fields.
    """
    fields = line.rstrip("\r\n").split("\t")
    if len(fields) != 4:
        return None
    return {
        "label": int(fields[0]),
        "cid": fields[3].split("_")[0],
        "score": float(fields[2].replace("[", "").replace("]", "").split(",")[1]),
    }


def read_predict_from_local_txt(txt_file) -> list:
    """Read prediction records from a local text file.

    Lines that do not have exactly 4 tab-separated fields are skipped.

    :param txt_file: path of the prediction file.
    :return: list of ``{"label", "cid", "score"}`` dicts in file order.
    """
    with open(txt_file, "r", encoding="utf-8") as f:
        parsed = (_parse_predict_line(line) for line in f)
        return [record for record in parsed if record is not None]


def read_predict_from_hdfs(hdfs_path: str) -> list:
    """Read prediction records from gzip files under an HDFS directory.

    NOTE: the HDFS client is disabled (the ``client`` definition at the top of
    this module is commented out), so this currently logs a warning and returns
    an empty list. The reference implementation below needs ``client`` and
    ``import gzip`` to be restored.
    """
    if not hdfs_path.endswith("/"):
        hdfs_path += "/"
    logger.warning("HDFS reading is disabled; no predictions read from %s", hdfs_path)
    result = []
    # for file in client.list(hdfs_path):
    #     with client.read(hdfs_path + file) as reader:
    #         with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
    #             for line in gz_file.read().decode("utf-8").split("\n"):
    #                 record = _parse_predict_line(line)
    #                 if record is not None:
    #                     result.append(record)
    return result


def read_cpa(path: str = CPA_TARGET_PATH) -> pd.DataFrame:
    """Load the CPA target of each creative.

    :param path: CSV file with at least ``creative_id`` and ``cpa_target`` columns.
    :return: DataFrame with columns ``cid`` (as str, to match prediction ids)
        and ``cpa_target``.
    """
    cpa_target_df = pd.read_csv(path)[['creative_id', 'cpa_target']]
    cpa_target_df = cpa_target_df.rename(columns={'creative_id': 'cid'})
    cpa_target_df['cid'] = cpa_target_df['cid'].astype(str)
    return cpa_target_df


def read_calibration_file(filepath: str) -> list:
    """Parse calibration segments from a text file.

    Each useful line has two tab-separated fields: an interval written as
    ``(min,max]`` and a float ``alpha``. Lines without exactly two fields are
    skipped.

    :return: list of ``(min_score, max_score, alpha)`` tuples in file order.
    """
    segment = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            fields = line.split("\t")
            if len(fields) != 2:
                continue
            bounds = fields[0].split(",")
            min_score = float(bounds[0].replace("(", ""))
            max_score = float(bounds[1].replace("]", ""))
            segment.append((min_score, max_score, float(fields[1])))
    return segment


def score_calibration(score, segment: list):
    """Calibrate ``score`` with the first segment whose interval contains it.

    Intervals are half-open ``(min, max]``, matching the notation used in the
    calibration file. A matching score becomes ``score * (1 + alpha)``; a score
    outside every segment is returned unchanged.
    """
    for min_score, max_score, alpha in segment:
        if min_score < score <= max_score:
            return score * (1 + alpha)
    return score


def read_and_calibration_predict(predict_path: str, is_hdfs=True,
                                 calibration_file: str = CALIBRATION_FILE_PATH) -> pd.DataFrame:
    """Read evaluation predictions, calibrate them and aggregate per creative.

    :param predict_path: local prediction file, or HDFS directory when ``is_hdfs``.
    :param is_hdfs: read from HDFS when True, otherwise from a local text file.
    :param calibration_file: calibration segments file (see read_calibration_file).
    :return: one row per ``cid`` with ``view`` (number of prediction rows),
        ``conv`` (sum of labels), ``score_avg`` / ``score_2_avg`` (mean raw /
        calibrated score, rounded to 6 decimals) and ``true_ctcvr`` (conv / view).
        An empty frame with these columns is returned when no predictions are read.
    """
    if is_hdfs:
        predicts = read_predict_from_hdfs(predict_path)
    else:
        predicts = read_predict_from_local_txt(predict_path)
    if not predicts:
        logger.warning("no predictions read from %s", predict_path)
        return pd.DataFrame(columns=_GROUPED_COLUMNS)
    df = pd.DataFrame(predicts, columns=_PREDICT_COLUMNS)

    # Calibrated score for every prediction row.
    segment = read_calibration_file(calibration_file)
    df['score_2'] = df['score'].apply(lambda x: score_calibration(x, segment))

    # Per-creative true ctcvr and mean model score before/after calibration.
    grouped_df = df.groupby("cid").agg(
        view=('cid', 'size'),
        conv=('label', 'sum'),
        score_avg=('score', lambda x: round(x.mean(), 6)),
        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
    ).reset_index()
    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
    return grouped_df


def _relative_diff(predicted: pd.Series, truth: pd.Series) -> pd.Series:
    """Return ``predicted / truth - 1``, using 0 where ``truth`` is 0."""
    return (predicted / truth - 1).mask(truth == 0, 0)


def _to_cpm(rate: pd.Series, cpa_target: pd.Series) -> pd.Series:
    """Return ``rate * cpa_target * 1000``."""
    return rate * cpa_target * 1000


def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str,
          cpa_file: str = CPA_TARGET_PATH):
    """Compare old vs. new model predictions per creative and write a report.

    Both local prediction files are calibrated with ``calibration_file`` and
    aggregated per ``cid``, then joined with the CPA targets from ``cpa_file``.
    The report is sorted by ``view`` (descending) and written as CSV when
    ``analyse_file`` ends with ``.csv``, otherwise as a plain-text table.
    """
    old_group_df = read_and_calibration_predict(old_predict_path, is_hdfs=False,
                                                calibration_file=calibration_file)
    new_group_df = read_and_calibration_predict(new_predict_path, is_hdfs=False,
                                                calibration_file=calibration_file)

    # Rename score columns per model version and keep only what the report needs;
    # view/conv/true_ctcvr come from the old prediction file only.
    old_group_df = old_group_df.rename(
        columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}
    )[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
    new_group_df = new_group_df.rename(
        columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}
    )[['cid', 'new_score_avg', 'new_score_2_avg']]

    # Attach the CPA target of each creative.
    merged = (pd.merge(old_group_df, new_group_df, on='cid', how='left')
              .merge(read_cpa(cpa_file), on='cid', how='left'))

    # Relative error vs. the true ctcvr and predicted CPM, for the raw
    # ("old"/"new") and calibrated ("old2"/"new2") average scores.
    score_columns = {
        'old': 'old_score_avg',
        'new': 'new_score_avg',
        'old2': 'old_score_2_avg',
        'new2': 'new_score_2_avg',
    }
    for tag, score_col in score_columns.items():
        merged[f"({tag}-true)/true"] = _relative_diff(merged[score_col], merged['true_ctcvr'])
        merged[f"{tag}_cpm"] = _to_cpm(merged[score_col], merged['cpa_target'])

    # CPM implied by the observed ctcvr.
    merged['true_cpm'] = _to_cpm(merged['true_ctcvr'], merged['cpa_target'])

    # Sort by exposure count and fix the output column order.
    merged = merged.sort_values(by=['view'], ascending=False)
    merged = merged[[
        'cid', 'view', "conv", "true_ctcvr", 'true_cpm',
        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true", "old_cpm", "new_cpm",
        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true", "old2_cpm", "new2_cpm",
    ]]

    # Choose the output format from the file extension.
    if analyse_file.endswith(".csv"):
        merged.to_csv(analyse_file, index=False)
    else:
        with open(analyse_file, "w", encoding="utf-8") as writer:
            writer.write(merged.to_string(index=False))
    logger.info("analysis written to %s", analyse_file)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    _main(
        "/Users/zhao/Desktop/tzld/XGB/predict_file/20241103_351_1000_1028_1102.txt",
        "/Users/zhao/Desktop/tzld/XGB/predict_file/20241104_351_1000_1028_1102.txt",
        CALIBRATION_FILE_PATH,
        "/Users/zhao/Desktop/tzld/XGB/20241104.csv",
    )