
feat: add logic to save model scores locally

zhaohaipeng, 5 months ago
commit 95ef011c87

+ 2 - 59
ad/02_ad_model_update_test.sh

@@ -8,62 +8,5 @@ export JAVA_HOME=/usr/lib/jvm/java-1.8.0
 sh_path=$(cd $(dirname $0); pwd)
 source ${sh_path}/00_common.sh
 
-# Path of the saved evaluation analysis file; used later to decide whether the model should be updated
-predict_analyse_file_path=/root/zhaohp/XGB/predict_analyse_file/20241105_351_1000_analyse.txt
-
-# Variables holding the model evaluation analysis results
-old_incr_rate_avg=0
-new_incr_rate_avg=0
-
-top10_msg=""
-
-
-calc_model_predict() {
-  local count=0
-  local max_line=10
-  local old_total_diff=0
-  local new_total_diff=0
-  top10_msg="| CID  | Old model diff vs. true CTCVR | New model diff vs. true CTCVR |"
-  top10_msg+=" \n| ---- | --------- | -------- |"
-  while read -r line && [ ${count} -lt ${max_line} ]; do
-
-      # Skip the header line (any line containing "cid")
-      if [[ "${line}" == *"cid"* ]]; then
-          continue
-      fi
-
-      read -a numbers <<< "${line}"
-
-      # Store the real, old-model and new-model scores separately
-      real_score_map[${numbers[0]}]=${numbers[3]}
-      old_score_map[${numbers[0]}]=${numbers[6]}
-      new_score_map[${numbers[0]}]=${numbers[7]}
-
-      # Append this row to the Top10 detail Feishu message
-      top10_msg="${top10_msg} \n| ${numbers[0]} | ${numbers[6]} | ${numbers[7]} | "
-
-      # Accumulate the absolute relative errors for the Top10 mean
-      old_abs_score=$( echo "${numbers[6]} * ((${numbers[6]} >= 0) * 2 - 1)" | bc -l )
-      new_abs_score=$( echo "${numbers[7]} * ((${numbers[7]} >= 0) * 2 - 1)" | bc -l )
-
-      old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l )
-      new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l )
-
-      count=$((${count} + 1))
-
-  done < "${predict_analyse_file_path}"
-
-  old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
-
-  new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
-
-  echo "老模型Top10差异平均值: ${old_incr_rate_avg}"
-  echo "新模型Top10差异平均值: ${new_incr_rate_avg}"
-  echo "新老模型分数对比: "
-  for cid in "${!new_score_map[@]}"; do
-    echo "\t CID: $cid, 老模型分数: ${old_score_map[$cid]}, 新模型分数: ${new_score_map[$cid]}"
-  done
-}
-
-
-calc_model_predict
+python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})
+echo "${python_return_code}"
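For context, the call above only forwards four paths to model_predict_analyse.py and echoes whatever the script prints. A minimal sketch of how that CLI could be wired, assuming the script uses argparse and the _main signature shown in the Python diff below; the long option names here are hypothetical, only the short flags appear in the shell call:

    import argparse

    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Analyse old vs. new model predictions")
        parser.add_argument("-op", "--old-predict-path", required=True)   # online (old) model predictions
        parser.add_argument("-np", "--new-predict-path", required=True)   # new model predictions
        parser.add_argument("-af", "--analyse-file", required=True)       # analysis output file
        parser.add_argument("-cf", "--calibration-file", required=True)   # calibration segment file
        args = parser.parse_args()
        _main(args.old_predict_path, args.new_predict_path, args.calibration_file, args.analyse_file)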

+ 0 - 2
ad/25_xgb_make_data_origin_bucket.sh

@@ -12,8 +12,6 @@ source ${sh_path}/00_common.sh
 
 source /root/anaconda3/bin/activate py37
 
-
-
 make_origin_data() {
   
   local step_start_time=$(date +%s)

+ 21 - 9
ad/model_predict_analyse.py

@@ -8,7 +8,8 @@ from hdfs import InsecureClient
 
 client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
 
-SEGMENT_BASE_PATH = "/dw/recommend/model/36_score_calibration_file"
+SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
+PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
 
 
 def read_predict_from_local_txt(txt_file) -> list:
@@ -81,10 +82,7 @@ def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, st
         segment_score_avg=('score', 'mean'),
     ).reset_index()
     group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
-    group_df['segment_diff_rate_origin'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
-
-    # Use a centered rolling window (current row plus its neighbours) to smooth diff_rate
-    group_df['segment_diff_rate'] = group_df['segment_diff_rate_origin'].rolling(window=5, center=True, min_periods=1).mean()
+    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
 
     # Save the complete segment file
     csv_data = group_df.to_csv(sep="\t", index=False)
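The hunk above drops the rolling-window smoothing, so segment_diff_rate is now the raw ratio, with .mask() zeroing the rows whose true score is 0. A toy illustration of that expression (made-up numbers, not data from this pipeline):

    import pandas as pd

    g = pd.DataFrame({"segment_score_avg": [0.12, 0.30, 0.05],
                      "segment_true_score": [0.10, 0.00, 0.05]})
    # ratio minus 1, forced to 0 wherever the true score is 0 to avoid a division-by-zero artefact
    g["segment_diff_rate"] = (g["segment_score_avg"] / g["segment_true_score"] - 1).mask(
        g["segment_true_score"] == 0, 0)
    print(g["segment_diff_rate"].round(6).tolist())  # [0.2, 0.0, 0.0]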
@@ -100,7 +98,7 @@ def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, st
     return merged_df, filtered_df
 
 
-def read_and_calibration_predict(predict_path: str, step=100) -> [pd.DataFrame, pd.DataFrame]:
+def read_and_calibration_predict(predict_path: str, step=100) -> [pd.DataFrame, pd.DataFrame, pd.DataFrame]:
     """
     Read the prediction results and apply calibration
     """
@@ -127,12 +125,26 @@ def read_and_calibration_predict(predict_path: str, step=100) -> [pd.DataFrame,
     ).reset_index()
     grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
 
-    return grouped_df, segment_df
+    return df, grouped_df, segment_df
+
+
+def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
+    """
+    Save a local copy of the prediction results for AUC computation
+    """
+    d = {"old": old_df, "new": new_df}
+    for key in d:
+        df = d[key][['label', "score"]]
+        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.csv", index=False, header=False)
+        df = d[key][['label', "score_2"]]
+        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.csv", index=False, header=False)
 
 
 def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
-    old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
-    new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
+    old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
+    new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
+
+    predict_local_save_for_auc(old_df, new_df)
 
     # Save the segment file; only the final segment file actually used is kept here, not all segments
     new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
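The label/score CSVs written by predict_local_save_for_auc can later be fed into an AUC computation. A minimal sketch, assuming scikit-learn is available and PREDICT_CACHE_PATH points at the same directory; the file names match the ones written by the function:

    import os

    import pandas as pd
    from sklearn.metrics import roc_auc_score

    cache = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")

    for name in ("old_1", "old_2", "new_1", "new_2"):
        df = pd.read_csv(f"{cache}/{name}.csv", header=None, names=["label", "score"])
        print(name, roc_auc_score(df["label"], df["score"]))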

+ 24 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_34_statistics_20241111.scala

@@ -0,0 +1,24 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.spark.examples.myUtils.ParamUtils
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Attachment generation
+ * <br>
+ * 1. Aggregate by CID: total exposures, conversions, and related metrics
+ */
+object makedata_ad_34_statistics_20241111 {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+    val loader = getClass.getClassLoader
+
+    val param = ParamUtils.parseArgs(args)
+
+  }
+}
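The Scala object above is only a skeleton: it builds the SparkSession and parses the arguments, but the CID-level aggregation described in its comment is not implemented yet. Purely as an illustration of the intended output shape, a small pandas sketch (column names are hypothetical; the real job would read its input via Spark):

    import pandas as pd

    # hypothetical per-impression records; the real job reads them from ODPS/HDFS
    records = pd.DataFrame({
        "cid":  ["101", "101", "102"],
        "view": [1, 1, 1],
        "conv": [0, 1, 0],
    })

    stats = (records.groupby("cid")
                    .agg(exposure=("view", "sum"), conversion=("conv", "sum"))
                    .reset_index())
    stats["ctcvr"] = stats["conversion"] / stats["exposure"]
    print(stats)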