Explorar o código

feat:添加测试脚本

zhaohaipeng hai 5 meses
pai
achega
ce944c5996

+ 248 - 0
ad/02_ad_model_update_test.sh

@@ -0,0 +1,248 @@
+#!/bin/sh
+set -x
+
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+sh_path=$(cd $(dirname $0); pwd)
+source ${sh_path}/00_common.sh
+
+source /root/anaconda3/bin/activate py37
+
+
+# 全局常量
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
+BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
+TABLE=alg_recsys_ad_sample_all
+# 特征文件名
+feature_file=20240703_ad_feature_name.txt
+# 模型本地临时保存路径
+model_local_home=/root/zhaohp/XGB/
+
+# 模型HDFS保存路径,测试时修改为其他路径,避免影响线上
+MODEL_PATH=/dw/recommend/model/35_ad_model
+# 预测结果保存路径,测试时修改为其他路径,避免影响线上
+PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data
+# 模型OSS保存路径,测试时修改为其他路径,避免影响线上
+MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
+# 线上模型名,测试时修改为其他模型名,避免影响线上
+model_name=model_xgb_351_1000_v2
+# 线上校准文件名
+OSS_CALIBRATION_FILE_NAME=model_xgb_351_1000_v2_calibration
+
+
+# 本地保存HDFS模型路径文件,测试时修改为其他模型名,避免影响线上
+model_path_file=${model_local_home}/online_model_path.txt
+# 获取当前是星期几,1表示星期一
+current_day_of_week="$(date +"%u")"
+
+# 任务开始时间
+start_time=$(date +%s)
+# 前一天
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+# 线上模型在HDFS中的路径
+online_model_path=`cat ${model_path_file}`
+# 训练用的数据路径
+train_data_path=""
+# 评估用的数据路径
+predict_date_path=""
+#评估结果保存路径
+new_model_predict_result_path=""
+# 模型保存路径
+model_save_path=""
+# 评测结果保存路径,后续需要根据此文件评估是否要更新模型
+predict_analyse_file_path=""
+# 校准文件保存路径
+calibration_file_path=""
+
+# 保存模型评估的分析结果
+old_incr_rate_avg=0
+new_incr_rate_avg=0
+
+top10_msg=""
+
+# Validate the exit code of the previous step.
+# Args:
+#   $1 - exit status to check
+#   $2 - step start time (epoch seconds), used to compute elapsed time
+#   $3 - human-readable step name (for the log line)
+#   $4 - alarm message sent to Feishu via ad_monitor_util.py on failure
+# On non-zero status: sends an error-level alarm and aborts the whole job.
+# NOTE(review): $LOG_PREFIX is not defined in this script — presumably
+# exported by 00_common.sh; confirm.
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+    local msg=$4
+
+    local step_end_time=$(date +%s)
+    local step_elapsed=$(($step_end_time - $step_start_time))
+
+    if [ $status -ne 0 ]; then
+        echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
+        local elapsed=$(($step_end_time - $start_time))
+        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" --top10 "${top10_msg}"
+        exit 1
+    else
+        echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
+    fi
+}
+
+# Send the "model updated successfully" notification to Feishu, including
+# both models' Top10 diff averages and the HDFS/OSS upload paths.
+# Reads globals: old_incr_rate_avg, new_incr_rate_avg, model_save_path,
+# MODEL_OSS_PATH, model_name, start_time, top10_msg.
+send_success_upload_msg(){ 
+  # Build the success message body
+  local msg=" 广告模型文件更新完成"
+  msg+="\n\t - 老模型Top10差异平均值: ${old_incr_rate_avg}"
+  msg+="\n\t - 新模型Top10差异平均值: ${new_incr_rate_avg}"
+  msg+="\n\t - 模型在HDFS中的路径: ${model_save_path}"
+  msg+="\n\t - 模型上传OSS中的路径: ${MODEL_OSS_PATH}/${model_name}.tar.gz"
+
+  local step_end_time=$(date +%s)
+  local elapsed=$(($step_end_time - $start_time))
+
+  /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level info --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
+}
+
+# Initialise all per-run paths: collect up to 6 non-holiday training dates
+# (starting from 2 days ago, walking backwards), then derive the training,
+# evaluation, model-save, analyse and calibration paths from them.
+init() {
+  
+  declare -a date_keys=()
+  local count=1
+  local current_data="$(date -d '2 days ago' +%Y%m%d)"
+  # Walk backwards, collecting non-holiday dates until 6 are found
+  while [[ $count -lt 7 ]]; do
+    date_key=$(date -d "$current_data" +%Y%m%d)
+    # is_not_holidays comes from 00_common.sh; 1 means "not a holiday"
+    if [ $(is_not_holidays $date_key) -eq 1 ]; then
+
+      # Remember the date (index 0 ends up being the most recent day)
+      date_keys+=("$date_key")
+
+      # Prepend so the comma-joined list is oldest-last when read
+      if [[ -z ${train_data_path} ]]; then
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
+      else
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
+      fi 
+      count=$((count + 1))
+    else
+      echo "日期: ${date_key}是节日,跳过"
+    fi
+    current_data=$(date -d "$current_data -1 day" +%Y%m%d)
+  done
+
+  last_index=$((${#date_keys[@]} - 1))
+  train_first_day=${date_keys[$last_index]}
+  train_last_day=${date_keys[0]}
+
+  # Model name carries the MMDD of the first and last training days
+  model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
+  predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
+  # NOTE(review): both predict-result paths below are hardcoded to the SAME
+  # 20241104 fixture — clearly test-only; restore the dynamic paths before
+  # promoting this script. Confirm.
+  new_model_predict_result_path=/dw/recommend/model/34_ad_predict_data/20241104_351_1000_1028_1102
+  online_model_predict_result_path=/dw/recommend/model/34_ad_predict_data/20241104_351_1000_1028_1102
+  predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_351_1000_analyse.txt
+  calibration_file_path=${model_local_home}/${OSS_CALIBRATION_FILE_NAME}.txt
+
+  echo "init param train_data_path: ${train_data_path}"
+  echo "init param predict_date_path: ${predict_date_path}"
+  echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
+  echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
+  echo "init param model_save_path: ${model_save_path}"
+  echo "init param online_model_path: ${online_model_path}"
+  echo "init param feature_file: ${feature_file}"
+  echo "init param model_name: ${model_name}"
+  echo "init param model_local_home: ${model_local_home}"
+  echo "init param model_oss_path: ${MODEL_OSS_PATH}"
+  echo "init param predict_analyse_file_path: ${predict_analyse_file_path}"
+  echo "init param calibration_file_path: ${calibration_file_path}"
+  echo "init param current_day_of_week: ${current_day_of_week}"
+
+  echo "当前Python环境安装的Python版本: $(python --version)"
+  echo "当前Python环境安装的三方包: $(python -m pip list)"
+}
+
+
+calc_model_predict() {
+  local count=0
+  local max_line=10
+  local old_total_diff=0
+  local new_total_diff=0
+
+  local declare -A real_score_map
+  local declare -A old_score_map
+  local declare -A new_score_map
+  top10_msg="| CID  | 老模型相对真实CTCVR的变化 | 新模型相对真实CTCVR的变化 |"
+  top10_msg+=" \n| ---- | --------- | -------- |"
+  while read -r line && [ ${count} -lt ${max_line} ]; do
+
+      # 使用 ! 取反判断,只有当行中不包含 "cid" 时才执行继续的逻辑
+      if [[ "${line}" == *"cid"* ]]; then
+          continue
+      fi
+
+      read -a numbers <<< "${line}"
+
+      # 分数分别保存
+      real_score_map[${numbers[0]}]=${numbers[3]}
+      old_score_map[${numbers[0]}]=${numbers[6]}
+      new_score_map[${numbers[0]}]=${numbers[7]}
+
+      # 拼接Top10详情的飞书消息
+      top10_msg="${top10_msg} \n| ${numbers[0]} | ${numbers[6]} | ${numbers[7]} | "
+      old_abs_score=$( echo "if(${numbers[6]} < 0) -${numbers[6]} else ${numbers[6]}" | bc -l)
+      new_abs_score=$( echo "if(${numbers[7]} < 0) -${numbers[7]} else ${numbers[7]}" | bc -l)
+
+      old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l )
+      new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l )
+
+      count=$((${count} + 1))
+
+  done < "${predict_analyse_file_path}"
+
+  local return_code=$?
+  check_run_status $return_code $step_start_time "计算Top10差异" "计算Top10差异异常"
+
+  old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
+  check_run_status $? $step_start_time "计算老模型Top10差异" "计算老模型Top10差异异常"
+
+
+  new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
+  check_run_status $? $step_start_time "计算新模型Top10差异" "计算新模型Top10差异异常"
+
+  echo "老模型Top10差异平均值: ${old_incr_rate_avg}"
+  echo "新模型Top10差异平均值: ${new_incr_rate_avg}"
+  echo "新老模型分数对比: "
+  for cid in "${!new_score_map[@]}"; do
+    echo "\t CID: $cid, 老模型分数: ${old_score_map[$cid]}, 新模型分数: ${new_score_map[$cid]}"
+  done
+}
+
+# Run the predict-analyse step on the (pre-generated) old/new predict
+# results, compute Top10 diffs, and abort if the new model's error or the
+# old/new gap exceeds the thresholds.
+model_predict() {
+
+  # NOTE(review): $? here is the exit status of whatever ran BEFORE this
+  # function (nothing meaningful in this test script) — in the production
+  # script a spark predict step presumably precedes this; confirm.
+  # NOTE(review): $step_start_time is never assigned in this function, so
+  # it expands empty and shifts check_run_status's arguments; confirm.
+  local return_code=$?
+  check_run_status $return_code $step_start_time "线上模型评估${predict_date_path: -8}的数据" "线上模型评估${predict_date_path: -8}的数据失败"
+
+  # Analyse results. NOTE(review): this captures the script's STDOUT (it
+  # prints "0" on success), not its exit code — any extra stdout breaks it.
+  local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})
+  check_run_status $python_return_code $step_start_time "分析线上模型评估${predict_date_path: -8}的数据" "分析线上模型评估${predict_date_path: -8}的数据失败"
+
+  calc_model_predict
+
+  # Abort if the new model's average absolute diff rate exceeds 0.1
+  if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then 
+    echo "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    check_run_status 1 $step_start_time "${predict_date_path: -8}的数据,绝对误差大于0.1" "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    exit 1
+  fi 
+
+
+  # Abort if the new model is worse than the old one by more than 0.05
+  score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l )
+  if (( $(echo "${score_diff} > 0.050000" | bc -l ) ));then 
+    echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查"
+    check_run_status 1 $step_start_time "两个模型评估${predict_date_path: -8}的数据" "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
+    exit 1
+  fi 
+
+}
+
+# Main entry point: initialise paths, then run the model-evaluation test.
+main() {
+  init
+
+  model_predict
+
+}
+
+
+main

