import logging

import pandas as pd

# HDFS access is currently disabled; uncomment together with the body of read_predict_from_hdfs.
# import gzip
# from hdfs import InsecureClient
# client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")

SEGMENT_BASE_PATH = "/Users/zhao/Desktop/tzld/XGB/segment_csv"

logger = logging.getLogger(__name__)

def read_predict_from_local_txt(txt_file) -> list:
    """Read prediction results from a local tab-separated text file."""
    result = []
    with open(txt_file, "r") as f:
        for line in f:
            sp = line.rstrip("\n").split("\t")
            if len(sp) == 4:
                label = int(sp[0])
                cid = sp[3].split("_")[0]
                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
                result.append({
                    "label": label,
                    "cid": cid,
                    "score": score
                })
    return result
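
# Assumed input line format for the predict files (tab-separated, 4 fields):
#     <label>\t<unused>\t[<p_negative>,<p_positive>]\t<cid>_<suffix>
# e.g. an illustrative line "1\tx\t[0.92,0.08]\t10001_abc" parses to
# {"label": 1, "cid": "10001", "score": 0.08}.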

def read_predict_from_hdfs(hdfs_path: str) -> list:
    """Read prediction results from an HDFS directory (client code currently disabled)."""
    if not hdfs_path.endswith("/"):
        hdfs_path += "/"
    result = []
    # for file in client.list(hdfs_path):
    #     with client.read(hdfs_path + file) as reader:
    #         with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
    #             for line in gz_file.read().decode("utf-8").split("\n"):
    #                 split = line.split("\t")
    #                 if len(split) != 4:
    #                     continue
    #                 cid = split[3].split("_")[0]
    #                 label = int(split[0])
    #                 score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
    #
    #                 result.append({
    #                     "cid": cid,
    #                     "label": label,
    #                     "score": score
    #                 })
    return result

def read_cpa() -> pd.DataFrame:
    # Load the CPA target of each creative
    cpa_target_df = pd.read_csv("/Users/zhao/Desktop/tzld/XGB/creative_package_target.csv")
    cpa_target_df = cpa_target_df[['creative_id', 'cpa_target']]
    cpa_target_df.rename(columns={'creative_id': 'cid'}, inplace=True)
    cpa_target_df['cid'] = cpa_target_df['cid'].astype(str)
    return cpa_target_df
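
# Assumed columns of creative_package_target.csv (only these two are used); the sample row
# below is illustrative, not real data:
#     creative_id,cpa_target
#     10001,30.0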

def read_calibration_file(filepath: str) -> list:
    """Read the calibration segments as (min_score, max_score, alpha) tuples."""
    segment = []
    with open(filepath, "r") as f:
        for line in f:
            s = line.split("\t")
            if len(s) != 2:
                continue
            score = s[0].split(",")
            min_score = float(score[0].replace("(", ""))
            max_score = float(score[1].replace("]", ""))
            alpha = float(s[1])
            segment.append((min_score, max_score, alpha))
    return segment
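
# Assumed calibration file line format: a score interval and its adjustment factor,
# tab-separated, e.g. a line "(0.01,0.02]\t0.15" becomes the segment (0.01, 0.02, 0.15).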

def score_calibration(score, segment: list):
    for e in segment:
        if e[0] <= score <= e[1]:
            return score * (1 + e[2])
    return score
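
# Worked example (illustrative numbers): with segment = [(0.0, 0.05, 0.2)], a score of 0.03 falls
# inside the interval and is calibrated to 0.03 * (1 + 0.2) = 0.036; a score outside every
# interval is returned unchanged.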

def read_and_calibration_predict(predict_path: str, calibration_file: str, is_hdfs=True) -> pd.DataFrame:
    """
    Read the prediction results and calibrate the scores.
    """
    if is_hdfs:
        # File path handling for HDFS
        predicts = read_predict_from_hdfs(predict_path)
    else:
        predicts = read_predict_from_local_txt(predict_path)
    df = pd.DataFrame(predicts)

    # Attach the corresponding creative's CPA to each impression and compute CPM
    cpa_target_df = read_cpa()
    df = pd.merge(df, cpa_target_df, on='cid', how='left')
    df['p_cpm'] = df['score'] * df['cpa_target'] * 1000
    df['t_cpm'] = df['score'] * df['cpa_target'] * 1000 * df['label']

    segment = read_calibration_file(calibration_file)
    # Generate the calibrated score and compute the corresponding p_cpm and t_cpm
    df['score_2'] = df['score'].apply(lambda x: score_calibration(x, segment))
    df['p_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000
    df['t_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000 * df['label']

    # Per-CID statistics: true ctcvr and average model score before and after calibration
    grouped_df = df.groupby("cid").agg(
        view=('cid', 'size'),
        conv=('label', 'sum'),
        score_avg=('score', lambda x: round(x.mean(), 6)),
        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
    ).reset_index()
    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']

    return grouped_df
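
# Usage sketch (hypothetical file names): returns one row per cid with the columns
# cid, view, conv, score_avg, score_2_avg and true_ctcvr, e.g.
#     grouped = read_and_calibration_predict("predict.txt", "calibration.txt", is_hdfs=False)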

def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
    old_group_df = read_and_calibration_predict(old_predict_path, calibration_file, is_hdfs=False)
    new_group_df = read_and_calibration_predict(new_predict_path, calibration_file, is_hdfs=False)

    # Rename the score columns and filter the columns
    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]

    # Join the CPA targets
    cpa_target_df = read_cpa()
    merged = (pd.merge(old_group_df, new_group_df, on='cid', how='left')
              .merge(cpa_target_df, on='cid', how='left'))

    # Relative difference between the raw model scores and the true ctcvr
    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged['new_cpm'] = merged['new_score_avg'] * merged['cpa_target'] * 1000
    merged['old_cpm'] = merged['old_score_avg'] * merged['cpa_target'] * 1000

    # Relative difference between the calibrated model scores and the true ctcvr
    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged['new2_cpm'] = merged['new_score_2_avg'] * merged['cpa_target'] * 1000
    merged['old2_cpm'] = merged['old_score_2_avg'] * merged['cpa_target'] * 1000

    # Compute the true CPM
    merged['true_cpm'] = merged['true_ctcvr'] * merged['cpa_target'] * 1000
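    # Worked example (illustrative numbers): with true_ctcvr = 0.02 and cpa_target = 30,
    # true_cpm = 0.02 * 30 * 1000 = 600; old_cpm/new_cpm use the same formula with the
    # averaged model scores, so the CPM columns are directly comparable per cid.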
- # 按照曝光排序,写入本地文件
- merged = merged.sort_values(by=['view'], ascending=False)
- merged = merged[[
- 'cid', 'view', "conv", "true_ctcvr", 'true_cpm',
- "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true", "old_cpm", "new_cpm",
- "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true", "old2_cpm", "new2_cpm",
- ]]
- # 根据文件名保存不同的格式
- if analyse_file.endswith(".csv"):
- merged.to_csv(analyse_file, index=False)
- else:
- with open(analyse_file, "w") as writer:
- writer.write(merged.to_string(index=False))
- print("0")

if __name__ == '__main__':
    _main(
        "/Users/zhao/Desktop/tzld/XGB/predict_file/20241103_351_1000_1028_1102.txt",
        "/Users/zhao/Desktop/tzld/XGB/predict_file/20241104_351_1000_1028_1102.txt",
        "/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt",
        "/Users/zhao/Desktop/tzld/XGB/20241104.csv"
    )