# model_predict_analyse_20241115.py
import argparse
import gzip
import os.path

import pandas as pd
from hdfs import InsecureClient
# Module-level WebHDFS client pointing at the EMR master node; all /dw
# paths below are read/written through it as user "spark".
client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
# HDFS directory where the full calibration segment file is published
# (overridable via environment for testing).
SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
# Local directory for cached per-model aggregate files and AUC inputs.
PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
  8. def parse_predict_line(line: str) -> [bool, dict]:
  9. sp = line.replace("\n", "").split("\t")
  10. if len(sp) == 4:
  11. label = int(sp[0])
  12. cid = sp[3].split("_")[0]
  13. score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
  14. return True, {
  15. "label": label,
  16. "cid": cid,
  17. "score": score
  18. }
  19. return False, {}
  20. def read_predict_file(file_path: str) -> pd.DataFrame:
  21. result = []
  22. if file_path.startswith("/dw"):
  23. if not file_path.endswith("/"):
  24. file_path += "/"
  25. for file in client.list(file_path):
  26. with client.read(file_path + file) as reader:
  27. with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
  28. for line in gz_file.read().decode("utf-8").split("\n"):
  29. b, d = parse_predict_line(line)
  30. if b: result.append(d)
  31. else:
  32. with open(file_path, "r") as f:
  33. for line in f.readlines():
  34. b, d = parse_predict_line(line)
  35. if b: result.append(d)
  36. return pd.DataFrame(result)
  37. def calibration_file_save(df: pd.DataFrame, file_path: str):
  38. if file_path.startswith("/dw"):
  39. # 完整的分段文件保存
  40. with client.write(file_path, encoding='utf-8', overwrite=True) as writer:
  41. writer.write(df.to_csv(sep="\t", index=False))
  42. else:
  43. df.tocsv(file_path, sep="\t", index=False)
  44. def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
  45. """
  46. 本地保存一份评估结果, 计算AUC使用
  47. """
  48. d = {"old": old_df, "new": new_df}
  49. for key in d:
  50. df = d[key][['label', "score"]]
  51. df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
  52. df = d[key][['label', "score_2"]]
  53. df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
  54. def save_full_calibration_file(df: pd.DataFrame, segment_file_path: str):
  55. if segment_file_path.startswith("/dw"):
  56. # 完整的分段文件保存
  57. with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
  58. writer.write(df.to_csv(sep="\t", index=False))
  59. else:
  60. df.tocsv(segment_file_path, sep="\t", index=False)
  61. def calc_calibration_diff_rate(df: pd.DataFrame, predict_basename: str) -> [pd.DataFrame]:
  62. """
  63. 计算模型分的diff_rate
  64. """
  65. agg_df = predict_df_agg(df)
  66. condition = 'view > 1000 and diff_rate >= 0.1'
  67. save_full_calibration_file(agg_df, f"{PREDICT_CACHE_PATH}/{predict_basename}.txt")
  68. calibration = agg_df.query(condition)
  69. return calibration
  70. def get_predict_basename(predict_path) -> [str]:
  71. """
  72. 获取文件路径的最后一部分,作为与模型关联的文件名
  73. """
  74. predict_basename = os.path.basename(predict_path)
  75. if predict_basename.endswith("/"):
  76. predict_basename = predict_basename[:-1]
  77. return predict_basename
  78. def calc_calibration_score2(df: pd.DataFrame, calibration_df: pd.DataFrame) -> [pd.DataFrame]:
  79. calibration_df = calibration_df[['cid', 'diff_rate']]
  80. df = pd.merge(df, calibration_df, on='cid', how='left')
  81. df.fullna(0)
  82. df['score_2'] = df['score'] / (1 + df['diff_rate'])
  83. return df
  84. def predict_df_agg(df: pd.DataFrame) -> [pd.DataFrame]:
  85. # 基础聚合操作
  86. agg_operations = {
  87. 'view': ('cid', 'size'),
  88. 'conv': ('label', 'sum'),
  89. 'p_score_avg': ('score', lambda x: round(x.mean(), 6)),
  90. }
  91. # 如果存在 score_2 列,则增加相关聚合
  92. if "score_2" in df.columns:
  93. agg_operations['p_score_2_avg'] = ('score_2', lambda x: round(x.mean(), 6))
  94. grouped_df = df.groupby("cid").agg(**agg_operations).reset_index()
  95. grouped_df['t_ctcvr'] = grouped_df['conv'] / grouped_df['view']
  96. return grouped_df
  97. def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
  98. old_df = read_predict_file(old_predict_path)
  99. new_df = read_predict_file(new_predict_path)
  100. old_calibration_df = calc_calibration_diff_rate(old_df, get_predict_basename(old_predict_path))
  101. old_df = calc_calibration_score2(old_df, old_calibration_df)
  102. new_calibration_df = calc_calibration_diff_rate(new_df, get_predict_basename(new_predict_path))
  103. new_df = calc_calibration_score2(new_df, new_calibration_df)
  104. old_agg_df = predict_df_agg(old_df)
  105. new_agg_df = predict_df_agg(new_df)
  106. # 字段重命名,和列过滤
  107. old_agg_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
  108. new_agg_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
  109. old_group_df = old_agg_df[['cid', 'view', 'conv', 't_ctcvr', 'old_score_avg', 'old_score_2_avg']]
  110. new_group_df = new_agg_df[['cid', 'new_score_avg', 'new_score_2_avg']]
  111. merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
  112. # 计算与真实ctcvr的差异值
  113. merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  114. merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  115. # 计算校准后的模型分与ctcvr的差异值
  116. merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  117. merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
  118. # 按照曝光排序,写入本地文件
  119. merged = merged.sort_values(by=['view'], ascending=False)
  120. merged = merged[[
  121. 'cid', 'view', "conv", "true_ctcvr",
  122. "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
  123. "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
  124. ]]
  125. # 根据文件名保存不同的格式
  126. if analyse_file.endswith(".csv"):
  127. merged.to_csv(analyse_file, index=False)
  128. else:
  129. with open(analyse_file, "w") as writer:
  130. writer.write(merged.to_string(index=False))
  131. print("0")
  132. if __name__ == '__main__':
  133. _main("", "", "", "")
  134. # parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
  135. # parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
  136. # parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
  137. # parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
  138. # parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
  139. # args = parser.parse_args()
  140. #
  141. # _main(
  142. # old_predict_path=args.old_predict_path,
  143. # new_predict_path=args.new_predict_path,
  144. # calibration_file=args.calibration_file,
  145. # analyse_file=args.analyse_file
  146. # )