"""
Compare the prediction results of an old and a new model, compute per-segment
calibration factors against the true CTCVR, and write out both the calibration
segment file and a per-CID analysis report.
"""
import argparse
import gzip
import os.path
from collections import OrderedDict

import pandas as pd
from hdfs import InsecureClient

client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")

SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")


def read_predict_from_local_txt(txt_file) -> list:
    """Read prediction results from a local TSV file (used for local debugging)."""
    result = []
    with open(txt_file, "r") as f:
        for line in f:
            sp = line.rstrip("\n").split("\t")
            if len(sp) == 4:
                label = int(sp[0])
                cid = sp[3].split("_")[0]
                # sp[2] looks like "[a,b]"; keep the second value
                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
                result.append({
                    "label": label,
                    "cid": cid,
                    "score": score
                })
    return result


def read_predict_from_hdfs(hdfs_path: str) -> list:
    """Read gzip-compressed prediction results from an HDFS directory."""
    if not hdfs_path.endswith("/"):
        hdfs_path += "/"
    result = []
    for file in client.list(hdfs_path):
        with client.read(hdfs_path + file) as reader:
            with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
                for line in gz_file.read().decode("utf-8").split("\n"):
                    split = line.split("\t")
                    if len(split) == 4:
                        cid = split[3].split("_")[0]
                        label = int(split[0])
                        score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
                        result.append({
                            "cid": cid,
                            "label": label,
                            "score": score
                        })
    return result


def _segment_v1(scores, step):
    """Sample every (len(scores) / step)-th sorted score as a bin edge; assumes scores lie in [0, 1]."""
    bins = []
    # max(..., 1) guards against a zero range step when there are fewer scores than segments
    for i in range(0, len(scores), max(len(scores) // step, 1)):
        if i == 0:
            bins.append(0)
        else:
            bins.append(scores[i])
    bins.append(1)
    # de-duplicate while preserving order
    return list(OrderedDict.fromkeys(bins))


def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> tuple[pd.DataFrame, pd.DataFrame]:
    sorted_df = df.sort_values(by=['score'])
    # segment the evaluation scores
    scores = sorted_df['score'].values

    bins = _segment_v1(scores, step)

    # alternative: equal-frequency bucketing (requires `import numpy as np`)
    # split_indices = np.array_split(np.arange(len(scores)), step)
    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]

    sorted_df['score_segment'] = pd.cut(sorted_df['score'], bins=bins)

    # compute the score-vs-label diff within each segment
    group_df = sorted_df.groupby("score_segment", observed=True).agg(
        segment_label_sum=('label', 'sum'),
        segment_label_cnt=('label', 'count'),
        segment_score_avg=('score', 'mean'),
    ).reset_index()
    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)

    # save the complete segment file
    csv_data = group_df.to_csv(sep="\t", index=False)
    with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
        writer.write(csv_data)

    # keep only segments with a large enough diff and enough samples
    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
    # attach the per-segment diff to every impression row;
    # rows whose segment was filtered out get diff 0 (no correction)
    merged_df = pd.merge(sorted_df, filtered_df, on="score_segment", how="left")
    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
    return merged_df, filtered_df
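# Worked example of the segmentation above (hypothetical numbers, not from real
# data): with 100,000 sorted scores and step=100, _segment_v1 samples every
# 1,000th score as a bin edge, prepends 0, appends 1, and de-duplicates. If a
# segment then has segment_score_avg=0.012 and segment_true_score=0.010, its
# segment_diff_rate is 0.012 / 0.010 - 1 = 0.2, which passes the >= 0.2 filter
# (given segment_label_cnt >= 1000), so scores in that segment are later
# shrunk by a factor of 1 / 1.2.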
def read_and_calibration_predict(predict_path: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Read the prediction results and calibrate them.
    """
    # for local debugging:
    # predicts = read_predict_from_local_txt(predict_path)
    predicts = read_predict_from_hdfs(predict_path)
    df = pd.DataFrame(predicts)

    # per-segment diff_rate between the model score and the true CTCVR;
    # strip a trailing slash first, otherwise os.path.basename() returns ""
    predict_basename = os.path.basename(predict_path.rstrip("/"))
    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=100)

    # generate the calibrated score
    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])

    # per-CID stats: true CTCVR and the average model score before/after calibration
    grouped_df = df.groupby("cid").agg(
        view=('cid', 'size'),
        conv=('label', 'sum'),
        score_avg=('score', lambda x: round(x.mean(), 6)),
        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
    ).reset_index()
    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']

    return df, grouped_df, segment_df


def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
    """
    Save a local copy of the prediction results for computing AUC.
    """
    d = {"old": old_df, "new": new_df}
    for key in d:
        df = d[key][['label', "score"]]
        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
        df = d[key][['label', "score_2"]]
        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)


def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
    old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
    new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)

    predict_local_save_for_auc(old_df, new_df)

    # save the segment file; this keeps only the segments actually used, not all of them
    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)

    # rename and filter columns
    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]

    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')

    # diff between the raw model score and the true CTCVR
    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)

    # diff between the calibrated model score and the true CTCVR
    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)

    # sort by impressions and write to a local file
    merged = merged.sort_values(by=['view'], ascending=False)
    merged = merged[[
        'cid', 'view', "conv", "true_ctcvr",
        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
    ]]

    # choose the output format by file extension
    if analyse_file.endswith(".csv"):
        merged.to_csv(analyse_file, index=False)
    else:
        with open(analyse_file, "w") as writer:
            writer.write(merged.to_string(index=False))
    print("0")
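# Note on the calibration in read_and_calibration_predict: since
# segment_diff_rate = segment_score_avg / segment_true_score - 1, dividing by
# (1 + segment_diff_rate) multiplies each score by
# segment_true_score / segment_score_avg, so within a corrected segment the
# average calibrated score matches that segment's observed CTCVR. Segments
# that did not pass the filter keep diff 0 and are left unchanged.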
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
    parser.add_argument("-op", "--old_predict_path", required=True, help="prediction results of the old model")
    parser.add_argument("-np", "--new_predict_path", required=True, help="prediction results of the new model")
    parser.add_argument("-af", "--analyse_file", required=True, help="path to save the final analysis result")
    parser.add_argument("-cf", "--calibration_file", required=True, help="path to save the segment file used online")
    args = parser.parse_args()

    _main(
        old_predict_path=args.old_predict_path,
        new_predict_path=args.new_predict_path,
        calibration_file=args.calibration_file,
        analyse_file=args.analyse_file
    )
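# Example invocation (all paths below are hypothetical placeholders):
# python model_predict_analyse_20241101.py \
#     -op /dw/recommend/model/predict/old_model_dir \
#     -np /dw/recommend/model/predict/new_model_dir \
#     -cf ./calibration_segment_file.txt \
#     -af ./predict_analyse_result.csv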