model_predict_analyse_20241101.py

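"""
Compare an old and a new model on the same prediction data.

Reads both models' gzipped prediction files from HDFS, buckets predictions by
score, measures each bucket's gap between the average model score and the
observed label rate (diff_rate), calibrates scores with that gap, and writes:
a per-CID comparison table, the segment file used for online calibration, and
local label/score caches for AUC computation.
"""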
import argparse
import gzip
import os
from collections import OrderedDict
from typing import Tuple

import pandas as pd
from hdfs import InsecureClient

client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")

SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")

def read_predict_from_local_txt(txt_file) -> list:
    """Local-debugging counterpart of read_predict_from_hdfs: parse predictions from a plain-text TSV file."""
    result = []
    with open(txt_file, "r") as f:
        for line in f:
            sp = line.rstrip("\n").split("\t")
            if len(sp) == 4:
                label = int(sp[0])
                cid = sp[3].split("_")[0]
                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
                result.append({
                    "label": label,
                    "cid": cid,
                    "score": score
                })
    return result

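# Judging from the parsers above and below, each prediction line is expected to
# hold four tab-separated fields; the sample row is illustrative, not real data:
#
#   label \t <unused> \t [p0,p1] \t cid_suffix
#   1     \t ...      \t [0.98,0.02] \t 12345_0
#
# i.e. the score is taken as the second entry of the bracketed pair, and the
# CID is everything before the first "_" in the last field.
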
def read_predict_from_hdfs(hdfs_path: str) -> list:
    """Read gzipped prediction files from an HDFS directory into a list of dicts."""
    if not hdfs_path.endswith("/"):
        hdfs_path += "/"
    result = []
    for file in client.list(hdfs_path):
        with client.read(hdfs_path + file) as reader:
            with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
                for line in gz_file.read().decode("utf-8").split("\n"):
                    split = line.split("\t")
                    if len(split) == 4:
                        cid = split[3].split("_")[0]
                        label = int(split[0])
                        score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
                        result.append({
                            "cid": cid,
                            "label": label,
                            "score": score
                        })

    return result

def _segment_v1(scores, step):
    """Build bin edges over sorted scores (assumed to lie in [0, 1]): one edge every len(scores)/step rows."""
    bins = []
    # max(1, ...) guards against a zero stride when there are fewer scores than steps
    stride = max(1, len(scores) // step)
    for i in range(0, len(scores), stride):
        if i == 0:
            bins.append(0)
        else:
            bins.append(scores[i])
    bins.append(1)
    # Deduplicate edges (common when many scores are equal) while preserving order
    return list(OrderedDict.fromkeys(bins))

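# A worked example (hypothetical numbers): with 1,000 sorted scores and
# step=100 the stride is 10, so the candidate edges are
# [0, scores[10], scores[20], ..., scores[990], 1]; OrderedDict.fromkeys()
# then drops any duplicate edges, which keeps the bins strictly increasing
# as pd.cut() requires.
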
def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> Tuple[pd.DataFrame, pd.DataFrame]:
    sorted_df = df.sort_values(by=['score'])

    # Segment the prediction scores
    scores = sorted_df['score'].values

    bins = _segment_v1(scores, step)

    # Equal-frequency bucketing (alternative, requires numpy)
    # split_indices = np.array_split(np.arange(len(scores)), step)
    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]

    sorted_df['score_segment'] = pd.cut(sorted_df['score'], bins=bins)

    # Compute the score-vs-label gap within each segment
    group_df = sorted_df.groupby("score_segment", observed=True).agg(
        segment_label_sum=('label', 'sum'),
        segment_label_cnt=('label', 'count'),
        segment_score_avg=('score', 'mean'),
    ).reset_index()
    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)

    # Save the complete segment file
    csv_data = group_df.to_csv(sep="\t", index=False)
    with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
        writer.write(csv_data)

    # Keep only segments whose diff is large (>= 20%) and statistically meaningful (>= 1000 labels)
    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]

    # Attach the per-segment diff to every impression row
    merged_df = pd.merge(sorted_df, filtered_df, on="score_segment", how="left")

    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
    return merged_df, filtered_df

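# A small worked example of the calibration arithmetic: if a segment has
# segment_score_avg = 0.06 and segment_true_score = 0.05, then
# segment_diff_rate = 0.06 / 0.05 - 1 = 0.2, and every score in that segment
# is later calibrated as score_2 = score / (1 + 0.2), pulling the segment
# average back toward the observed rate.
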
def read_and_calibration_predict(predict_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Read the prediction results and calibrate them.
    """
    # For local debugging:
    # predicts = read_predict_from_local_txt(predict_path)
    predicts = read_predict_from_hdfs(predict_path)
    df = pd.DataFrame(predicts)

    # Compute the per-segment diff_rate between model scores and the true CTCVR.
    # Strip any trailing "/" before taking the basename (os.path.basename("a/b/") is "").
    predict_basename = os.path.basename(predict_path.rstrip("/"))
    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=100)

    # Generate calibrated scores
    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])

    # Per CID: true CTCVR plus the average model score before and after calibration
    grouped_df = df.groupby("cid").agg(
        view=('cid', 'size'),
        conv=('label', 'sum'),
        score_avg=('score', lambda x: round(x.mean(), 6)),
        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
    ).reset_index()
    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']

    return df, grouped_df, segment_df

def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
    """
    Save a local copy of the predictions, for computing AUC.
    """
    d = {"old": old_df, "new": new_df}
    for key in d:
        df = d[key][['label', "score"]]
        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
        df = d[key][['label', "score_2"]]
        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)

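# A minimal sketch of how the cached files could be scored afterwards,
# assuming scikit-learn is available (it is not used by this script itself):
#
#   from sklearn.metrics import roc_auc_score
#   cached = pd.read_csv(f"{PREDICT_CACHE_PATH}/old_1.txt", sep="\t",
#                        names=["label", "score"])
#   print(roc_auc_score(cached["label"], cached["score"]))
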
def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
    old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
    new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)

    predict_local_save_for_auc(old_df, new_df)

    # Save the calibration segment file; only the finally-used (new model) segments are kept, not every segment
    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)

    # Rename and filter columns
    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]

    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')

    # Diff between the raw model scores and the true CTCVR
    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)

    # Diff between the calibrated model scores and the true CTCVR
    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)

    # Sort by impressions and write to a local file
    merged = merged.sort_values(by=['view'], ascending=False)
    merged = merged[[
        'cid', 'view', "conv", "true_ctcvr",
        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
    ]]

    # Choose the output format based on the file extension
    if analyse_file.endswith(".csv"):
        merged.to_csv(analyse_file, index=False)
    else:
        with open(analyse_file, "w") as writer:
            writer.write(merged.to_string(index=False))
    print("0")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
    parser.add_argument("-op", "--old_predict_path", required=True, help="prediction results of the old model")
    parser.add_argument("-np", "--new_predict_path", required=True, help="prediction results of the new model")
    parser.add_argument("-af", "--analyse_file", required=True, help="path to save the final analysis result")
    parser.add_argument("-cf", "--calibration_file", required=True, help="path to save the segment file used online")
    args = parser.parse_args()

    _main(
        old_predict_path=args.old_predict_path,
        new_predict_path=args.new_predict_path,
        calibration_file=args.calibration_file,
        analyse_file=args.analyse_file
    )
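
# Example invocation (paths are hypothetical):
#
#   python model_predict_analyse_20241101.py \
#       -op /dw/recommend/model/old_predict \
#       -np /dw/recommend/model/new_predict \
#       -af ./analyse_result.csv \
#       -cf ./calibration_segments.txt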