+ 0 - 377
ad/02_ad_model_update_twice_daily.sh

@@ -1,377 +0,0 @@
-#!/bin/sh
-set -x
-
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-source /root/anaconda3/bin/activate py37
-
-# 全局常量
-originDataSavePath=/dw/recommend/model/31_ad_sample_data_v4_auto
-bucketFeatureSavePathHome=/dw/recommend/model/33_ad_train_data_v4_auto
-model_name=model_bkb8_v4
-LAST_MODEL_HOME=/root/zhaohp/model_online
-
-MODEL_HOME=/root/zhaohp/recommend-emr-dataprocess/model
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model
-
-PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-FM_HOME=/root/sunmingze/alphaFM
-
-today="$(date +%Y%m%d)"
-today_early_1="$(date -d '1 days ago' +%Y%m%d)"
-
-start_time=$(date +%s)
-elapsed=0
-LOG_PREFIX=广告模型自动更新任务
-
-# 训练和预测数据分区
-train_begin_str=''
-train_end_str=''
-predict_begin_str=''
-predict_end_str=''
-
-# HDFS保存数据的目录
-trainBucketFeaturePath=${bucketFeatureSavePathHome}
-predictBucketFeaturePath=${bucketFeatureSavePathHome}
-
-local_model_file_path=${MODEL_HOME}/${model_name}.txt
-local_change_model_file_path=${MODEL_HOME}/${model_name}_change.txt
-
-max_hour=21
-max_minute=20
-
-# 全局初始化
-global_init() {
-    # 获取当前小时,确定需要使用的数据分区范围
-    local current_hour="$(date +%H)"
-    if [ $current_hour -le 06 ]; then
-        train_begin_str=${today_early_1}08
-        train_end_str=${today_early_1}21
-        predict_begin_str=${today_early_1}22
-        predict_end_str=${today_early_1}23
-
-        trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/train
-        predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/predict
-
-        local_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}.txt
-        local_change_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}_change.txt
-        max_hour=08
-    elif [ $current_hour -ge 16 ]; then 
-        train_begin_str=${today_early_1}22
-        train_end_str=${today}13
-        predict_begin_str=${today}14
-        predict_end_str=${today}15
-
-        trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/train
-        predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/predict
-
-        local_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}.txt
-        local_change_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}_change.txt
-        max_hour=21
-
-    else
-        echo "当前时间段异常: 退出任务"
-        exit 1
-    fi
-
-    # 删除HDFS目录,保证本次任务运行时目录干净
-    $HADOOP fs -rm -r -skipTrash ${trainBucketFeaturePath}
-    $HADOOP fs -rm -r -skipTrash ${predictBucketFeaturePath}
-
-    echo "全局变量初始化化: "
-    echo "  train_begin_str=${train_begin_str}"
-    echo "  train_end_str=${train_end_str}"
-    echo "  predict_begin_str=${predict_begin_str}"
-    echo "  predict_end_str=${predict_end_str}"
-    echo "  originDataSavePath=${originDataSavePath}"
-    echo "  trainBucketFeaturePath=${trainBucketFeaturePath}"
-    echo "  predictBucketFeaturePath=${predictBucketFeaturePath}"
-    echo "  local_model_file_path=${local_model_file_path}"
-    echo "  local_change_model_file_path=${local_change_model_file_path}"
-    echo "  max_hour=${max_hour}"
-
-}
-
-# 校验命令的退出码
-check_run_status() {
-    local status=$1
-    local step_start_time=$2
-    local step_name=$3
-
-    local step_end_time=$(date +%s)
-    local step_elapsed=$(($step_end_time - $step_start_time))
-
-    if [ $status -ne 0 ]; then
-        echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
-        local elapsed=$(($step_end_time - $start_time))
-        # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
-        exit 1
-    else
-        echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
-    fi
-}
-
-# 校验大数据任务是否执行完成
-check_ad_hive() {
-    local step_start_time=$(date +%s)
-    while true; do
-        local python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${predict_end_str:0:8} --hh ${predict_end_str:8:10})
-
-        local step_end_time=$(date +%s)
-        local elapsed=$(($step_end_time - $step_start_time))
-        if [ "$python_return_code" -eq 0 ]; then
-            break
-        fi
-        echo "Python程序返回非0值,等待五分钟后再次调用。"
-        sleep 300
-        local current_hour=$(date +%H)
-        local current_minute=$(date +%M)
-        if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
-            local msg="大数据数据生产校验失败, 分区: ${today}10"
-            echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
-            # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
-            exit 1
-        fi
-    done
-    echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
-
-}
-
-# 原始特征生产
-make_origin_data() {
-    local step_start_time=$(date +%s)
-    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
-    --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-    tablePart:64 repartition:16 \
-    beginStr:${train_begin_str} endStr:${predict_end_str} \
-    savePath:${originDataSavePath} \
-    table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
-    idDefaultValue:0.01
-
-    local return_code=$?
-    check_run_status $return_code $step_start_time "Spark原始样本生产任务"
-
-}
-
-# 训练用数据分桶
-make_train_bucket_feature() {
-    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
-    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-    beginStr:${train_begin_str:0:8} endStr:${train_end_str:0:8} repartition:100 \
-    filterNames:adid_,targeting_conversion_ \
-    readPath:${originDataSavePath} \
-    savePath:${trainBucketFeaturePath}
-}
-
-# 预测用数据分桶
-make_predict_bucket_feature() {
-    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
-    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-    beginStr:${predict_begin_str:0:8} endStr:${predict_end_str:0:8} repartition:100 \
-    filterNames:adid_,targeting_conversion_ \
-    readPath:${originDataSavePath} \
-    savePath:${predictBucketFeaturePath}
-}
-
-
-# 特征分桶,训练用的数据和预测用的数据分不同的目录
-make_bucket_feature() {
-    local step_start_time=$(date +%s)
-    
-    # 训练用的数据
-    make_train_bucket_feature &
-    train_bucket_pid=$!
-
-    wait $train_bucket_pid
-
-    local train_return_code=$?
-    check_run_status $train_return_code $step_start_time "Spark特征分桶任务: 训练数据分桶"
-
-    
-    # 预测用的数据
-    make_predict_bucket_feature &
-    predict_bucket_pid=$!
-
-    wait $predict_bucket_pid
-
-    local predict_return_code=$?
-    check_run_status $predict_return_code $step_start_time "Spark特征分桶任务: 预测数据分桶"
-}
-
-# 模型训练
-model_train() {
-    local step_start_time=$(date +%s)
-    $HADOOP fs -text ${trainBucketFeaturePath}/*/* | ${FM_HOME}/bin/fm_train -m ${local_model_file_path} -dim 1,1,8  -im ${LAST_MODEL_HOME}/model_online.txt -core 8
-
-    local return_code=$?
-    check_run_status $return_code $step_start_time "模型训练"
-}
-
-
-# 计算线上模型的AUC
-calc_online_model_auc() {
-    $HADOOP fs -text ${predictBucketFeaturePath}/*/* | ${FM_HOME}/bin/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt
-    online_auc=`cat ${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt | /root/sunmingze/AUC/AUC`
-}
-
-
-# 计算新模型AUC
-calc_new_model_auc() {
-    $HADOOP fs -text ${predictBucketFeaturePath}/*/* | ${FM_HOME}/bin/fm_predict -m ${local_model_file_path} -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt
-    new_auc=`cat ${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt | /root/sunmingze/AUC/AUC`
-}
-
-# AUC对比
-auc_compare() {
-    local step5_start_time=$(date +%s)
-
-    # 5.1 计算线上模型的AUC
-    local step_start_time=$(date +%s)
-
-    calc_online_model_auc &
-    local calc_online_model_auc_pid=$!
-
-    wait $calc_online_model_auc_pid
-    local return_code=$?
-    check_run_status $return_code $step_start_time "线上模型AUC计算"
-
-    # 5.2 计算新模型的AUC
-    step_start_time=$(date +%s)
-
-    calc_new_model_auc &
-    local calc_new_model_auc_pid=$!
-
-    wait $calc_new_model_auc_pid
-
-    local new_return_code=$?
-    check_run_status $new_return_code $step_start_time "新模型的AUC计算"
-
-
-
-    echo "AUC比对: 线上模型的AUC: ${online_auc}, 新模型的AUC: ${new_auc}"
-
-    # 5.3 计算新模型与线上模型的AUC差值的绝对值
-    auc_diff=$(echo "$online_auc - $new_auc" | bc -l)
-    local auc_diff_abs=$(echo "sqrt(($auc_diff)^2)" | bc -l)
-
-    local step_end_time=$(date +%s)
-    local step5_elapsed=$(($step_end_time - $step5_start_time))
-    # 5.4 如果差值的绝对值小于0.005且新模型的AUC大于0.73, 则更新模型
-    if (( $(echo "${online_auc} <= ${new_auc}" | bc -l) )); then
-        local msg="新模型优于线上模型 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc}"
-        echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
-
-    elif (( $(echo "$auc_diff_abs < 0.005" | bc -l) )) && (( $(echo "$new_auc >= 0.73" | bc -l) )); then
-        local msg="新模型与线上模型差值小于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
-        echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
-
-    else
-        local msg="新模型与线上模型差值大于等于阈值0.005或新模型的AUC小于0.73 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff"
-        echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
-        local elapsed=$(($step_end_time - $start_time))
-        # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
-        exit 1
-    fi
-}
-
-# 模型格式转换
-model_to_online_format() {
-    local step_start_time=$(date +%s)
-    cat ${local_model_file_path} |
-    awk -F " " '{
-        if (NR == 1) {
-            print $1"\t"$2
-        } else {
-            split($0, fields, " ");
-            OFS="\t";
-            line=""
-            for (i = 1; i <= 10 && i <= length(fields); i++) {
-                line = (line ? line "\t" : "") fields[i];
-            }
-            print line
-        }
-    }' > ${local_change_model_file_path}
-
-    local return_code=$?
-    check_run_status $return_code $step_start_time "模型格式转换"
-
-}
-
-# 模型文件上传OSS
-model_upload_oss() {
-    local step_start_time=$(date +%s)
-    local online_model_path=${OSS_PATH}/${model_name}.txt
-    $HADOOP fs -test -e ${online_model_path}
-    if [ $? -eq 0 ]; then
-        echo "删除已存在的OSS模型文件"
-        $HADOOP fs -rm -r -skipTrash ${online_model_path}
-    fi
-    $HADOOP fs -put ${local_change_model_file_path} ${online_model_path}
-    
-    local return_code=$?
-    check_run_status $return_code $step_start_time "模型文件上传OSS"
-    
-}
-
-# 模型文件本地备份
-model_local_back() {
-    local step_start_time=$(date +%s)
-    # 将之前的线上模型进行备份,表示从上一个备份时间到当前时间内,使用的线上模型都是此文件
-    # 假设当前是07-11,上一次备份时间为07-07。备份之后表示从07-07下午至07-11上午线上使用的模型文件都是model_online_20240711.txt
-    file_suffix=$(date "+%Y%m%d%H")
-    cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_${file_suffix}.txt
-    cp -f ${local_model_file_path} ${LAST_MODEL_HOME}/model_online.txt
-
-    local return_code=$?
-    check_run_status $return_code $step_start_time "模型备份"
-}
-
-# 任务完成通知
-success_inform() {
-    local step_end_time=$(date +%s)
-    local msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc  \n\t - AUC差值: $auc_diff \n\t - 模型上传路径: $online_model_path"
-    echo -e "$LOG_PREFIX -- 模型更新完成 -- $msg: 耗时 $step_elapsed"
-    local elapsed=$(($step_end_time - $start_time))
-    # /root/anaconda3/bin/python ad/ad_monitor_util.py --level info --msg "$msg" --start "$start_time" --elapsed "$elapsed"
-}
-
-main() {
-
-    global_init
-
-    check_ad_hive
-
-    make_origin_data
-
-    make_bucket_feature
-
-    # model_train
-
-    # auc_compare
-
-    # model_to_online_format
-
-    # model_upload_oss
-
-    # model_local_back
-
-    # success_inform
-}
-
-
-main
-
-
-
-# nohup ./ad/02_ad_model_update_twice_daily.sh > logs/02_twice_daily.log 2>&1 &

+ 2 - 2
ad/ad_monitor_util.py

@@ -38,8 +38,8 @@ def send_card_msg_to_feishu(webhook, card_json):
         "card": card_json
     }
     print(f"推送飞书消息内容: {json.dumps(payload_message)}")
-    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
-    print(response.text)
+    # response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    # print(response.text)
 
 
 def timestamp_format(timestamp: str) -> str:

+ 137 - 42
ad/model_predict_analyse.py

@@ -1,21 +1,43 @@
 import argparse
 import gzip
-import sys
+import os.path
+from collections import OrderedDict
 
 import pandas as pd
+import numpy as np
 from hdfs import InsecureClient
 
 client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
 
+SEGMENT_BASE_PATH = "/dw/recommend/model/36_score_calibration"
 
-def read_predict(hdfs_path: str) -> list:
+def read_predict_from_local_txt(txt_file) -> list:
+    """Parse a local predict txt file into [{"label", "cid", "score"}, ...].
+
+    Each valid line has exactly 4 tab-separated fields:
+    label, (unused), "[p0,p1]" score pair, and "cid_..." — score is p1 and
+    cid is the prefix before the first underscore. Lines that do not have
+    exactly 4 fields are silently skipped.
+    """
+    result = []
+    with open(txt_file, "r") as f:
+        for line in f.readlines():
+            sp = line.replace("\n", "").split("\t")
+            if len(sp) == 4:
+                label = int(sp[0])
+                cid = sp[3].split("_")[0]
+                # score = second element of the "[p0,p1]" pair
+                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
+                result.append({
+                    "label": label,
+                    "cid": cid,
+                    "score": score
+                })
+    return result
+
+
+def read_predict_from_hdfs(hdfs_path: str) -> list:
+    if not hdfs_path.endswith("/"):
+        hdfs_path += "/"
     result = []
     for file in client.list(hdfs_path):
         with client.read(hdfs_path + file) as reader:
             with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
                 for line in gz_file.read().decode("utf-8").split("\n"):
                     split = line.split("\t")
-                    if len(split) != 4:
+                    if len(split) != 4:
                         continue
                     cid = split[3].split("_")[0]
                     label = int(split[0])
@@ -30,63 +52,136 @@ def read_predict(hdfs_path: str) -> list:
     return result
 
 
-def _main(model1_predict_path: str, model2_predict_path: str, file: str):
-    if not model1_predict_path.endswith("/"):
-        model1_predict_path += "/"
+def _segment_v1(scores, step):
+    """Build ~`step` bin edges over the (sorted) scores: 0, then every
+    (len/step)-th score, then 1; duplicate edges are dropped while
+    preserving order (OrderedDict.fromkeys).
+
+    NOTE(review): when len(scores) < step, int(len(scores) / step) is 0 and
+    range() raises ValueError (zero step) — confirm inputs always contain
+    at least `step` rows.
+    """
+    bins = []
+    for i in range(0, len(scores), int((len(scores) / step))):
+        if i == 0:
+            bins.append(0)
+        else:
+            bins.append(scores[i])
+    bins.append(1)
+    return list(OrderedDict.fromkeys(bins))
 
-    if not model2_predict_path.endswith("/"):
-        model2_predict_path += "/"
 
-    # # 设置 pandas 显示选项
-    # pd.set_option('display.max_rows', None)  # 显示所有行
-    # pd.set_option('display.max_columns', None)  # 显示所有列
+def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> [pd.DataFrame, pd.DataFrame]:
+    """Bucket rows by model score, compute each bucket's diff rate vs. the
+    true label rate, persist the full bucket table to segment_file_path,
+    and return (df + per-row segment_diff_rate, filtered bucket table).
+
+    NOTE(review): the aggregation below reads 'p_cpm' and 't_cpm' columns,
+    but neither read_predict_from_hdfs nor read_predict_from_local_txt
+    produces them (only label/cid/score) — this will raise KeyError unless
+    those columns are added upstream; confirm.
+    """
+    sored_df = df.sort_values(by=['score'])
+    # Segment the score range
+    scores = sored_df['score'].values
 
-    model1_result = read_predict(model1_predict_path)
-    model2_result = read_predict(model2_predict_path)
+    bins = _segment_v1(scores, step)
 
-    m1 = pd.DataFrame(model1_result)
-    g1 = m1.groupby("cid").agg(
+    # Alternative: equal-size buckets (kept for reference)
+    # split_indices = np.array_split(np.arange(len(scores)), step)
+    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
+
+    sored_df['score_segment'] = pd.cut(sored_df['score'], bins=bins)
+
+    # Per-segment stats: label sum/count, mean score, mean cpm
+    group_df = sored_df.groupby("score_segment", observed=True).agg(
+        segment_label_sum=('label', 'sum'),
+        segment_label_cnt=('label', 'count'),
+        segment_score_avg=('score', 'mean'),
+        p_cpm_avg=('p_cpm', 'mean'),
+        t_cpm_avg=('t_cpm', 'mean'),
+    ).reset_index()
+    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
+    # diff rate = predicted/true - 1; forced to 0 where true rate is 0
+    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
+
+    # Persist the complete segment table
+    group_df.to_csv(segment_file_path, sep="\t", index=False)
+
+    # Keep only segments with a large diff AND enough samples
+    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
+    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
+    # Attach each row's segment diff rate (0 for unfiltered segments)
+    merged_df = pd.merge(sored_df, filtered_df, on="score_segment", how="left")
+
+    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
+    return merged_df, filtered_df
+
+
+def read_and_calibration_predict(predict_path: str, is_hdfs=True, step=100) -> [pd.DataFrame, pd.DataFrame]:
+    """
+    Read a model's predict results (HDFS dir or local txt), calibrate the
+    scores by per-segment diff rate, and return (per-CID stats, segment df).
+
+    NOTE(review): the `step` parameter is effectively ignored — the call
+    below hardcodes step=100; pass `step` through if it is meant to vary.
+    """
+    if is_hdfs:
+        # HDFS directory of gzipped part files
+        predicts = read_predict_from_hdfs(predict_path)
+    else:
+        predicts = read_predict_from_local_txt(predict_path)
+    df = pd.DataFrame(predicts)
+
+    # Per-segment diff rate of model score vs. true ctcvr
+    predict_basename = os.path.basename(predict_path)
+    if predict_basename.endswith("/"):
+        predict_basename = predict_basename[:-1]
+    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}", step=100)
+
+    # Calibrated score: shrink by the segment's diff rate
+    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
+
+    # Per-CID true ctcvr plus mean raw and calibrated scores
+    grouped_df = df.groupby("cid").agg(
         view=('cid', 'size'),
         conv=('label', 'sum'),
-        old_score_avg=('score', lambda x: round(x.mean(), 6))
+        score_avg=('score', lambda x: round(x.mean(), 6)),
+        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
     ).reset_index()
+    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
 
-    g1['true'] = g1['conv'] / g1['view']
+    return grouped_df, segment_df
 
-    m2 = pd.DataFrame(model2_result)
-    g2 = m2.groupby("cid").agg(
-        new_score_avg=('score', lambda x: round(x.mean(), 6))
-    )
 
-    merged = pd.merge(g1, g2, on='cid', how='left')
-    merged.fillna(0, inplace=True)
+def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
+    # Read + calibrate both models' predict results (local txt mode)
+    old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path, is_hdfs=False)
+    new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path, is_hdfs=False)
 
-    merged["abs((new-true)/true)"] = abs(
-        (merged['new_score_avg'] - merged['true']) / merged['true']
-    ).mask(merged['true'] == 0, 0)
+    # Persist only the NEW model's segment table (the one used online)
+    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
 
-    merged["abs((old-true)/true)"] = abs(
-        (merged['old_score_avg'] - merged['true']) / merged['true']
-    ).mask(merged['true'] == 0, 0)
+    # Rename score columns per model and keep only the needed columns
+    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
+    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
+    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
+    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
 
-    merged = merged[['cid', 'view', "conv", "true", "old_score_avg", "new_score_avg",
-                     "abs((old-true)/true)", "abs((new-true)/true)"]]
-    merged = merged.sort_values(by=['view'], ascending=False)
+    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
+
+    # Raw-score diff rate vs. true ctcvr (0 where true ctcvr is 0)
+    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
 
-    with open(file, "w") as writer:
-        writer.write(merged.to_string(index=False))
+    # Calibrated-score diff rate vs. true ctcvr
+    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+
+    # Sort by impressions and write the report locally
+    merged = merged.sort_values(by=['view'], ascending=False)
+    # NOTE(review): the selection below includes 'true_cpm', 'old_cpm',
+    # 'new_cpm', 'old2_cpm' and 'new2_cpm', but no *_cpm column is ever
+    # created on the per-CID frames above — this will raise KeyError at
+    # runtime; confirm where the cpm columns are meant to come from.
+    merged = merged[[
+        'cid', 'view', "conv", "true_ctcvr", 'true_cpm',
+        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true", "old_cpm", "new_cpm",
+        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true", "old2_cpm", "new2_cpm",
+    ]]
+
+    # Output format keyed off the file extension
+    if analyse_file.endswith(".csv"):
+        merged.to_csv(analyse_file, index=False)
+    else:
+        with open(analyse_file, "w") as writer:
+            writer.write(merged.to_string(index=False))
     print("0")
 
 
 if __name__ == '__main__':
+    # CLI: -op/-np old/new predict result paths, -af analyse-report output,
+    # -cf calibration (segment) file output
     parser = argparse.ArgumentParser(description="model_predict_analyse.py")
-    parser.add_argument("-p", "--predict_path_list", nargs='*',
-                        help="模型评估结果保存路径,第一个为老模型评估结果,第二个为新模型评估结果")
-    parser.add_argument("-f", "--file", help="最后计算结果的保存路径")
+    parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
+    parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
+    parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
+    parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
     args = parser.parse_args()
 
-    predict_path_list = args.predict_path_list
-    # 判断参数是否正常
-    if len(predict_path_list) != 2:
-        sys.exit(1)
-    _main(predict_path_list[0], predict_path_list[1], args.file)
+    _main(
+        old_predict_path=args.old_predict_path,
+        new_predict_path=args.new_predict_path,
+        calibration_file=args.calibration_file,
+        analyse_file=args.analyse_file
+    )

+ 0 - 3
recommend/22_supplementary_data_new_table.sh

@@ -68,6 +68,3 @@ else
    echo "spark特征分桶任务执行成功"
 fi
 
-
-# 定时任务配置
-# 0 11 * * * cd /root/zhaohp/recommend-emr-dataprocess && /bin/sh ./recommend/data_new_table.sh > logs/recommend/data_new_table/$(date +\%Y\%m\%d\%H\%M).log 2>&1