36 commits 3c0c35fc04 ... bafb1ccc45

Author SHA1 Message Commit date
  zhangbo bafb1ccc45 New features 2 months ago
  zhangbo 5916fde411 Merge branch 'master' into feature/zhangbo_makedata_v2 2 months ago
  fengzhoutian 5e265f6c2a Merge branch 'feature/20250104-zt-update' of algorithm/recommend-emr-dataprocess into master 3 months ago
  StrayWarrior 034977485b Update holidays.txt 3 months ago
  zhaohaipeng b010ea16ff Merge branch 'feature_zhaohaipeng' of algorithm/recommend-emr-dataprocess into master 3 months ago
  zhaohaipeng 652e2837b1 Merge branch 'feature/jch_makedata' of algorithm/recommend-emr-dataprocess into master 4 months ago
  zhaohaipeng 746995d26a feat: adjust training resources 4 months ago
  zhaohaipeng cae9baf7fb feat: adjust training resources 4 months ago
  zhaohaipeng b3b2e2c9ed feat: update the holiday text file 4 months ago
  jch a354f6e8f6 handle fetch exception 4 months ago
  zhaohaipeng 167c284637 feat: update the model-update test script 4 months ago
  zhaohaipeng 01ee5f6e97 feat: update the Scala task for printing features and validating the model 4 months ago
  zhaohaipeng c63f0b5a99 feat: update the Scala task for printing features and validating the model 4 months ago
  zhaohaipeng 2b6aa7be2d feat: update the Scala task for printing features and validating the model 4 months ago
  jch 3cf849c596 rov&nor feature 4 months ago
  zhaohaipeng f77dfa9e67 feat: pause model updates 4 months ago
  jch 3942f7ceb7 feature selection for rov and nor samples 4 months ago
  zhaohaipeng 8385d454e7 feat: add a script to list YARN applications 4 months ago
  zhaohaipeng 6048a9dc1d feat: add a script to list YARN applications 4 months ago
  zhaohaipeng 14515cc434 feat: add a script to list YARN applications 4 months ago
  zhaohaipeng 3c36014ece feat: add a script to list YARN applications 4 months ago
  zhaohaipeng c2fbafb942 feat: add a script to list YARN applications 4 months ago
  zhaohaipeng c35defb1f0 feat: update the XGB training script, no model update 4 months ago
  zhaohaipeng 17ee55a1db feat: add an XGB training script, no upload 4 months ago
  zhaohaipeng 7a20db39a6 feat: model score calibration experiment 4 months ago
  jch 40aa0585c8 rov and nor samples 4 months ago
  zhaohaipeng a82f3f5b5b feat: update the model evaluation analysis script 4 months ago
  zhaohaipeng 0f43f406e7 feat: update the model evaluation analysis script 4 months ago
  zhaohaipeng 43dd7d585f feat: update the model evaluation analysis script 4 months ago
  zhaohaipeng 07a42be276 feat: update the model evaluation analysis script 4 months ago
  zhaohaipeng 76521e6ef0 Merge branch 'master' into feature_zhaohaipeng 4 months ago
  zhaohaipeng b93ade39ae feat: add dates with problematic data 4 months ago
  zhaohaipeng 1d072d571b feat: add an XGB training script, no push 4 months ago
  zhaohaipeng ec736129b2 feat: update the no-postback detection logic 4 months ago
  zhaohaipeng 6ceac0fc86 feat: update the no-postback detection logic 4 months ago
  zhaohaipeng ef0a8cba2c feat: update the no-postback detection logic 4 months ago

+ 4 - 2
ad/01_ad_model_update.sh

@@ -4,6 +4,8 @@ set -x
 export PATH=$SPARK_HOME/bin:$PATH
 export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
 export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+export PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache/
+export SEGMENT_BASE_PATH=/dw/recommend/model/36_model_attachment/score_calibration_file
 
 sh_path=$(cd $(dirname $0); pwd)
 source ${sh_path}/00_common.sh
@@ -218,7 +220,7 @@ xgb_train() {
 
   /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
   --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
-  --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors 31 \
+  --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors 35 \
   --conf spark.yarn.executor.memoryoverhead=2048 \
   --conf spark.shuffle.service.enabled=true \
   --conf spark.shuffle.service.port=7337 \
@@ -233,7 +235,7 @@ xgb_train() {
   testPath:${predict_date_path} \
   savePath:${new_model_predict_result_path} \
   modelPath:${model_save_path} \
-  eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20
+  eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:34 repartition:20
 
   local return_code=$?
   check_run_status ${return_code} ${step_start_time} "XGB模型训练任务" "XGB模型训练失败"

+ 395 - 9
ad/02_ad_model_update_test.sh

@@ -4,18 +4,404 @@ set -x
 export PATH=$SPARK_HOME/bin:$PATH
 export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
 export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-export PREDICT_CACHE_PATH=/root/zhaohp/XGB/test/predict_cache/
-export SEGMENT_BASE_PATH=/root/zhaohp/XGB/test/predict_analyse_file/
-
+export PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache/
+export SEGMENT_BASE_PATH=/dw/recommend/model/36_model_attachment/score_calibration_file
 
 sh_path=$(cd $(dirname $0); pwd)
 source ${sh_path}/00_common.sh
 
-online_model_predict_result_path=/dw/recommend/model/34_ad_predict_data/20241110_351_1000_1031_1106
-new_model_predict_result_path=/dw/recommend/model/34_ad_predict_data/20241110_351_1000_1103_1109
-predict_analyse_file_path=/root/zhaohp/XGB/test/predict_analyse_file/20241110_351_1000_analyse.txt
-calibration_file_path=/root/zhaohp/XGB/test/model_xgb_351_1000_v2_calibration.txt
+source /root/anaconda3/bin/activate py37
+
+
+# 全局常量
+LOG_PREFIX=广告模型训练任务
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
+BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
+TABLE=alg_recsys_ad_sample_all
+# 特征文件名
+feature_file=20240703_ad_feature_name.txt
+# 模型本地临时保存路径
+model_local_home=/root/zhaohp/XGB/
+
+# 模型HDFS保存路径,测试时修改为其他路径,避免影响线上
+MODEL_PATH=/dw/recommend/model/35_ad_model
+# 预测结果保存路径,测试时修改为其他路径,避免影响线上
+PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data
+# 模型OSS保存路径,测试时修改为其他路径,避免影响线上
+MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
+# 线上模型名,测试时修改为其他模型名,避免影响线上
+model_name=model_xgb_351_1000_v2
+# 线上校准文件名
+OSS_CALIBRATION_FILE_NAME=model_xgb_351_1000_v2_calibration
+# 用于存放一些临时的文件
+PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache
+
+
+# 本地保存HDFS模型路径文件,测试时修改为其他模型名,避免影响线上
+model_path_file=${model_local_home}/online_model_path.txt
+# 获取当前是星期几,1表示星期一
+current_day_of_week="$(date +"%u")"
+
+# 任务开始时间
+start_time=$(date +%s)
+# 前一天
+today_early_1=20241218
+# 线上模型在HDFS中的路径
+online_model_path=`cat ${model_path_file}`
+# 训练用的数据路径
+train_data_path=""
+# 评估用的数据路径
+predict_date_path=""
+#评估结果保存路径
+new_model_predict_result_path=""
+# 模型保存路径
+model_save_path=""
+# 评测结果保存路径,后续需要根据此文件评估是否要更新模型
+predict_analyse_file_path=""
+# 校准文件保存路径
+calibration_file_path=""
+
+# 保存模型评估的分析结果
+old_incr_rate_avg=0
+new_incr_rate_avg=0
+# Top10的详情
+top10_msg=""
+# AUC值
+old_auc=0
+new_auc=0
+
+declare -A real_score_map
+declare -A old_score_map
+declare -A new_score_map
+
+# 校验命令的退出码
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+    local msg=$4
+
+    local step_end_time=$(date +%s)
+    local step_elapsed=$(($step_end_time - $step_start_time))
+
+    if [[ -n "${old_auc}" && "${old_auc}" != "0" ]]; then
+      msg+="\n\t - 老模型AUC: ${old_auc}"
+    fi
+    if [[ -n "${new_auc}" && "${new_auc}" != "0" ]]; then
+      msg+="\n\t - 新模型AUC: ${new_auc}"
+    fi
+
+
+    if [ ${status} -ne 0 ]; then
+        echo "${LOG_PREFIX} -- ${step_name}失败: 耗时 ${step_elapsed}"
+        local elapsed=$(($step_end_time - $start_time))
+        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
+        exit 1
+    else
+        echo "${LOG_PREFIX} -- ${step_name}成功: 耗时 ${step_elapsed}"
+    fi
+}
+
+send_success_upload_msg(){ 
+  # 发送更新成功通知
+  local msg=" 广告模型文件更新完成"
+  msg+="\n\t - 老模型AUC: ${old_auc}"
+  msg+="\n\t - 新模型AUC: ${new_auc}"
+  msg+="\n\t - 老模型Top10差异平均值: ${old_incr_rate_avg}"
+  msg+="\n\t - 新模型Top10差异平均值: ${new_incr_rate_avg}"
+  msg+="\n\t - 模型在HDFS中的路径: ${model_save_path}"
+  msg+="\n\t - 模型上传OSS中的路径: ${MODEL_OSS_PATH}/${model_name}.tar.gz"
+
+  local step_end_time=$(date +%s)
+  local elapsed=$((${step_end_time} - ${start_time}))
+
+  /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level info --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
+}
+
+init() {
+  
+  declare -a date_keys=()
+  local count=1
+  local current_data="$(date -d '1 days ago' +%Y%m%d)"
+  # 循环获取前 n 天的非节日日期
+  while [[ ${count} -le 7 ]]; do
+    date_key=$(date -d "${current_data}" +%Y%m%d)
+    # 判断是否是节日,并拼接训练数据路径
+    if [ $(is_not_holidays ${date_key}) -eq 1 ]; then
+
+      # 将 date_key 放入数组
+      date_keys+=("${date_key}")
+
+      if [[ -z ${train_data_path} ]]; then
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
+      else
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
+      fi 
+      count=$((count + 1))
+    else
+      echo "日期: ${date_key}是节日,跳过"
+    fi
+    current_data=$(date -d "${current_data} -1 day" +%Y%m%d)
+  done
+
+  last_index=$((${#date_keys[@]} - 1))
+  train_first_day=${date_keys[$last_index]}
+  train_last_day=${date_keys[0]}
+
+  model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
+  predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
+  new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${train_first_day: -4}_${train_last_day: -4}
+  online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${online_model_path: -9}
+  predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_351_1000_analyse.txt
+  calibration_file_path=${model_local_home}/${OSS_CALIBRATION_FILE_NAME}.txt
+
+  echo "init param train_data_path: ${train_data_path}"
+  echo "init param predict_date_path: ${predict_date_path}"
+  echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
+  echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
+  echo "init param model_save_path: ${model_save_path}"
+  echo "init param online_model_path: ${online_model_path}"
+  echo "init param feature_file: ${feature_file}"
+  echo "init param model_name: ${model_name}"
+  echo "init param model_local_home: ${model_local_home}"
+  echo "init param model_oss_path: ${MODEL_OSS_PATH}"
+  echo "init param predict_analyse_file_path: ${predict_analyse_file_path}"
+  echo "init param calibration_file_path: ${calibration_file_path}"
+  echo "init param current_day_of_week: ${current_day_of_week}"
+
+  echo "当前Python环境安装的Python版本: $(python --version)"
+  echo "当前Python环境安装的三方包: $(python -m pip list)"
+}
+
+# 校验大数据任务是否执行完成
+check_ad_hive() {
+  local step_start_time=$(date +%s)
+  local max_hour=05
+  local max_minute=30
+  local elapsed=0
+  while true; do
+      local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
+
+      elapsed=$(($(date +%s) - ${step_start_time}))
+      if [ "${python_return_code}" -eq 0 ]; then
+          break
+      fi
+      echo "Python程序返回非0值,等待五分钟后再次调用。"
+      sleep 300
+      local current_hour=$(date +%H)
+      local current_minute=$(date +%M)
+      if (( 10#${current_hour} > 10#${max_hour} || ( 10#${current_hour} == 10#${max_hour} && 10#${current_minute} >= 10#${max_minute} ) )); then
+          local msg="大数据数据生产校验失败, 分区: ${today_early_1}"
+          echo -e "${LOG_PREFIX} -- 大数据数据生产校验 -- ${msg}: 耗时 ${elapsed}"
+          /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}"
+          exit 1
+      fi
+  done
+  echo "${LOG_PREFIX} -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 ${elapsed}"
+}
+
+origin_data() {
+  (
+    source ${sh_path}/25_xgb_make_data_origin_bucket.sh
+    make_origin_data
+  )
+}
+
+bucket_feature() {
+  (
+    source ${sh_path}/25_xgb_make_data_origin_bucket.sh
+    make_bucket_feature
+  )
+}
+
+xgb_train() {
+  local step_start_time=$(date +%s)
+
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
+  --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors 31 \
+  --conf spark.yarn.executor.memoryoverhead=2048 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  trainPath:${train_data_path} \
+  testPath:${predict_date_path} \
+  savePath:${new_model_predict_result_path} \
+  modelPath:${model_save_path} \
+  eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "XGB模型训练任务" "XGB模型训练失败"
+}
+
+calc_model_predict() {
+  local count=0
+  local max_line=10
+  local old_total_diff=0
+  local new_total_diff=0
+  top10_msg="| CID  | 老模型相对真实CTCVR的变化 | 新模型相对真实CTCVR的变化 |"
+  top10_msg+=" \n| ---- | --------- | -------- |"
+  while read -r line && [ ${count} -lt ${max_line} ]; do
+
+      # 跳过包含 "cid" 的表头行,只处理数据行
+      if [[ "${line}" == *"cid"* ]]; then
+          continue
+      fi
+
+      read -a numbers <<< "${line}"
+
+      # 分数分别保存
+      real_score_map[${numbers[0]}]=${numbers[3]}
+      old_score_map[${numbers[0]}]=${numbers[6]}
+      new_score_map[${numbers[0]}]=${numbers[7]}
+
+      # 拼接Top10详情的飞书消息
+      top10_msg="${top10_msg} \n| ${numbers[0]} | ${numbers[6]} | ${numbers[7]} | "
+
+      # 计算top10相对误差绝对值的均值
+      old_abs_score=$( echo "${numbers[6]} * ((${numbers[6]} >= 0) * 2 - 1)" | bc -l )
+      new_abs_score=$( echo "${numbers[7]} * ((${numbers[7]} >= 0) * 2 - 1)" | bc -l )
+
+      old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l )
+      new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l )
+
+      count=$((${count} + 1))
+
+  done < "${predict_analyse_file_path}"
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "计算Top10差异" "计算Top10差异异常"
+
+  old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
+  check_run_status $? ${step_start_time} "计算老模型Top10差异" "计算老模型Top10差异异常"
+
+
+  new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
+  check_run_status $? ${step_start_time} "计算新模型Top10差异" "计算新模型Top10差异异常"
+
+  echo "老模型Top10差异平均值: ${old_incr_rate_avg}"
+  echo "新模型Top10差异平均值: ${new_incr_rate_avg}"
+  echo "新老模型分数对比: "
+  for cid in "${!new_score_map[@]}"; do
+    echo "\t CID: $cid, 老模型分数: ${old_score_map[$cid]}, 新模型分数: ${new_score_map[$cid]}"
+  done
+}
+
+calc_auc() {
+  old_auc=`cat ${PREDICT_CACHE_PATH}/old_1.txt | /root/sunmingze/AUC/AUC`
+  new_auc=`cat ${PREDICT_CACHE_PATH}/new_1.txt | /root/sunmingze/AUC/AUC`
+}
+
+model_predict() {
+
+  # 线上模型评估最新的数据
+  local step_start_time=$(date +%s)
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
+  --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 30 \
+  --conf spark.yarn.executor.memoryoverhead=1024 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  testPath:${predict_date_path} \
+  savePath:${online_model_predict_result_path} \
+  modelPath:${online_model_path}
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "线上模型评估${predict_date_path: -8}的数据" "线上模型评估${predict_date_path: -8}的数据失败"
+
+  # 结果分析
+  local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})
+  check_run_status ${python_return_code} ${step_start_time} "分析线上模型评估${predict_date_path: -8}的数据" "分析线上模型评估${predict_date_path: -8}的数据失败"
+
+  calc_model_predict
+
+  calc_auc
+
+  if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then 
+    echo "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    # check_run_status 1 ${step_start_time} "${predict_date_path: -8}的数据,绝对误差大于0.1" "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    # exit 1
+  fi 
+
+
+  # 对比两个模型的差异
+  score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l )
+  if (( $(echo "${score_diff} > 0.050000" | bc -l ) ));then 
+    echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查"
+    # check_run_status 1 ${step_start_time} "两个模型评估${predict_date_path: -8}的数据" "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
+    # exit 1
+  fi 
+
+}
+
+model_upload_oss() {
+  local step_start_time=$(date +%s)
+
+  (
+    cd ${model_local_home}
+
+    ${HADOOP} fs -get ${model_save_path} ${model_name}
+    if [ ! -d ${model_name} ]; then
+      echo "从HDFS下载模型失败"
+      check_run_status 1 ${step_start_time} "HDFS下载模型任务" "HDFS下载模型失败" 
+      exit 1 
+    fi
+
+    tar -czvf ${model_name}.tar.gz -C ${model_name} .
+
+    rm -rf ${model_name}.tar.gz.crc
+
+    # 从OSS中移除模型文件和校准文件
+    ${HADOOP} fs -rm -r -skipTrash ${MODEL_OSS_PATH}/${model_name}.tar.gz ${MODEL_OSS_PATH}/${OSS_CALIBRATION_FILE_NAME}.txt
+    
+    # 将模型文件和校准文件推送到OSS上
+    ${HADOOP} fs -put ${model_name}.tar.gz ${OSS_CALIBRATION_FILE_NAME}.txt ${MODEL_OSS_PATH}
+    local return_code=$?
+    check_run_status ${return_code} ${step_start_time} "模型上传OSS任务" "模型上传OSS失败"
+
+    echo ${model_save_path} > ${model_path_file}
+
+    # 
+    rm -f ./${model_name}.tar.gz
+    rm -rf ./${model_name}
+    rm -rf ${OSS_CALIBRATION_FILE_NAME}.txt
+  )
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "模型上传OSS任务" "模型上传OSS失败"
+
+  local step_end_time=$(date +%s)
+  local elapsed=$((${step_end_time} - ${start_time}))
+  echo -e "${LOG_PREFIX} -- 模型更新完成 -- 模型更新成功: 耗时 ${elapsed}"
+  
+  send_success_upload_msg
+}
+
+# 主方法
+main() {
+  init
+
+  xgb_train
+
+  model_predict
+
+  model_upload_oss
+
+}
 
 
-local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})
-echo "${python_return_code}"
+main
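
A note on calc_model_predict above: bc has no abs(), so the script takes the absolute value of each Top10 diff with the arithmetic trick x * ((x >= 0) * 2 - 1) before averaging. A minimal Python sketch of the same aggregation, assuming the analyse file is the whitespace-separated table written by model_predict_analyse.py with the CID in column 0 and the old/new diffs in columns 6 and 7 (the same columns the shell loop reads):

    # Sketch only -- mirrors calc_model_predict() for clarity; not part of the pipeline.
    def top10_abs_diff_avg(analyse_file, max_line=10):
        old_diffs, new_diffs = [], []
        with open(analyse_file) as f:
            for line in f:
                if "cid" in line:            # skip the header row, like the shell loop
                    continue
                cols = line.split()
                if len(cols) < 8:
                    continue
                old_diffs.append(abs(float(cols[6])))   # bc trick: x * ((x >= 0) * 2 - 1)
                new_diffs.append(abs(float(cols[7])))
                if len(old_diffs) >= max_line:
                    break
        return sum(old_diffs) / len(old_diffs), sum(new_diffs) / len(new_diffs)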

+ 303 - 0
ad/03_xgb_train.sh

@@ -0,0 +1,303 @@
+#!/bin/sh
+set -x
+
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+export PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache/
+export SEGMENT_BASE_PATH=/dw/recommend/model/36_model_attachment/score_calibration_file
+
+sh_path=$(cd $(dirname $0); pwd)
+source ${sh_path}/00_common.sh
+
+source /root/anaconda3/bin/activate py37
+
+
+# 全局常量
+LOG_PREFIX=广告模型训练任务
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
+BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
+TABLE=alg_recsys_ad_sample_all
+# 特征文件名
+feature_file=20240703_ad_feature_name.txt
+# 模型本地临时保存路径
+model_local_home=/root/zhaohp/XGB/
+
+# 模型HDFS保存路径,测试时修改为其他路径,避免影响线上
+MODEL_PATH=/dw/recommend/model/35_ad_model
+# 预测结果保存路径,测试时修改为其他路径,避免影响线上
+PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data
+# 模型OSS保存路径,测试时修改为其他路径,避免影响线上
+MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
+# 线上模型名,测试时修改为其他模型名,避免影响线上
+model_name=model_xgb_351_1000_v2
+# 线上校准文件名
+OSS_CALIBRATION_FILE_NAME=model_xgb_351_1000_v2_calibration
+# 用于存放一些临时的文件
+PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache
+
+
+# 本地保存HDFS模型路径文件,测试时修改为其他模型名,避免影响线上
+model_path_file=${model_local_home}/online_model_path.txt
+# 获取当前是星期几,1表示星期一
+current_day_of_week="$(date +"%u")"
+
+# 任务开始时间
+start_time=$(date +%s)
+# 前一天
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+# 线上模型在HDFS中的路径
+online_model_path=`cat ${model_path_file}`
+# 训练用的数据路径
+train_data_path=""
+# 评估用的数据路径
+predict_date_path=""
+#评估结果保存路径
+new_model_predict_result_path=""
+# 模型保存路径
+model_save_path=""
+# 评测结果保存路径,后续需要根据此文件评估是否要更新模型
+predict_analyse_file_path=""
+# 校准文件保存路径
+calibration_file_path=""
+
+# 保存模型评估的分析结果
+old_incr_rate_avg=0
+new_incr_rate_avg=0
+# Top10的详情
+top10_msg=""
+# AUC值
+old_auc=0
+new_auc=0
+
+declare -A real_score_map
+declare -A old_score_map
+declare -A new_score_map
+
+init() {
+  
+  declare -a date_keys=()
+  local count=1
+  local current_data="$(date -d '2 days ago' +%Y%m%d)"
+  # 循环获取前 n 天的非节日日期
+  while [[ ${count} -le 7 ]]; do
+    date_key=$(date -d "${current_data}" +%Y%m%d)
+    # 判断是否是节日,并拼接训练数据路径
+    if [ $(is_not_holidays ${date_key}) -eq 1 ]; then
+
+      # 将 date_key 放入数组
+      date_keys+=("${date_key}")
+
+      if [[ -z ${train_data_path} ]]; then
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
+      else
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
+      fi 
+      count=$((count + 1))
+    else
+      echo "日期: ${date_key}是节日,跳过"
+    fi
+    current_data=$(date -d "${current_data} -1 day" +%Y%m%d)
+  done
+
+  last_index=$((${#date_keys[@]} - 1))
+  train_first_day=${date_keys[$last_index]}
+  train_last_day=${date_keys[0]}
+
+  model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
+  predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
+  new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${train_first_day: -4}_${train_last_day: -4}
+  online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${online_model_path: -9}
+  predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_351_1000_analyse.txt
+  calibration_file_path=${model_local_home}/${OSS_CALIBRATION_FILE_NAME}.txt
+
+  echo "init param train_data_path: ${train_data_path}"
+  echo "init param predict_date_path: ${predict_date_path}"
+  echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
+  echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
+  echo "init param model_save_path: ${model_save_path}"
+  echo "init param online_model_path: ${online_model_path}"
+  echo "init param feature_file: ${feature_file}"
+  echo "init param model_name: ${model_name}"
+  echo "init param model_local_home: ${model_local_home}"
+  echo "init param model_oss_path: ${MODEL_OSS_PATH}"
+  echo "init param predict_analyse_file_path: ${predict_analyse_file_path}"
+  echo "init param calibration_file_path: ${calibration_file_path}"
+  echo "init param current_day_of_week: ${current_day_of_week}"
+
+  echo "当前Python环境安装的Python版本: $(python --version)"
+  echo "当前Python环境安装的三方包: $(python -m pip list)"
+}
+
+# 校验命令的退出码
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+    local msg=$4
+
+    local step_end_time=$(date +%s)
+    local step_elapsed=$(($step_end_time - $step_start_time))
+
+    if [[ -n "${old_auc}" && "${old_auc}" != "0" ]]; then
+      msg+="\n\t - 老模型AUC: ${old_auc}"
+    fi
+    if [[ -n "${new_auc}" && "${new_auc}" != "0" ]]; then
+      msg+="\n\t - 新模型AUC: ${new_auc}"
+    fi
+
+
+    if [ ${status} -ne 0 ]; then
+        local elapsed=$(($step_end_time - $start_time))
+        echo "${LOG_PREFIX} -- ${step_name}失败: 耗时 ${step_elapsed}, msg: ${msg}, 总耗时: ${elapsed}"
+        exit 1
+    else
+        echo "${LOG_PREFIX} -- ${step_name}成功: 耗时 ${step_elapsed}"
+    fi
+}
+
+xgb_train() {
+  local step_start_time=$(date +%s)
+
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
+  --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors 31 \
+  --conf spark.yarn.executor.memoryoverhead=2048 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  trainPath:${train_data_path} \
+  testPath:${predict_date_path} \
+  savePath:${new_model_predict_result_path} \
+  modelPath:${model_save_path} \
+  eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "XGB模型训练任务" "XGB模型训练失败"
+}
+
+calc_model_predict() {
+  local count=0
+  local max_line=10
+  local old_total_diff=0
+  local new_total_diff=0
+  top10_msg="| CID  | 老模型相对真实CTCVR的变化 | 新模型相对真实CTCVR的变化 |"
+  top10_msg+=" \n| ---- | --------- | -------- |"
+  while read -r line && [ ${count} -lt ${max_line} ]; do
+
+      # 跳过包含 "cid" 的表头行,只处理数据行
+      if [[ "${line}" == *"cid"* ]]; then
+          continue
+      fi
+
+      read -a numbers <<< "${line}"
+
+      # 分数分别保存
+      real_score_map[${numbers[0]}]=${numbers[3]}
+      old_score_map[${numbers[0]}]=${numbers[6]}
+      new_score_map[${numbers[0]}]=${numbers[7]}
+
+      # 拼接Top10详情的飞书消息
+      top10_msg="${top10_msg} \n| ${numbers[0]} | ${numbers[6]} | ${numbers[7]} | "
+
+      # 计算top10相对误差绝对值的均值
+      old_abs_score=$( echo "${numbers[6]} * ((${numbers[6]} >= 0) * 2 - 1)" | bc -l )
+      new_abs_score=$( echo "${numbers[7]} * ((${numbers[7]} >= 0) * 2 - 1)" | bc -l )
+
+      old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l )
+      new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l )
+
+      count=$((${count} + 1))
+
+  done < "${predict_analyse_file_path}"
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "计算Top10差异" "计算Top10差异异常"
+
+  old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
+  check_run_status $? ${step_start_time} "计算老模型Top10差异" "计算老模型Top10差异异常"
+
+
+  new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
+  check_run_status $? ${step_start_time} "计算新模型Top10差异" "计算新模型Top10差异异常"
+
+  echo "老模型Top10差异平均值: ${old_incr_rate_avg}"
+  echo "新模型Top10差异平均值: ${new_incr_rate_avg}"
+  echo "新老模型分数对比: "
+  for cid in "${!new_score_map[@]}"; do
+    echo "\t CID: $cid, 老模型分数: ${old_score_map[$cid]}, 新模型分数: ${new_score_map[$cid]}"
+  done
+}
+
+calc_auc() {
+  old_auc=`cat ${PREDICT_CACHE_PATH}/old_1.txt | /root/sunmingze/AUC/AUC`
+  new_auc=`cat ${PREDICT_CACHE_PATH}/new_1.txt | /root/sunmingze/AUC/AUC`
+}
+
+model_predict() {
+
+  # 线上模型评估最新的数据
+  local step_start_time=$(date +%s)
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
+  --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 30 \
+  --conf spark.yarn.executor.memoryoverhead=1024 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  testPath:${predict_date_path} \
+  savePath:${online_model_predict_result_path} \
+  modelPath:${online_model_path}
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "线上模型评估${predict_date_path: -8}的数据" "线上模型评估${predict_date_path: -8}的数据失败"
+
+  # 结果分析
+  local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})
+  check_run_status ${python_return_code} ${step_start_time} "分析线上模型评估${predict_date_path: -8}的数据" "分析线上模型评估${predict_date_path: -8}的数据失败"
+
+  calc_model_predict
+
+  calc_auc
+
+  if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then 
+    echo "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    check_run_status 1 ${step_start_time} "${predict_date_path: -8}的数据,绝对误差大于0.1" "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    exit 1
+  fi 
+
+
+  # 对比两个模型的差异
+  score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l )
+  if (( $(echo "${score_diff} > 0.050000" | bc -l ) ));then 
+    echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查"
+    check_run_status 1 ${step_start_time} "两个模型评估${predict_date_path: -8}的数据" "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
+    exit 1
+  fi 
+
+}
+
+# 主方法
+main() {
+  init
+    
+  xgb_train
+
+  model_predict
+}
+
+
+main
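
calc_auc in both scripts pipes the cache files written by predict_local_save_for_auc (plain "label<TAB>score" lines) into the external /root/sunmingze/AUC/AUC binary. Assuming that input format, the same check can be sketched with scikit-learn; this is an illustration, not what the pipeline runs:

    from sklearn.metrics import roc_auc_score

    def auc_from_cache(path):
        # Each line is "label<TAB>score", as written by predict_local_save_for_auc().
        labels, scores = [], []
        with open(path) as f:
            for line in f:
                label, score = line.strip().split("\t")
                labels.append(int(label))
                scores.append(float(score))
        return roc_auc_score(labels, scores)

    print(auc_from_cache("/root/zhaohp/XGB/predict_cache/old_1.txt"))
    print(auc_from_cache("/root/zhaohp/XGB/predict_cache/new_1.txt"))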

+ 41 - 0
ad/20_yarn_app_list.sh

@@ -0,0 +1,41 @@
+#!/bin/sh
+
+# 定义合法的任务状态
+VALID_STATES="ALL,NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING,FINISHED,FAILED,KILLED"
+
+# 检查是否传入参数
+if [ $# -lt 1 ]; then
+    echo "Usage: $0 <appStates>"
+    echo "Example: $0 FINISHED"
+    echo "Valid states: $VALID_STATES"
+    exit 1
+fi
+
+# 获取任务状态作为参数
+APP_STATES=$1
+
+# 检查输入状态是否合法
+if ! echo "$VALID_STATES" | grep -qw "$APP_STATES"; then
+    echo "Error: Invalid appStates '$APP_STATES'."
+    echo "Valid states: $VALID_STATES"
+    exit 1
+fi
+
+# 获取指定状态的任务列表
+yarn app -list -appStates "$APP_STATES" 2>/dev/null | grep '61' | awk '{print $1}' | while read -r app_id; do
+    # 获取任务详情
+    details=$(yarn application -status "$app_id" 2>/dev/null | grep -E "Application-Id|Application-Name|Start-Time|Finish-Time")
+    
+    # 提取信息
+    app_id=$(echo "$details" | grep "Application-Id" | awk -F ": " '{print $2}')
+    app_name=$(echo "$details" | grep "Application-Name" | awk -F ": " '{print $2}')
+    start_time_ms=$(echo "$details" | grep "Start-Time" | awk -F ": " '{print $2}')
+    finish_time_ms=$(echo "$details" | grep "Finish-Time" | awk -F ": " '{print $2}')
+    
+    # 将毫秒级时间戳转换为秒级,并格式化为日期时间
+    start_time=$(date -d @$((start_time_ms / 1000)) +'%Y-%m-%d %H:%M:%S')
+    finish_time=$(date -d @$((finish_time_ms / 1000)) +'%Y-%m-%d %H:%M:%S')
+
+    # 显示任务信息
+    echo -e "Application-Id: $app_id\tApplication-Name: $app_name\tStart-Time: $start_time\tFinish-Time: $finish_time"
+done

+ 0 - 60
ad/22_ad_model_predict_auc.sh

@@ -1,60 +0,0 @@
-#!/bin/sh
-
-# 训练新模型,并使用后面的数据计算AUC,评估模型效果
-
-set -x
-
-begin_date=$1
-end_date=$2
-model_name=$3
-predict_dim=$4
-
-PROJECT_HOME=/root/zhaohp/20240723
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-HDFS_TRAIN_DATE_PATH=/dw/recommend/model/33_ad_train_data_v4_idn1
-MODEL_PATH=${PROJECT_HOME}/model
-PREDICT_PATH=${PROJECT_HOME}/predict
-
-FM_TRAIN=/root/sunmingze/alphaFM/bin/fm_train
-FM_PREDICT=/root/sunmingze/alphaFM/bin/fm_predict
-
-train_date=$begin_date
-
-# 计算模型的AUC,从训练日期的后一天到参数的end_date
-predict_auc() {
-    echo -e "\t==================== 开始预测 $train_date 模型 ===================="
-
-    predict_date=$(date -d "$train_date +1 day" +%Y%m%d)
-    predict_end_date=$(date -d "$end_date +1 day" +%Y%m%d)
-    while [ "$predict_date" != "$predict_end_date" ]; do
-
-        $HADOOP fs -text ${HDFS_TRAIN_DATE_PATH}/${predict_date}/* | ${FM_PREDICT} -m ${MODEL_PATH}/${model_name}_${train_date}.txt -dim ${predict_dim} -core 8 -out ${PREDICT_PATH}/${model_name}_${train_date}.txt
-        auc=`cat ${PREDICT_PATH}/${model_name}_${train_date}.txt | /root/sunmingze/AUC/AUC`
-
-        echo "模型训练日期: ${train_date}, 模型预测日期: ${predict_date}, AUC: ${auc}, 模型路径: ${MODEL_PATH}/${model_name}_${train_date}.txt"
-
-        predict_date=$(date -d "$predict_date +1 day" +%Y%m%d)
-
-    done
-
-    echo -e "\n\t==================== 预测 $train_date 模型结束 ===================="
-
-}
-main() {
-
-    # 增量训练模型
-    while [ "$train_date" != "$end_date" ]; do
-        echo "==================== 开始训练 $train_date 模型 ===================="
-
-        predict_auc
-
-        echo -e "==================== 训练 $train_date 模型结束 ==================== \n\n\n\n\n\n"
-        train_date=$(date -d "$train_date +1 day" +%Y%m%d)
-    done
-
-}
-
-main
-
-
-# nohup ./22_ad_model_predict_auc.sh 20240712 20240717 model_bkb8_v4_idn1 8  > logs/22_ad_model_predict_auc.log 2>&1 &

+ 0 - 29
ad/23_ad_model_batch_calc_cid_score_avg.sh

@@ -1,29 +0,0 @@
-#!/bin/sh
-
-# 计算模型对某天,某个CID的打分情况,输出平均值
-
-set -x
-
-cids=$1
-model=$2
-hdfs_path=$3
-bias=$4
-
-MODEL_PATH=/root/zhaohp/recommend-emr-dataprocess/model/ad
-PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict/ad
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-FM_HOME=/root/sunmingze/alphaFM
-
-# 将cids中的逗号分隔列表拆分为数组
-IFS=',' read -ra cid_array <<< "$cids"
-
-for cid in "${cid_array[@]}"; do
-    # 对每个CID执行打分计算并输出平均值
-    $HADOOP fs -text ${hdfs_path}/* | grep "cid_${cid}" | ${FM_HOME}/bin/fm_predict -m ${MODEL_PATH}/${model}.txt -dim ${bias} -core 8 -out ${PREDICT_PATH}/${model}_${cid}.txt
-
-    score_avg=`awk '{ sum += $2; count++ } END { if (count > 0) print sum / count }' ${PREDICT_PATH}/${model}_${cid}.txt`
-
-    echo -e "CID- ${cid} -平均分计算结果: ${score_avg} \n\t模型: ${MODEL_PATH}/${model} \n\tHDFS数据路径: ${hdfs_path} \n\t"
-done
-
-# nohup ./ad/23_ad_model_batch_calc_cid_score_avg.sh 3024,2966,2670,3163,3595,3594,3364,3365,3593,3363,3180,1910,2660,3478,3431,3772,3060,3178,3056,3771,3208,3041,2910,3690,1626,3318,3357,3628,3766,3770,3763,3769,3768,3541,3534,2806,3755,3760,3319,3758,3746,3759,3747,3754,3767,3745,3756,3437,3608,3527,3691,3197,3361,3362,3212,3344,3343,3346,3345,3612,3540,3526,3611,3761,3617,3762,3618,3616,3623,3765,3624,3764,3198,3542,3353,2374,3200 model_bkb8_v55_20240804 /dw/recommend/model/33_ad_train_data_v4/20240806 8 > logs/model_bkb8_v55_20240804_cid_06_12.log 2>&1 &

+ 14 - 0
ad/holidays.txt

@@ -9,9 +9,23 @@
 2024-11-12
 20241113
 2024-11-13
+20241126
+2024-11-26
+20241127
+2024-11-27
+20241128
+2024-11-28
+20241212
+2024-12-12
+冬至
+20241221
+2024-12-21
 圣诞节
 20241225
 2024-12-25
+毛泽东
+20241226
+2024-12-26
 元旦
 20250101
 2025-01-01
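
The file deliberately lists each holiday in both 20241226 and 2024-12-26 form (plus a Chinese name line for readability), so the training scripts' is_not_holidays check can match whichever format it is given. That helper lives in 00_common.sh, which is not part of this diff; the following is only a hypothetical sketch of such a lookup:

    # Hypothetical sketch of a holidays.txt lookup; the real check is
    # `is_not_holidays` in 00_common.sh, which this diff does not show.
    def is_not_holiday(date_key, holidays_file="ad/holidays.txt"):
        with open(holidays_file, encoding="utf-8") as f:
            entries = {line.strip() for line in f if line.strip()}
        dashed = "%s-%s-%s" % (date_key[:4], date_key[4:6], date_key[6:])
        return date_key not in entries and dashed not in entries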

+ 122 - 128
ad/model_predict_analyse.py

@@ -1,131 +1,56 @@
 import argparse
 import gzip
 import os.path
-from collections import OrderedDict
 
 import pandas as pd
 from hdfs import InsecureClient
 
 client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
 
-SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
+SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_model_attachment/score_calibration_file")
 PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
 
 
-def read_predict_from_local_txt(txt_file) -> list:
-    result = []
-    with open(txt_file, "r") as f:
-        for line in f.readlines():
-            sp = line.replace("\n", "").split("\t")
-            if len(sp) == 4:
-                label = int(sp[0])
-                cid = sp[3].split("_")[0]
-                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
-                result.append({
-                    "label": label,
-                    "cid": cid,
-                    "score": score
-                })
-    return result
-
-
-def read_predict_from_hdfs(hdfs_path: str) -> list:
-    if not hdfs_path.endswith("/"):
-        hdfs_path += "/"
-    result = []
-    for file in client.list(hdfs_path):
-        with client.read(hdfs_path + file) as reader:
-            with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
-                for line in gz_file.read().decode("utf-8").split("\n"):
-                    split = line.split("\t")
-                    if len(split) == 4:
-                        cid = split[3].split("_")[0]
-                        label = int(split[0])
-                        score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
-                        result.append({
-                            "cid": cid,
-                            "label": label,
-                            "score": score
-                        })
-
-    return result
-
-
-def _segment_v1(scores, step):
-    bins = []
-    for i in range(0, len(scores), int((len(scores) / step))):
-        if i == 0:
-            bins.append(0)
-        else:
-            bins.append(scores[i])
-    bins.append(1)
-    return list(OrderedDict.fromkeys(bins))
-
-
-def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> [pd.DataFrame, pd.DataFrame]:
-    sored_df = df.sort_values(by=['score'])
-    # 评估分数分段
-    scores = sored_df['score'].values
-
-    bins = _segment_v1(scores, step)
-
-    # 等分分桶
-    # split_indices = np.array_split(np.arange(len(scores)), step)
-    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
-
-    sored_df['score_segment'] = pd.cut(sored_df['score'], bins=bins)
-
-    # 计算分段内分数的差异
-    group_df = sored_df.groupby("score_segment", observed=True).agg(
-        segment_label_sum=('label', 'sum'),
-        segment_label_cnt=('label', 'count'),
-        segment_score_avg=('score', 'mean'),
-    ).reset_index()
-    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
-    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
-
-    # 完整的分段文件保存
-    csv_data = group_df.to_csv(sep="\t", index=False)
-    with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
-        writer.write(csv_data)
-
-    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
-    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
-    # 每条曝光数据添加对应分数的diff
-    merged_df = pd.merge(sored_df, filtered_df, on="score_segment", how="left")
-
-    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
-    return merged_df, filtered_df
-
-
-def read_and_calibration_predict(predict_path: str, step=100) -> [pd.DataFrame, pd.DataFrame, pd.DataFrame]:
-    """
-    读取评估结果,并进行校准
-    """
-    # 本地调试使用
-    # predicts = read_predict_from_local_txt(predict_path)
-    predicts = read_predict_from_hdfs(predict_path)
-    df = pd.DataFrame(predicts)
+def parse_predict_line(line: str) -> [bool, dict]:
+    sp = line.replace("\n", "").split("\t")
+    if len(sp) == 4:
+        label = int(sp[0])
+        cid = sp[3].split("_")[0]
+        score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
+        return True, {
+            "label": label,
+            "cid": cid,
+            "score": score
+        }
+    return False, {}
 
-    # 模型分分段计算与真实ctcvr的dff_rate
-    predict_basename = os.path.basename(predict_path)
-    if predict_basename.endswith("/"):
-        predict_basename = predict_basename[:-1]
-    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=100)
-
-    # 生成校准后的分数
-    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
-
-    # 按CID统计真实ctcvr和校准前后的平均模型分
-    grouped_df = df.groupby("cid").agg(
-        view=('cid', 'size'),
-        conv=('label', 'sum'),
-        score_avg=('score', lambda x: round(x.mean(), 6)),
-        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
-    ).reset_index()
-    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
 
-    return df, grouped_df, segment_df
+def read_predict_file(file_path: str) -> pd.DataFrame:
+    result = []
+    if file_path.startswith("/dw"):
+        if not file_path.endswith("/"):
+            file_path += "/"
+        for file in client.list(file_path):
+            with client.read(file_path + file) as reader:
+                with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
+                    for line in gz_file.read().decode("utf-8").split("\n"):
+                        b, d = parse_predict_line(line)
+                        if b: result.append(d)
+    else:
+        with open(file_path, "r") as f:
+            for line in f.readlines():
+                b, d = parse_predict_line(line)
+                if b: result.append(d)
+    return pd.DataFrame(result)
+
+
+def calibration_file_save(df: pd.DataFrame, file_path: str):
+    if file_path.startswith("/dw"):
+        # 完整的分段文件保存
+        with client.write(file_path, encoding='utf-8', overwrite=True) as writer:
+            writer.write(df.to_csv(sep="\t", index=False))
+    else:
+        df.to_csv(file_path, sep="\t", index=False)
 
 
 def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
@@ -134,27 +59,96 @@ def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
     """
     d = {"old": old_df, "new": new_df}
     for key in d:
-        df = d[key][['label', "score"]]
-        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
-        df = d[key][['label', "score_2"]]
-        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
+        df = d[key]
+        if 'score' in df.columns:
+            score_df = df[['label', "score"]]
+            score_df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
+        if 'score_2' in df.columns:
+            score_2_df = d[key][['label', "score_2"]]
+            score_2_df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
+
+
+def save_full_calibration_file(df: pd.DataFrame, segment_file_path: str):
+    if segment_file_path.startswith("/dw"):
+        # 完整的分段文件保存
+        with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
+            writer.write(df.to_csv(sep="\t", index=False))
+    else:
+        df.to_csv(segment_file_path, sep="\t", index=False)
+
+
+def get_predict_calibration_file(df: pd.DataFrame, predict_basename: str) -> [pd.DataFrame]:
+    """
+    计算模型分的diff_rate
+    """
+    agg_df = predict_df_agg(df)
+    agg_df['diff_rate'] = (agg_df['score_avg'] / agg_df['true_ctcvr'] - 1).mask(agg_df['true_ctcvr'] == 0, 0).round(6)
+    condition = 'view > 1000 and diff_rate >= 0.2'
+    save_full_calibration_file(agg_df, f"{SEGMENT_BASE_PATH}/{predict_basename}.txt")
+    calibration = agg_df[(agg_df['view'] > 1000) & ((agg_df['diff_rate'] >= 0.2) | (agg_df['diff_rate'] <= -0.2)) & (agg_df['diff_rate'] != 0)]
+    return calibration
+
+
+def get_predict_basename(predict_path) -> [str]:
+    """
+    获取文件路径的最后一部分,作为与模型关联的文件名
+    """
+    predict_basename = os.path.basename(predict_path)
+    if predict_basename.endswith("/"):
+        predict_basename = predict_basename[:-1]
+
+    return predict_basename
+
+
+def calc_calibration_score2(df: pd.DataFrame, calibration_df: pd.DataFrame) -> [pd.DataFrame]:
+    calibration_df = calibration_df[['cid', 'diff_rate']]
+    df = pd.merge(df, calibration_df, on='cid', how='left').fillna(0)
+    df['score_2'] = df['score'] / (1 + df['diff_rate'])
+    return df
+
+
+def predict_df_agg(df: pd.DataFrame) -> [pd.DataFrame]:
+    # 基础聚合操作
+    agg_operations = {
+        'view': ('cid', 'size'),
+        'conv': ('label', 'sum'),
+        'score_avg': ('score', lambda x: round(x.mean(), 6)),
+    }
+
+    # 如果存在 score_2 列,则增加相关聚合
+    if "score_2" in df.columns:
+        agg_operations['score_2_avg'] = ('score_2', lambda x: round(x.mean(), 6))
+
+    grouped_df = df.groupby("cid").agg(**agg_operations).reset_index()
+    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
+
+    return grouped_df
 
 
 def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
-    old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
-    new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
+    old_df = read_predict_file(old_predict_path)
+    new_df = read_predict_file(new_predict_path)
 
+    old_calibration_df = get_predict_calibration_file(old_df, get_predict_basename(old_predict_path))
+    old_df = calc_calibration_score2(old_df, old_calibration_df)
+
+    new_calibration_df = get_predict_calibration_file(new_df, get_predict_basename(new_predict_path))
+    new_df = calc_calibration_score2(new_df, new_calibration_df)
+
+    # 本地保存label、score以及校准后的score,用于计算AUC等信息
     predict_local_save_for_auc(old_df, new_df)
 
-    # 分段文件保存, 此处保留的最后使用的分段文件,不是所有的分段
-    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
+    # 新模型校准文件保存本地,用于同步OSS
+    new_calibration_df[['cid', 'diff_rate']].to_csv(calibration_file, sep="\t", index=False, header=False)
 
-    # 字段重命名,和列过滤
-    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
-    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
-    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
-    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
+    old_agg_df = predict_df_agg(old_df)
+    new_agg_df = predict_df_agg(new_df)
 
+    # 字段重命名,和列过滤
+    old_agg_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
+    new_agg_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
+    old_group_df = old_agg_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
+    new_group_df = new_agg_df[['cid', 'new_score_avg', 'new_score_2_avg']]
     merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
 
     # 计算与真实ctcvr的差异值
@@ -183,7 +177,7 @@ def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, a
 
 
 if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description="model_predict_analyse.py")
+    parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
     parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
     parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
     parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")

+ 4 - 1
src/main/java/examples/utils/AdUtil.java

@@ -57,7 +57,10 @@ public class AdUtil {
             return false;
         }
         if (extendAlgJson.containsKey("extinfo")) {
-            return IS_API_FLAG.equals(extendAlgJson.getJSONObject("extinfo").getString("isApi"));
+            JSONObject extInfoJson = extendAlgJson.getJSONObject("extinfo");
+            if (extInfoJson.containsKey("isApi")) {
+                return IS_API_FLAG.equals(extInfoJson.getString("isApi"));
+            }
         }
         if (extendAlgJson.containsKey("is_api")) {
             return IS_API_FLAG.equals(extendAlgJson.getString("is_api"));

+ 2 - 5
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_31_originData_20240718.scala

@@ -5,7 +5,7 @@ import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
 import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
 import examples.extractor.RankExtractorFeature_20240530
-import examples.utils.DateTimeUtil
+import examples.utils.{AdUtil, DateTimeUtil}
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.SparkSession
 import org.xm.Similarity
@@ -55,10 +55,7 @@ object makedata_ad_31_originData_20240718 {
             transfer = func,
             numPartition = tablePart)
           .filter(record => {
-            val extendAlg: JSONObject = if (record.isNull("extend_alg")) new JSONObject() else
-              JSON.parseObject(record.getString("extend_alg"))
-            val isApi = extendAlg.getString("is_api")
-            "1".equals(isApi)
+            AdUtil.isApi(record)
           })
           .map(record => {
 

File diff too large to display
+ 37 - 45
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataPrint_20240718.scala


+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_41_originData_20250218.scala

@@ -12,7 +12,7 @@ import org.xm.Similarity
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 /*
-   20240608 提取特征
+   20250218 提取特征
  */
 
 object makedata_recsys_41_originData_20250218 {

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_61_bucket_20241209.scala

@@ -55,7 +55,7 @@ object makedata_recsys_61_bucket_20241209 {
         doubles.put(r._1, jsons.getDoubleValue(r._1))
       })
       doubles
-    }).sample(false, sampleRate).repartition(20)
+    }).sample(false, sampleRate).repartition(32).persist()
 
     val result = new ArrayBuffer[String]()
 

+ 144 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_61_nor_sample_20241209.scala

@@ -0,0 +1,144 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
+import examples.extractor.ExtractorUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+import scala.util.Random
+
+object makedata_recsys_61_nor_sample_20241209 {
+  def main(args: Array[String]): Unit = {
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/61_origin_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_recsys_nor_train_data/")
+    val beginStr = param.getOrElse("beginStr", "20241210")
+    val endStr = param.getOrElse("endStr", "20241210")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val whatLabel = param.getOrElse("whatLabel", "total_return_uv_new")
+    val whatApps = param.getOrElse("whatApps", "0,4,5,21,3,6").split(",").toSet
+    val fuSampleRate = param.getOrElse("fuSampleRate", "-1.0").toDouble
+    val featureNameFile = param.getOrElse("featureName", "20241209_recsys_nor_name.txt")
+    val featureBucketFile = param.getOrElse("featureBucket", "20241209_recsys_nor_bucket.txt")
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+    val featureNameSet = loadUseFeatureNames(loader, featureNameFile)
+    val featureBucketMap = loadUseFeatureBuckets(loader, featureBucketFile)
+    val bucketsMap_br = sc.broadcast(featureBucketMap)
+
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (date <- dateRange) {
+      println("开始执行:" + date)
+      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
+          val rList = r.split("\t")
+          val logKey = rList(0)
+          val labelKey = rList(1)
+          val jsons = JSON.parseObject(rList(2))
+          val features = scala.collection.mutable.Map[String, Double]()
+          jsons.foreach(r => {
+            features.put(r._1, jsons.getDoubleValue(r._1))
+          })
+          (logKey, labelKey, features)
+        })
+        .filter {
+          case (logKey, labelKey, features) =>
+            val logKeyList = logKey.split(",")
+            val apptype = logKeyList(0)
+            val pagesource = logKeyList(1)
+            whatApps.contains(apptype) && pagesource.endsWith("recommend")
+        }.filter {
+          case (logKey, labelKey, features) =>
+            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString.toInt
+            label > 0 || new Random().nextDouble() <= fuSampleRate
+        }
+        .map {
+          case (logKey, labelKey, features) =>
+            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
+            (label, features)
+        }
+        .mapPartitions(row => {
+          val result = new ArrayBuffer[String]()
+          val bucketsMap = bucketsMap_br.value
+          row.foreach {
+            case (label, features) =>
+              val featuresBucket = features.map {
+                case (name, score) =>
+                  if (!featureNameSet.contains(name)) {
+                    ""
+                  } else {
+                    if (score > 1E-8) {
+                      if (bucketsMap.contains(name)) {
+                        val (bucketsNum, buckets) = bucketsMap(name)
+                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                        name + ":" + scoreNew.toString
+                      } else {
+                        name + ":" + score.toString
+                      }
+                    } else {
+                      ""
+                    }
+                  }
+              }.filter(_.nonEmpty)
+              result.add(label + "\t" + featuresBucket.mkString("\t"))
+          }
+          result.iterator
+        })
+
+      // 4 保存数据到hdfs
+      val hdfsPath = savePath + "/" + date
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("删除路径并开始数据写入:" + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("路径不合法,无法写入:" + hdfsPath)
+      }
+    }
+  }
+
+  private def loadFileData(loader: ClassLoader, nameFile: String): String = {
+    val resourceUrlBucket = loader.getResource(nameFile)
+    val data =
+      if (resourceUrlBucket != null) {
+        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
+        Source.fromURL(resourceUrlBucket).close()
+        buckets
+      } else {
+        ""
+      }
+    data
+  }
+
+  private def loadUseFeatureNames(loader: ClassLoader, nameFile: String): Set[String] = {
+    val names = loadFileData(loader, nameFile)
+    println(names)
+    names.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .toSet
+  }
+
+  private def loadUseFeatureBuckets(loader: ClassLoader, nameFile: String): Map[String, (Double, Array[Double])] = {
+    val buckets = loadFileData(loader, nameFile)
+    println(buckets)
+    buckets.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .map(r => {
+        val rList = r.split("\t")
+        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
+      }).toMap
+  }
+}
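
Both new sample jobs (nor and rov) bucketize features the same way: a raw value is mapped to its bucket position in the boundary list and rescaled into (0, 1] via 1.0 / bucketsNum * (findInsertPosition(buckets, score) + 1). A Python sketch of that transform, assuming ExtractorUtils.findInsertPosition returns the insertion index into the sorted boundary array (bisect-style; the exact tie handling is an assumption):

    import bisect

    def bucket_score(score, buckets, buckets_num):
        # Assumed equivalent of ExtractorUtils.findInsertPosition: index at which
        # `score` would be inserted into the sorted boundary list.
        pos = bisect.bisect_left(buckets, score)
        return 1.0 / buckets_num * (pos + 1.0)

    # Made-up boundary list in the format of 20241209_recsys_nor_bucket.txt:
    print(bucket_score(0.37, [0.1, 0.25, 0.5, 0.75], 5.0))   # -> 0.6 (3rd of 5 buckets)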

+ 11 - 7
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_61_originData_20241209.scala

@@ -329,7 +329,7 @@ object makedata_recsys_61_originData_20241209 {
       if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
         println("删除路径并开始数据写入:" + hdfsPath)
         MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        odpsData.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
       } else {
         println("路径不合法,无法写入:" + hdfsPath)
       }
@@ -372,12 +372,16 @@ object makedata_recsys_61_originData_20241209 {
   def truncateDecimal(obj: JSONObject, scale: Int = 6): JSONObject = {
     val data = new JSONObject()
     for (key <- obj.keySet()) {
-      val value = obj.getDoubleValue(key)
-      if (value == value.floor) {
-        data.put(key, value)
-      } else {
-        val newValue = BigDecimal(value).setScale(scale, BigDecimal.RoundingMode.HALF_UP).toDouble
-        data.put(key, newValue)
+      try {
+        val value = obj.getDoubleValue(key)
+        if (value == value.floor) {
+          data.put(key, value)
+        } else {
+          val newValue = BigDecimal(value).setScale(scale, BigDecimal.RoundingMode.HALF_UP).toDouble
+          data.put(key, newValue)
+        }
+      } catch {
+        case e: Exception => System.err.println(e.getMessage)
       }
     }
     data

+ 148 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_61_rov_sample_20241209.scala

@@ -0,0 +1,148 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
+import examples.extractor.ExtractorUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+import scala.util.Random
+
+/*
+
+ */
+
+object makedata_recsys_61_rov_sample_20241209 {
+  def main(args: Array[String]): Unit = {
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/61_origin_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_recsys_rov_train_data/")
+    val beginStr = param.getOrElse("beginStr", "20241210")
+    val endStr = param.getOrElse("endStr", "20241210")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val whatLabel = param.getOrElse("whatLabel", "is_return")
+    val whatApps = param.getOrElse("whatApps", "0,4,5,21,3,6").split(",").toSet
+    val fuSampleRate = param.getOrElse("fuSampleRate", "1.0").toDouble
+    val featureNameFile = param.getOrElse("featureName", "20241209_recsys_rov_name.txt")
+    val featureBucketFile = param.getOrElse("featureBucket", "20241209_recsys_rov_bucket.txt")
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+    val featureNameSet = loadUseFeatureNames(loader, featureNameFile)
+    val featureBucketMap = loadUseFeatureBuckets(loader, featureBucketFile)
+    val bucketsMap_br = sc.broadcast(featureBucketMap)
+
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (date <- dateRange) {
+      println("开始执行:" + date)
+      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
+          val rList = r.split("\t")
+          val logKey = rList(0)
+          val labelKey = rList(1)
+          val jsons = JSON.parseObject(rList(2))
+          val features = scala.collection.mutable.Map[String, Double]()
+          jsons.foreach(r => {
+            features.put(r._1, jsons.getDoubleValue(r._1))
+          })
+          (logKey, labelKey, features)
+        })
+        .filter {
+          case (logKey, labelKey, features) =>
+            val logKeyList = logKey.split(",")
+            val apptype = logKeyList(0)
+            val pagesource = logKeyList(1)
+            whatApps.contains(apptype) && pagesource.endsWith("recommend")
+        }.filter {
+          case (logKey, labelKey, features) =>
+            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
+            "1".equals(label) || new Random().nextDouble() <= fuSampleRate
+        }
+        .map {
+          case (logKey, labelKey, features) =>
+            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
+            (label, features)
+        }
+        .mapPartitions(row => {
+          val result = new ArrayBuffer[String]()
+          val bucketsMap = bucketsMap_br.value
+          row.foreach {
+            case (label, features) =>
+              val featuresBucket = features.map {
+                case (name, score) =>
+                  if (!featureNameSet.contains(name)) {
+                    ""
+                  } else {
+                    if (score > 1E-8) {
+                      if (bucketsMap.contains(name)) {
+                        val (bucketsNum, buckets) = bucketsMap(name)
+                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                        name + ":" + scoreNew.toString
+                      } else {
+                        name + ":" + score.toString
+                      }
+                    } else {
+                      ""
+                    }
+                  }
+              }.filter(_.nonEmpty)
+              result.add(label + "\t" + featuresBucket.mkString("\t"))
+          }
+          result.iterator
+        })
+
+      // 4 保存数据到hdfs
+      val hdfsPath = savePath + "/" + date
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("删除路径并开始数据写入:" + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("路径不合法,无法写入:" + hdfsPath)
+      }
+    }
+  }
+
+  private def loadFileData(loader: ClassLoader, nameFile: String): String = {
+    val resourceUrlBucket = loader.getResource(nameFile)
+    val data =
+      if (resourceUrlBucket != null) {
+        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
+        Source.fromURL(resourceUrlBucket).close()
+        buckets
+      } else {
+        ""
+      }
+    data
+  }
+
+  private def loadUseFeatureNames(loader: ClassLoader, nameFile: String): Set[String] = {
+    val names = loadFileData(loader, nameFile)
+    println(names)
+    names.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .toSet
+  }
+
+  private def loadUseFeatureBuckets(loader: ClassLoader, nameFile: String): Map[String, (Double, Array[Double])] = {
+    val buckets = loadFileData(loader, nameFile)
+    println(buckets)
+    buckets.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .map(r => {
+        val rList = r.split("\t")
+        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
+      }).toMap
+  }
+}

Some files were not shown because too many files changed in this diff