model_predict_analyse.py

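# Summary inferred from the code below, added as a module docstring:
"""
Compare old- and new-model evaluation results against the true ctcvr, per CID.

Reads prediction files (local text or gzip-compressed HDFS part files), splits
the sorted scores into segments, computes each segment's deviation from the
observed conversion rate, calibrates the scores with that deviation, and writes
a per-CID comparison table plus the segment (calibration) file used online.
"""
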
import argparse
import gzip
import os.path
from collections import OrderedDict

import pandas as pd
import numpy as np
from hdfs import InsecureClient

client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")

SEGMENT_BASE_PATH = "/dw/recommend/model/36_score_calibration"

def read_predict_from_local_txt(txt_file) -> list:
    """Read evaluation results from a local tab-separated text file."""
    result = []
    with open(txt_file, "r") as f:
        for line in f:
            sp = line.rstrip("\n").split("\t")
            if len(sp) == 4:
                label = int(sp[0])
                cid = sp[3].split("_")[0]
                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
                result.append({
                    "label": label,
                    "cid": cid,
                    "score": score
                })
    return result
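
# Expected input line format, inferred from the parsing above (field 2 is unused):
#   label \t <unused> \t [p0,p1] \t cid_xxx
# e.g. (hypothetical values) "1\tx\t[0.93,0.07]\t10086_20240101"
# parses to {"label": 1, "cid": "10086", "score": 0.07}.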

def read_predict_from_hdfs(hdfs_path: str) -> list:
    """Read gzip-compressed evaluation results from an HDFS directory."""
    if not hdfs_path.endswith("/"):
        hdfs_path += "/"
    result = []
    for file in client.list(hdfs_path):
        with client.read(hdfs_path + file) as reader:
            with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
                for line in gz_file.read().decode("utf-8").split("\n"):
                    split = line.split("\t")
                    if len(split) == 4:
                        cid = split[3].split("_")[0]
                        label = int(split[0])
                        score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
                        result.append({
                            "cid": cid,
                            "label": label,
                            "score": score
                        })
    return result
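
# Note: every file listed under hdfs_path is assumed to be gzip-compressed
# (e.g. Spark part-*.gz output); lines follow the same 4-field layout as the
# local text format above.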

def _segment_v1(scores, step):
    """Build bin edges by sampling every len(scores)/step-th sorted score, anchored at 0 and 1."""
    bins = []
    stride = max(1, len(scores) // step)  # guard against a zero stride when len(scores) < step
    for i in range(0, len(scores), stride):
        if i == 0:
            bins.append(0)
        else:
            bins.append(scores[i])
    bins.append(1)
    # Deduplicate while preserving order, so pd.cut gets valid monotonic edges
    return list(OrderedDict.fromkeys(bins))
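
# Worked example (hypothetical values): for 10 sorted scores and step=5 the
# stride is 10 // 5 = 2, so bins = [0, scores[2], scores[4], scores[6],
# scores[8], 1] -- roughly equal-count segments with fixed 0 and 1 end points.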

def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> tuple[pd.DataFrame, pd.DataFrame]:
    sorted_df = df.sort_values(by=['score'])
    # Segment the evaluation scores
    scores = sorted_df['score'].values
    bins = _segment_v1(scores, step)
    # Alternative: equal-size bucketing
    # split_indices = np.array_split(np.arange(len(scores)), step)
    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
    sorted_df['score_segment'] = pd.cut(sorted_df['score'], bins=bins)
    # Compute the score deviation within each segment
    group_df = sorted_df.groupby("score_segment", observed=True).agg(
        segment_label_sum=('label', 'sum'),
        segment_label_cnt=('label', 'count'),
        segment_score_avg=('score', 'mean'),
    ).reset_index()
    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
    # Save the full segment file
    group_df.to_csv(segment_file_path, sep="\t", index=False)
    # Keep only segments with a large deviation and enough exposures
    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
    # Attach the per-score diff to every exposure row
    merged_df = pd.merge(sorted_df, filtered_df, on="score_segment", how="left")
    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
    return merged_df, filtered_df
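
# Worked example (hypothetical numbers): if a segment's average model score is
# 0.06 while its observed rate (label_sum / label_cnt) is 0.05, then
# segment_diff_rate = 0.06 / 0.05 - 1 = 0.2, i.e. the model over-predicts by
# 20%, and the segment is kept for calibration (|0.2| >= 0.2, given >= 1000
# exposures).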

def read_and_calibration_predict(predict_path: str, is_hdfs=True, step=100) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read the evaluation results and calibrate the scores.
    """
    if is_hdfs:
        predicts = read_predict_from_hdfs(predict_path)
    else:
        predicts = read_predict_from_local_txt(predict_path)
    df = pd.DataFrame(predicts)
    # Compute the per-segment diff_rate between the model score and the true ctcvr
    # (strip any trailing slash so os.path.basename yields the directory name)
    predict_basename = os.path.basename(predict_path.rstrip("/"))
    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}", step=step)
    # Generate the calibrated score
    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
    # Per-CID stats: true ctcvr plus the average model score before and after calibration
    grouped_df = df.groupby("cid").agg(
        view=('cid', 'size'),
        conv=('label', 'sum'),
        score_avg=('score', lambda x: round(x.mean(), 6)),
        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
    ).reset_index()
    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
    return grouped_df, segment_df
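
# Worked example (hypothetical numbers): with score = 0.06 in a segment whose
# diff_rate is 0.2, the calibrated score is score_2 = 0.06 / (1 + 0.2) = 0.05,
# pulling the prediction back toward the observed ctcvr.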

def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
    old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path, is_hdfs=True)
    new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path, is_hdfs=True)
    # Save the segment file; only the finally-used segments (from the new model) are kept here, not all of them
    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
    # Rename and filter columns
    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
    # Deviation of the raw model scores from the true ctcvr
    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    # Deviation of the calibrated model scores from the true ctcvr
    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    # Sort by exposure count and write to a local file
    merged = merged.sort_values(by=['view'], ascending=False)
    merged = merged[[
        'cid', 'view', "conv", "true_ctcvr",
        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
    ]]
    # Choose the output format by file extension
    if analyse_file.endswith(".csv"):
        merged.to_csv(analyse_file, index=False)
    else:
        with open(analyse_file, "w") as writer:
            writer.write(merged.to_string(index=False))
    print("0")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="model_predict_analyse.py")
    parser.add_argument("-op", "--old_predict_path", required=True, help="evaluation results of the old model")
    parser.add_argument("-np", "--new_predict_path", required=True, help="evaluation results of the new model")
    parser.add_argument("-af", "--analyse_file", required=True, help="path to save the final analysis results")
    parser.add_argument("-cf", "--calibration_file", required=True, help="path to save the segment file used online")
    args = parser.parse_args()

    _main(
        old_predict_path=args.old_predict_path,
        new_predict_path=args.new_predict_path,
        calibration_file=args.calibration_file,
        analyse_file=args.analyse_file
    )
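
# Example invocation (all paths below are hypothetical):
#   python model_predict_analyse.py \
#       -op /dw/recommend/model/old_predict \
#       -np /dw/recommend/model/new_predict \
#       -af ./predict_analyse.csv \
#       -cf ./calibration_segment.txt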