# segment_calibration_check.py
  1. import logging
  2. import pandas as pd
  3. # client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
  4. SEGMENT_BASE_PATH = "/Users/zhao/Desktop/tzld/XGB/segment_csv"
  5. logger = logging.getLogger(__name__)
  6. def read_predict_from_local_txt(txt_file) -> list:
  7. result = []
  8. with open(txt_file, "r") as f:
  9. for line in f.readlines():
  10. sp = line.replace("\n", "").split("\t")
  11. if len(sp) == 4:
  12. label = int(sp[0])
  13. cid = sp[3].split("_")[0]
  14. score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
  15. result.append({
  16. "label": label,
  17. "cid": cid,
  18. "score": score
  19. })
  20. return result
  21. def read_predict_from_hdfs(hdfs_path: str) -> list:
  22. if not hdfs_path.endswith("/"):
  23. hdfs_path += "/"
  24. result = []
  25. # for file in client.list(hdfs_path):
  26. # with client.read(hdfs_path + file) as reader:
  27. # with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
  28. # for line in gz_file.read().decode("utf-8").split("\n"):
  29. # split = line.split("\t")
  30. # if len(split) == 4:
  31. # continue
  32. # cid = split[3].split("_")[0]
  33. # label = int(split[0])
  34. # score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
  35. #
  36. # result.append({
  37. # "cid": cid,
  38. # "label": label,
  39. # "score": score
  40. # })
  41. #
  42. return result
  43. def read_cpa() -> pd.DataFrame:
  44. # 添加CPA
  45. cpa_target_df = pd.read_csv("/Users/zhao/Desktop/tzld/XGB/creative_package_target.csv")
  46. cpa_target_df = cpa_target_df[['creative_id', 'cpa_target']]
  47. cpa_target_df.rename(columns={'creative_id': 'cid'}, inplace=True)
  48. cpa_target_df['cid'] = cpa_target_df['cid'].astype(str)
  49. return cpa_target_df
  50. def read_calibration_file(filepath: str) -> list:
  51. segment = []
  52. with open(filepath, "r") as f:
  53. for line in f.readlines():
  54. s = line.split("\t")
  55. if len(s) != 2:
  56. continue
  57. score = s[0].split(",")
  58. min_score = float(score[0].replace("(", ""))
  59. max_score = float(score[1].replace("]", ""))
  60. alpha = float(s[1])
  61. segment.append((min_score, max_score, alpha))
  62. return segment
  63. def score_calibration(score, segment: list):
  64. for e in segment:
  65. if (score >= e[0]) & (score <= e[1]):
  66. return score * (1 + e[2])
  67. return score
  68. def read_and_calibration_predict(predict_path: str, is_hdfs=True) -> [pd.DataFrame]:
  69. """
  70. 读取评估结果,并进行校准
  71. """
  72. if is_hdfs:
  73. # 文件路径处理
  74. predicts = read_predict_from_hdfs(predict_path)
  75. else:
  76. predicts = read_predict_from_local_txt(predict_path)
  77. df = pd.DataFrame(predicts)
  78. # 每条曝光添加对应创意的CPA,并计算CPM
  79. cpa_target_df = read_cpa()
  80. df = pd.merge(df, cpa_target_df, on='cid', how='left')
  81. df['p_cpm'] = df['score'] * df['cpa_target'] * 1000
  82. df['t_cpm'] = df['score'] * df['cpa_target'] * 1000 * df['label']
  83. segment = read_calibration_file("/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt")
  84. # 生成校准后的分数并计算对应的p_cpm和t_cpm
  85. df['score_2'] = df['score'].apply(lambda x: score_calibration(x, segment))
  86. df['p_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000
  87. df['t_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000 * df['label']
  88. # 按CID统计真实ctcvr和校准前后的平均模型分
  89. grouped_df = df.groupby("cid").agg(
  90. view=('cid', 'size'),
  91. conv=('label', 'sum'),
  92. score_avg=('score', lambda x: round(x.mean(), 6)),
  93. score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
  94. ).reset_index()
  95. grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
  96. return grouped_df
  97. def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
  98. old_group_df = read_and_calibration_predict(old_predict_path, is_hdfs=False)
  99. new_group_df = read_and_calibration_predict(new_predict_path, is_hdfs=False)
  100. # 字段重命名,和列过滤
  101. old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
  102. new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
  103. old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
  104. new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
  105. # 拼接CPA
  106. cpa_target_df = read_cpa()
  107. merged = (pd.merge(old_group_df, new_group_df, on='cid', how='left')
  108. .merge(cpa_target_df, on='cid', how='left'))
  109. # 计算与真实ctcvr的差异值
  110. merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  111. merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  112. merged['new_cpm'] = merged['new_score_avg'] * merged['cpa_target'] * 1000
  113. merged['old_cpm'] = merged['old_score_avg'] * merged['cpa_target'] * 1000
  114. # 计算校准后的模型分与ctcvr的差异值
  115. merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  116. merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  117. merged['new2_cpm'] = merged['new_score_2_avg'] * merged['cpa_target'] * 1000
  118. merged['old2_cpm'] = merged['old_score_2_avg'] * merged['cpa_target'] * 1000
  119. # 计算真实的CPM
  120. merged['true_cpm'] = merged['true_ctcvr'] * merged['cpa_target'] * 1000
  121. # 按照曝光排序,写入本地文件
  122. merged = merged.sort_values(by=['view'], ascending=False)
  123. merged = merged[[
  124. 'cid', 'view', "conv", "true_ctcvr", 'true_cpm',
  125. "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true", "old_cpm", "new_cpm",
  126. "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true", "old2_cpm", "new2_cpm",
  127. ]]
  128. # 根据文件名保存不同的格式
  129. if analyse_file.endswith(".csv"):
  130. merged.to_csv(analyse_file, index=False)
  131. else:
  132. with open(analyse_file, "w") as writer:
  133. writer.write(merged.to_string(index=False))
  134. print("0")
  135. if __name__ == '__main__':
  136. _main(
  137. f"/Users/zhao/Desktop/tzld/XGB/predict_file/20241103_351_1000_1028_1102.txt",
  138. f"/Users/zhao/Desktop/tzld/XGB/predict_file/20241104_351_1000_1028_1102.txt",
  139. "/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt",
  140. f"/Users/zhao/Desktop/tzld/XGB/20241104.csv"
  141. )