add new feat

yuehailiang 1 month ago
parent
commit
89b399363c

+ 462 - 0
ad/02_ad_model_14d_v1_update_yhl.sh

@@ -0,0 +1,462 @@
+#!/bin/sh
+set -x
+
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+export PREDICT_CACHE_PATH=/root/fengzhoutian/xgboost-dev/predict_cache/
+export SEGMENT_BASE_PATH=/dw/recommend/model/36_model_attachment/score_calibration_file
+
+sh_path=$(cd $(dirname $0); pwd)
+source ${sh_path}/00_common.sh
+
+source /root/anaconda3/bin/activate py37
+
+
+# Global constants
+LOG_PREFIX=广告模型训练任务-离线测试-dev20250623
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_dev_20250623
+BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_dev_20250623
+TABLE=alg_recsys_ad_sample_all
+# Feature file name
+feature_file=20240703_ad_feature_name.txt
+# Local temporary directory for the model
+model_local_home=/root/yuehailiang/xgboost-dev/
+
+# HDFS path where the model is saved; use a different path when testing to avoid affecting production
+MODEL_PATH=/dw/recommend/model/35_ad_model_dev_20250623
+# Path where prediction results are saved; use a different path when testing to avoid affecting production
+PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data_dev_20250623
+# OSS path where the model is saved; use a different path when testing to avoid affecting production
+#MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou-internal.aliyuncs.com/fengzhoutian/
+# Online model name; use a different name when testing to avoid affecting production
+model_name=model_xgb_dev_20250623
+model_ver=dev_20250623
+model_name=${model_name}_${model_ver}
+model_local_home=${model_local_home}/${model_name}
+# Online calibration file name
+OSS_CALIBRATION_FILE_NAME=${model_name}_calibration
+# Directory for temporary files
+PREDICT_CACHE_PATH=/root/yuehailiang/xgboost-dev/predict_cache/
+
+
+# Local file that stores the HDFS path of the online model; use a different name when testing to avoid affecting production
+model_path_file=${model_local_home}/online_model_path.txt
+# Current day of the week, 1 = Monday
+current_day_of_week="$(date +"%u")"
+
+# Task start time
+start_time=$(date +%s)
+# The previous day
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+# HDFS path of the online model
+online_model_path=`cat ${model_path_file}`
+# Training data paths
+train_data_path=""
+train_data_days=14
+# Evaluation data path
+predict_date_path=""
+# Path where the new model's evaluation results are saved
+new_model_predict_result_path=""
+# Model save path
+model_save_path=""
+# Path of the evaluation analysis file; later used to decide whether to update the model
+predict_analyse_file_path=""
+# Calibration file path
+calibration_file_path=""
+
+# Analysis results of the model evaluation
+old_incr_rate_avg=0
+new_incr_rate_avg=0
+# Top10 details
+top10_msg=""
+# AUC values
+old_auc=0
+new_auc=0
+
+declare -A real_score_map
+declare -A old_score_map
+declare -A new_score_map
+
+# Check the exit code of a command/step
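+# Usage: check_run_status <status> <step_start_time> <step_name> <failure_msg>; on a non-zero status it sends an alert via ad_monitor_util.py and exits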
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+    local msg=$4
+
+    local step_end_time=$(date +%s)
+    local step_elapsed=$(($step_end_time - $step_start_time))
+
+    if [[ -n "${old_auc}" && "${old_auc}" != "0" ]]; then
+      msg+="\n\t - 老模型AUC: ${old_auc}"
+    fi
+    if [[ -n "${new_auc}" && "${new_auc}" != "0" ]]; then
+      msg+="\n\t - 新模型AUC: ${new_auc}"
+    fi
+
+
+    if [ ${status} -ne 0 ]; then
+        echo "${LOG_PREFIX} -- ${step_name}失败: 耗时 ${step_elapsed}"
+        local elapsed=$(($step_end_time - $start_time))
+        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
+        exit 1
+    else
+        echo "${LOG_PREFIX} -- ${step_name}成功: 耗时 ${step_elapsed}"
+    fi
+}
+
+send_success_upload_msg(){ 
+  # Send the update-success notification
+  local msg=" 广告模型文件更新完成"
+  msg+="\n\t - 老模型AUC: ${old_auc}"
+  msg+="\n\t - 新模型AUC: ${new_auc}"
+  msg+="\n\t - 老模型Top10差异平均值: ${old_incr_rate_avg}"
+  msg+="\n\t - 新模型Top10差异平均值: ${new_incr_rate_avg}"
+  msg+="\n\t - 模型在HDFS中的路径: ${model_save_path}"
+  msg+="\n\t - 模型上传OSS中的路径: ${MODEL_OSS_PATH}/${model_name}.tar.gz"
+
+  local step_end_time=$(date +%s)
+  local elapsed=$((${step_end_time} - ${start_time}))
+
+  /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level info --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
+}
+
+init() {
+  set +x
+  declare -a date_keys=()
+  local count=1
+  local current_data="$(date -d "${today_early_1} -1 day" +%Y%m%d)"
+  # Loop to collect the most recent n non-holiday dates
+  while [[ ${count} -le $train_data_days ]]; do
+    date_key=$(date -d "${current_data}" +%Y%m%d)
+    # Skip holidays and concatenate the training data paths
+    if [ $(is_not_holidays ${date_key}) -eq 1 ]; then
+
+      # Add date_key to the array
+      date_keys+=("${date_key}")
+
+      if [[ -z ${train_data_path} ]]; then
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
+      else
+        train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
+      fi 
+      count=$((count + 1))
+    else
+      echo "日期: ${date_key}是节日,跳过"
+    fi
+    current_data=$(date -d "${current_data} -1 day" +%Y%m%d)
+  done
+
+  last_index=$((${#date_keys[@]} - 1))
+  train_first_day=${date_keys[$last_index]}
+  train_last_day=${date_keys[0]}
+
+  model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
+  predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
+  new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_${model_ver}_${train_first_day: -4}_${train_last_day: -4}
+  online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_${model_ver}_${online_model_path: -9}
+  predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_${model_ver}_analyse.txt
+  calibration_file_path=${model_local_home}/${OSS_CALIBRATION_FILE_NAME}.txt
+
+  echo "init param train_data_path: ${train_data_path}"
+  echo "init param predict_date_path: ${predict_date_path}"
+  echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
+  echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
+  echo "init param model_save_path: ${model_save_path}"
+  echo "init param online_model_path: ${online_model_path}"
+  echo "init param feature_file: ${feature_file}"
+  echo "init param model_name: ${model_name}"
+  echo "init param model_local_home: ${model_local_home}"
+  echo "init param model_oss_path: ${MODEL_OSS_PATH}"
+  echo "init param predict_analyse_file_path: ${predict_analyse_file_path}"
+  echo "init param calibration_file_path: ${calibration_file_path}"
+  echo "init param current_day_of_week: ${current_day_of_week}"
+
+  echo "当前Python环境安装的Python版本: $(python --version)"
+  echo "当前Python环境安装的三方包: $(python -m pip list)"
+  set -x
+  mkdir -p ${model_local_home}
+}
+
+# Check whether the upstream big-data production job has finished
+check_ad_hive() {
+  local step_start_time=$(date +%s)
+  local max_hour=05
+  local max_minute=30
+  local elapsed=0
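+  # Poll ad_utils.py every five minutes until yesterday's partition (up to hour 23) is ready; give up once the cutoff time below is reached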
+  while true; do
+      local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
+
+      elapsed=$(($(date +%s) - ${step_start_time}))
+      if [ "${python_return_code}" -eq 0 ]; then
+          break
+      fi
+      echo "Python程序返回非0值,等待五分钟后再次调用。"
+      sleep 300
+      local current_hour=$(date +%H)
+      local current_minute=$(date +%M)
+      if (( 10#${current_hour} > 10#${max_hour} || ( 10#${current_hour} == 10#${max_hour} && 10#${current_minute} >= 10#${max_minute} ) )); then
+          local msg="大数据数据生产校验失败, 分区: ${today_early_1}"
+          echo -e "${LOG_PREFIX} -- 大数据数据生产校验 -- ${msg}: 耗时 ${elapsed}"
+          /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}"
+          exit 1
+      fi
+  done
+  echo "${LOG_PREFIX} -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 ${elapsed}"
+}
+
+origin_data() {
+  (
+    source ${sh_path}/25_xgb_make_data_origin_bucket.sh
+    make_origin_data
+  )
+}
+
+bucket_feature() {
+  (
+    source ${sh_path}/25_xgb_make_data_origin_bucket.sh
+    make_bucket_feature
+  )
+}
+
+xgb_train() {
+  local step_start_time=$(date +%s)
+
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20250104 \
+  --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 2 --num-executors 11 \
+  --conf spark.yarn.executor.memoryoverhead=2048 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/fengzhoutian/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  trainPath:${train_data_path} \
+  testPath:${predict_date_path} \
+  savePath:${new_model_predict_result_path} \
+  modelPath:${model_save_path} \
+  eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:10 repartition:20 \
+  negSampleRate:0.04
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "XGB模型训练任务" "XGB模型训练失败"
+}
+
+calc_model_predict() {
+  local count=0
+  local max_line=10
+  local old_total_diff=0
+  local new_total_diff=0
+  top10_msg="| CID  | 老模型相对真实CTCVR的变化 | 新模型相对真实CTCVR的变化 |"
+  top10_msg+=" \n| ---- | --------- | -------- |"
+  while read -r line && [ ${count} -lt ${max_line} ]; do
+
+      # Skip the header line, which contains "cid"
+      if [[ "${line}" == *"cid"* ]]; then
+          continue
+      fi
+
+      read -a numbers <<< "${line}"
+
+      # Save the real / old / new scores separately
+      real_score_map[${numbers[0]}]=${numbers[3]}
+      old_score_map[${numbers[0]}]=${numbers[6]}
+      new_score_map[${numbers[0]}]=${numbers[7]}
+
+      # Append to the Feishu message with the Top10 details
+      top10_msg="${top10_msg} \n| ${numbers[0]} | ${numbers[6]} | ${numbers[7]} | "
+
+      # Compute the mean of the absolute Top10 relative errors
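+      # bc has no abs(), so "x * ((x >= 0) * 2 - 1)" multiplies x by +1 or -1 to take its absolute value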
+      old_abs_score=$( echo "${numbers[6]} * ((${numbers[6]} >= 0) * 2 - 1)" | bc -l )
+      new_abs_score=$( echo "${numbers[7]} * ((${numbers[7]} >= 0) * 2 - 1)" | bc -l )
+
+      old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l )
+      new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l )
+
+      count=$((${count} + 1))
+
+  done < "${predict_analyse_file_path}"
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "计算Top10差异" "计算Top10差异异常"
+
+  old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
+  check_run_status $? ${step_start_time} "计算老模型Top10差异" "计算老模型Top10差异异常"
+
+
+  new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
+  check_run_status $? ${step_start_time} "计算新模型Top10差异" "计算新模型Top10差异异常"
+
+  echo "老模型Top10差异平均值: ${old_incr_rate_avg}"
+  echo "新模型Top10差异平均值: ${new_incr_rate_avg}"
+  echo "新老模型分数对比: "
+  for cid in "${!new_score_map[@]}"; do
+    echo "\t CID: $cid, 老模型分数: ${old_score_map[$cid]}, 新模型分数: ${new_score_map[$cid]}"
+  done
+}
+
+calc_auc() {
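+  # Compute AUC for the online (old) and the new model from the cached prediction files, using the offline AUC tool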
+  old_auc=`cat ${PREDICT_CACHE_PATH}/old_1.txt | /root/sunmingze/AUC/AUC`
+  new_auc=`cat ${PREDICT_CACHE_PATH}/new_1.txt | /root/sunmingze/AUC/AUC`
+}
+
+model_predict() {
+  # Evaluate the latest data with the online model
+  local step_start_time=$(date +%s)
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
+  --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 30 \
+  --conf spark.yarn.executor.memoryoverhead=1024 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/fengzhoutian/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  testPath:${predict_date_path} \
+  savePath:${online_model_predict_result_path} \
+  negSampleRate:0.04 \
+  modelPath:${online_model_path}
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "线上模型评估${predict_date_path: -8}的数据" "线上模型评估${predict_date_path: -8}的数据失败"
+}
+
+compare_predictions() {
+  local step_start_time=$(date +%s)
+
+  mkdir -p ${model_local_home}/predict_analyse_file
+  # Analyse the prediction results
+  python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path}
+  local python_return_code=$?
+  check_run_status ${python_return_code} ${step_start_time} "分析线上模型评估${predict_date_path: -8}的数据" "分析线上模型评估${predict_date_path: -8}的数据失败"
+
+  calc_model_predict
+
+  calc_auc
+
+  if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then 
+    echo "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    check_run_status 1 ${step_start_time} "${predict_date_path: -8}的数据,绝对误差大于0.1" "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
+    exit 1
+  fi 
+
+
+  # Compare the difference between the two models
+  score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l )
+  if (( $(echo "${score_diff} > 0.050000" | bc -l ) ));then 
+    echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查"
+    check_run_status 1 ${step_start_time} "两个模型评估${predict_date_path: -8}的数据" "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
+    exit 1
+  fi 
+}
+
+draw_q_distribution() {
+  local step_start_time=$(date +%s)
+
+  python ${sh_path}/draw_predict_distribution.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} --output ${today_early_1}_${model_ver}_${train_first_day: -4}_${train_last_day: -4}.png
+  python_return_code=$?
+}
+
+
+model_upload_oss() {
+  local step_start_time=$(date +%s)
+
+  (
+    cd ${model_local_home}
+
+    ${HADOOP} fs -get ${model_save_path} ${model_name}
+    if [ ! -d ${model_name} ]; then
+      echo "从HDFS下载模型失败"
+      check_run_status 1 ${step_start_time} "HDFS下载模型任务" "HDFS下载模型失败" 
+      exit 1 
+    fi
+
+    tar -czvf ${model_name}_tmp.tar.gz -C ${model_name} .
+    
+    # Push the model file and calibration file to OSS
+    ${HADOOP} fs -put -f ${model_name}_tmp.tar.gz ${MODEL_OSS_PATH}
+    local return_code=$?
+    check_run_status ${return_code} ${step_start_time} "模型上传OSS任务" "模型上传OSS失败"
+
+    ${HADOOP} fs -cp -f ${MODEL_OSS_PATH}/${model_name}_tmp.tar.gz ${MODEL_OSS_PATH}/${model_name}.tar.gz
+    local return_code=$?
+    check_run_status ${return_code} ${step_start_time} "模型上传OSS任务" "模型重命名OSS失败"
+
+    echo ${model_save_path} > ${model_path_file}
+  )
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "模型上传OSS任务" "模型上传OSS失败"
+
+  local step_end_time=$(date +%s)
+  local elapsed=$((${step_end_time} - ${start_time}))
+  echo -e "${LOG_PREFIX} -- 模型更新完成 -- 模型更新成功: 耗时 ${elapsed}"
+  
+  send_success_upload_msg
+  # Clean up local temporary files
+  cd ${model_local_home}
+  rm -v -f ./${model_name}_tmp.tar.gz
+  rm -v -rf ./${model_name}
+  rm -v -rf ${OSS_CALIBRATION_FILE_NAME}.txt
+}
+
+get_feature_score() {
+  # Dump the model's feature scores (saveFeatureScoresOnly mode) instead of running an evaluation
+  local step_start_time=$(date +%s)
+  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+  --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
+  --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 3 \
+  --conf spark.yarn.executor.memoryoverhead=1024 \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.shuffle.service.port=7337 \
+  --conf spark.shuffle.consolidateFiles=true \
+  --conf spark.shuffle.manager=sort \
+  --conf spark.storage.memoryFraction=0.4 \
+  --conf spark.shuffle.memoryFraction=0.5 \
+  --conf spark.default.parallelism=200 \
+  /root/fengzhoutian/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+  featureFile:20240703_ad_feature_name.txt \
+  saveFeatureScoresOnly:true \
+  savePath:"/dw/recommend/model/37_model_feature_scores/${model_name}" \
+  modelPath:"/dw/recommend/model/35_ad_model/${model_name}"
+}
+
+make_data() {
+  origin_data
+  bucket_feature
+}
+
+# Main entry point
+main() {
+  init
+
+  check_ad_hive
+  make_data
+
+  xgb_train
+  model_predict
+  # get_feature_score
+  compare_predictions
+  draw_q_distribution
+  model_upload_oss
+}
+
+fe() {
+  init
+  check_ad_hive
+  origin_data
+}
+
+
+main

+ 154 - 0
ad/25_xgb_make_data_origin_bucket_yhl.sh

@@ -0,0 +1,154 @@
+#!/bin/sh
+set -x
+
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+
+
+sh_path=$(dirname $0)
+source ${sh_path}/00_common.sh
+
+source /root/anaconda3/bin/activate py37
+
+make_origin_data() {
+  
+  local step_start_time=$(date +%s)
+
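+  # Three Spark jobs run in parallel (&), each producing a different hour range of yesterday's data: 00-12, 13-18 and 19-23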
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20250623 \
+  --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  tablePart:64 repartition:32 \
+  beginStr:${today_early_1}00 endStr:${today_early_1}12 \
+  savePath:${TRAIN_PATH} \
+  table:${TABLE} \
+  filterHours:00,01,02,03,04,05,06,07 \
+  idDefaultValue:0.1 &
+  local task1=$!
+
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20250623 \
+  --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  tablePart:64 repartition:32 \
+  beginStr:${today_early_1}13 endStr:${today_early_1}18 \
+  savePath:${TRAIN_PATH} \
+  table:${TABLE} \
+  filterHours:00,01,02,03,04,05,06,07 \
+  idDefaultValue:0.1 &
+  local task2=$!
+
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20250623 \
+  --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  tablePart:64 repartition:32 \
+  beginStr:${today_early_1}19 endStr:${today_early_1}23 \
+  savePath:${TRAIN_PATH} \
+  table:${TABLE} \
+  filterHours:00,01,02,03,04,05,06,07 \
+  idDefaultValue:0.1 &
+  local task3=$!
+
+  wait ${task1}
+  local task1_return_code=$?
+
+  wait ${task2}
+  local task2_return_code=$?
+
+  wait ${task3}
+  local task3_return_code=$?
+
+
+  check_run_status ${task1_return_code} ${step_start_time} "spark原始样本生产任务: 生产00~12数据异常"
+  check_run_status ${task2_return_code} ${step_start_time} "spark原始样本生产任务: 生产13~18数据异常"
+  check_run_status ${task3_return_code} ${step_start_time} "spark原始样本生产任务: 生产19~23数据异常"
+}
+
+
+
+make_bucket_feature() {
+
+  local step_start_time=$(date +%s)
+  
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_33_bucketData_20250110 \
+  --master yarn --driver-memory 2G --executor-memory 3G --executor-cores 1 --num-executors 16 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  beginStr:${today_early_1} endStr:${today_early_1} repartition:64 \
+  filterNames:_4h_,_5h_,adid_,targeting_conversion_ \
+  bucketFileName:20250217_ad_bucket_688.txt \
+  readPath:${TRAIN_PATH} \
+  savePath:${BUCKET_FEATURE_PATH}
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "spark特征分桶任务"
+}
+
+make_bucket_feature_to_hive() {
+
+  local step_start_time=$(date +%s)
+  
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_33_bucketDataToHive_20250110 \
+  --master yarn --driver-memory 2G --executor-memory 3G --executor-cores 1 --num-executors 16 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  beginStr:${today_early_1} endStr:${today_early_1} repartition:64 \
+  filterNames:_4h_,_5h_,adid_,targeting_conversion_ \
+  table:ad_easyrec_eval_data_v2_sampled \
+  partition:"dt=${today_early_1}" \
+  readPath:${TRAIN_PATH} \
+  negSampleRate:0.04
+
+  local return_code=$?
+  check_run_status ${return_code} ${step_start_time} "spark特征分桶任务"
+}
+
+make_bucket_feature_from_origin_to_hive() {
+  local step_start_time=$(date +%s)
+  neg_sample_rate=${NEG_SAMPLE_RATE:-0.04}
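+  # NEG_SAMPLE_RATE, FILTER_HOURS, FILTER_ADVER_IDS, outputTable and outputTable1 are expected to be provided by the caller's environment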
+  
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_33_bucketDataFromOriginToHive_20250228 \
+  --master yarn --driver-memory 2G --executor-memory 3G --executor-cores 1 --num-executors 30 \
+  --conf spark.dynamicAllocation.enabled=true \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.dynamicAllocation.maxExecutors=100 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  beginStr:${today_early_1} endStr:${today_early_1} \
+  filterHours:${FILTER_HOURS:-00,01,02,03,04,05,06,07} \
+  filterAdverIds:${FILTER_ADVER_IDS} \
+  filterNames:_4h_,_5h_,adid_,targeting_conversion_ \
+  outputTable:${outputTable} \
+  inputTable:alg_recsys_ad_sample_all \
+  negSampleRate:${neg_sample_rate} &
+  local task1=$!
+
+  /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+  --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_33_bucketDataFromOriginToHive_20250522 \
+  --master yarn --driver-memory 2G --executor-memory 3G --executor-cores 1 --num-executors 30 \
+  --conf spark.dynamicAllocation.enabled=true \
+  --conf spark.shuffle.service.enabled=true \
+  --conf spark.dynamicAllocation.maxExecutors=100 \
+  ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+  beginStr:${today_early_1} endStr:${today_early_1} \
+  filterHours:${FILTER_HOURS:-00,01,02,03,04,05,06,07} \
+  filterAdverIds:${FILTER_ADVER_IDS} \
+  filterNames:_4h_,_5h_,adid_,targeting_conversion_ \
+  outputTable:${outputTable1} \
+  inputTable:alg_recsys_ad_sample_all \
+  negSampleRate:${neg_sample_rate} &
+  local task2=$!
+
+  wait ${task1}
+  local task1_return_code=$?
+
+  wait ${task2}
+  local task2_return_code=$?
+
+  check_run_status ${task1_return_code} ${step_start_time} "离线数据spark特征分桶任务"
+  check_run_status ${task2_return_code} ${step_start_time} "在线日志spark特征分桶任务"
+
+}

+ 474 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_31_originData_20250623.scala

@@ -0,0 +1,474 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.alibaba.fastjson.{JSON, JSONObject}
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import examples.extractor.RankExtractorFeature_20240530
+import examples.utils.{AdUtil, DateTimeUtil}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+import org.xm.Similarity
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/*
+   Raw feature processing, 20250110 version, modified from the 20240718 version
+   * Adds smoothing to the CTR-type statistical features at ad/creative granularity
+ */
+
+object makedata_ad_31_originData_20250623 {
+  val WILSON_ZSCORE = 1.96
+  val CTR_SMOOTH_BETA_FACTOR = 25
+  val CVR_SMOOTH_BETA_FACTOR = 10
+  val CTCVR_SMOOTH_BETA_FACTOR = 100
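+  // The *_SMOOTH_BETA_FACTOR constants are the fixed-denominator smoothing terms passed to divSmooth2;
+  // WILSON_ZSCORE appears unused here and is presumably kept over from the earlier Wilson-smoothing version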
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1. Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "2024062008")
+    val endStr = param.getOrElse("endStr", "2024062023")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/31_ad_sample_data/")
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "alg_recsys_ad_sample_all")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val filterHours = param.getOrElse("filterHours", "00,01,02,03,04,05,06,07").split(",").toSet
+    val idDefaultValue = param.getOrElse("idDefaultValue", "1.0").toDouble
+    // 2. Read ODPS table info
+    val odpsOps = env.getODPS(sc)
+
+    // 3. Loop over the time range and produce data
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    for (dt_hh <- timeRange) {
+      val dt = dt_hh.substring(0, 8)
+      val hh = dt_hh.substring(8, 10)
+      val partition = s"dt=$dt,hh=$hh"
+      if (filterHours.nonEmpty && filterHours.contains(hh)) {
+        println("不执行partiton:" + partition)
+      } else {
+        println("开始执行partiton:" + partition)
+        val odpsData = odpsOps.readTable(project = project,
+            table = table,
+            partition = partition,
+            transfer = func,
+            numPartition = tablePart)
+          .filter(record => {
+            AdUtil.isApi(record)
+          })
+          .map(record => {
+
+            val ts = record.getString("ts").toInt
+            val cid = record.getString("cid")
+            val apptype = record.getString("apptype")
+            val extend: JSONObject = if (record.isNull("extend")) new JSONObject() else
+              JSON.parseObject(record.getString("extend"))
+
+            val featureMap = new JSONObject()
+
+            val mateFeature: JSONObject = if (record.isNull("metafeaturemap")) new JSONObject() else
+              JSON.parseObject(record.getString("metafeaturemap"))
+
+            val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b1_feature"))
+            val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b2_feature"))
+            val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b3_feature"))
+            val b4: JSONObject = if (record.isNull("b4_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b4_feature"))
+            val b5: JSONObject = if (record.isNull("b5_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b5_feature"))
+            val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b6_feature"))
+            val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b7_feature"))
+            val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b8_feature"))
+            val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("b9_feature"))
+
+
+            featureMap.put("cid_" + cid, idDefaultValue)
+            if (b1.containsKey("adid") && b1.getString("adid").nonEmpty) {
+              featureMap.put("adid_" + b1.getString("adid"), idDefaultValue)
+            }
+            if (b1.containsKey("adverid") && b1.getString("adverid").nonEmpty) {
+              featureMap.put("adverid_" + b1.getString("adverid"), idDefaultValue)
+            }
+            if (b1.containsKey("targeting_conversion") && b1.getString("targeting_conversion").nonEmpty) {
+              featureMap.put("targeting_conversion_" + b1.getString("targeting_conversion"), idDefaultValue)
+            }
+
+            val hour = DateTimeUtil.getHourByTimestamp(ts)
+            featureMap.put("hour_" + hour, idDefaultValue)
+
+            val dayOfWeek = DateTimeUtil.getDayOrWeekByTimestamp(ts)
+            featureMap.put("dayofweek_" + dayOfWeek, idDefaultValue);
+
+            featureMap.put("apptype_" + apptype, idDefaultValue);
+
+            if (extend.containsKey("abcode") && extend.getString("abcode").nonEmpty) {
+              featureMap.put("abcode_" + extend.getString("abcode"), idDefaultValue)
+            }
+
+
+            if (b1.containsKey("cpa")) {
+              featureMap.put("cpa", b1.getString("cpa").toDouble)
+            }
+            if (b1.containsKey("weight") && b1.getString("weight").nonEmpty) {
+              featureMap.put("weight", b1.getString("weight").toDouble)
+            }
+
+            for ((bn, prefix1) <- List(
+              (b2, "b2"), (b3, "b3"), (b4, "b4"), (b5, "b5"), (b8, "b8"), (b9, "b9")
+            )) {
+              for (prefix2 <- List(
+                "1h", "2h", "3h", "4h", "5h", "6h", "12h", "1d", "3d", "7d", "today", "yesterday"
+              )) {
+                val view = if (bn.isEmpty) 0D else bn.getIntValue("ad_view_" + prefix2).toDouble
+                val click = if (bn.isEmpty) 0D else bn.getIntValue("ad_click_" + prefix2).toDouble
+                val conver = if (bn.isEmpty) 0D else bn.getIntValue("ad_conversion_" + prefix2).toDouble
+                val income = if (bn.isEmpty) 0D else bn.getIntValue("ad_income_" + prefix2).toDouble
+                // NOTE(zhoutian):
+                // cpc here is only an auxiliary quantity for smoothing cpm and has no real business meaning:
+                // cpm is not a ratio, so it is not suitable for Wilson smoothing directly.
+                // cpa is not used because ads may be billed by cpc in the future, or conversions may be unavailable; clicks are more stable.
+                // The other feature groups follow the same logic.
+                // Changed on 2025-02-17 to fixed-denominator smoothing, so income can now participate in the cpm smoothing directly.
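+                // Below, ecpm is reconstructed as smoothed CTR * cpc * 1000, i.e. the expected income per 1000 views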
+                val cpc = if (click == 0) 0D else income / click
+                val f1 = RankExtractorFeature_20240530.divSmooth2(click, view, CTR_SMOOTH_BETA_FACTOR)
+                val f2 = RankExtractorFeature_20240530.divSmooth2(conver, view, CTCVR_SMOOTH_BETA_FACTOR)
+                val f3 = RankExtractorFeature_20240530.divSmooth2(conver, click, CVR_SMOOTH_BETA_FACTOR)
+                val f4 = conver
+                val f5 = RankExtractorFeature_20240530.divSmooth2(click, view, CTR_SMOOTH_BETA_FACTOR) * cpc * 1000
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctr", f1)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctcvr", f2)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "cvr", f3)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver", f4)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ecpm", f5)
+
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "click", click)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*log(view)", conver * RankExtractorFeature_20240530.calLog(view))
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*ctcvr", conver * f2)
+              }
+            }
+
+            for ((bn, prefix1) <- List(
+              (b6, "b6"), (b7, "b7")
+            )) {
+              for (prefix2 <- List(
+                "7d", "14d"
+              )) {
+                val view = if (bn.isEmpty) 0D else bn.getIntValue("ad_view_" + prefix2).toDouble
+                val click = if (bn.isEmpty) 0D else bn.getIntValue("ad_click_" + prefix2).toDouble
+                val conver = if (bn.isEmpty) 0D else bn.getIntValue("ad_conversion_" + prefix2).toDouble
+                val income = if (bn.isEmpty) 0D else bn.getIntValue("ad_income_" + prefix2).toDouble
+                val cpc = if (click == 0) 0D else income / click
+                val f1 = RankExtractorFeature_20240530.divSmooth2(click, view, CTR_SMOOTH_BETA_FACTOR)
+                val f2 = RankExtractorFeature_20240530.divSmooth2(conver, view, CTCVR_SMOOTH_BETA_FACTOR)
+                val f3 = RankExtractorFeature_20240530.divSmooth2(conver, click, CVR_SMOOTH_BETA_FACTOR)
+                val f4 = conver
+                val f5 = RankExtractorFeature_20240530.divSmooth2(click, view, CTR_SMOOTH_BETA_FACTOR) * cpc * 1000
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctr", f1)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctcvr", f2)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "cvr", f3)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver", f4)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ecpm", f5)
+
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "click", click)
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*log(view)", conver * RankExtractorFeature_20240530.calLog(view))
+                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*ctcvr", conver * f2)
+              }
+            }
+
+            val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("c1_feature"))
+
+            val midActionList = if (c1.containsKey("action") && c1.getString("action").nonEmpty) {
+              c1.getString("action").split(",").map(r => {
+                val rList = r.split(":")
+                (rList(0), (rList(1).toInt, rList(2).toInt, rList(3).toInt, rList(4).toInt, rList(5)))
+              }).sortBy(-_._2._1).toList
+            } else {
+              new ArrayBuffer[(String, (Int, Int, Int, Int, String))]().toList
+            }
+            // user (u) features
+            val viewAll = midActionList.size.toDouble
+            val clickAll = midActionList.map(_._2._2).sum.toDouble
+            val converAll = midActionList.map(_._2._3).sum.toDouble
+            val incomeAll = midActionList.map(_._2._4).sum.toDouble
+            featureMap.put("viewAll", viewAll)
+            featureMap.put("clickAll", clickAll)
+            featureMap.put("converAll", converAll)
+            featureMap.put("incomeAll", incomeAll)
+            featureMap.put("ctr_all", RankExtractorFeature_20240530.calDiv(clickAll, viewAll))
+            featureMap.put("ctcvr_all", RankExtractorFeature_20240530.calDiv(converAll, viewAll))
+            featureMap.put("cvr_all", RankExtractorFeature_20240530.calDiv(clickAll, converAll))
+            featureMap.put("ecpm_all", RankExtractorFeature_20240530.calDiv(incomeAll * 1000, viewAll))
+
+            // user-item (ui) features
+            val midTimeDiff = scala.collection.mutable.Map[String, Double]()
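+            // For each cid, store 1 / (days since the most recent view / click / conversion); larger values mean more recent interactions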
+            midActionList.foreach {
+              case (cid, (ts_history, click, conver, income, title)) =>
+                if (!midTimeDiff.contains("timediff_view_" + cid)) {
+                  midTimeDiff.put("timediff_view_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
+                }
+                if (!midTimeDiff.contains("timediff_click_" + cid) && click > 0) {
+                  midTimeDiff.put("timediff_click_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
+                }
+                if (!midTimeDiff.contains("timediff_conver_" + cid) && conver > 0) {
+                  midTimeDiff.put("timediff_conver_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
+                }
+            }
+
+            val midActionStatic = scala.collection.mutable.Map[String, Double]()
+            midActionList.foreach {
+              case (cid, (ts_history, click, conver, income, title)) =>
+                midActionStatic.put("actionstatic_view_" + cid, 1.0 + midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0))
+                midActionStatic.put("actionstatic_click_" + cid, click + midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0))
+                midActionStatic.put("actionstatic_conver_" + cid, conver + midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0))
+                midActionStatic.put("actionstatic_income_" + cid, income + midActionStatic.getOrDefault("actionstatic_income_" + cid, 0.0))
+            }
+
+            if (midTimeDiff.contains("timediff_view_" + cid)) {
+              featureMap.put("timediff_view", midTimeDiff.getOrDefault("timediff_view_" + cid, 0.0))
+            }
+            if (midTimeDiff.contains("timediff_click_" + cid)) {
+              featureMap.put("timediff_click", midTimeDiff.getOrDefault("timediff_click_" + cid, 0.0))
+            }
+            if (midTimeDiff.contains("timediff_conver_" + cid)) {
+              featureMap.put("timediff_conver", midTimeDiff.getOrDefault("timediff_conver_" + cid, 0.0))
+            }
+            if (midActionStatic.contains("actionstatic_view_" + cid)) {
+              featureMap.put("actionstatic_view", midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0))
+            }
+            if (midActionStatic.contains("actionstatic_click_" + cid)) {
+              featureMap.put("actionstatic_click", midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0))
+            }
+            if (midActionStatic.contains("actionstatic_conver_" + cid)) {
+              featureMap.put("actionstatic_conver", midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0))
+            }
+            if (midActionStatic.contains("actionstatic_income_" + cid)) {
+              featureMap.put("actionstatic_income", midActionStatic.getOrDefault("actionstatic_income_" + cid, 0.0))
+            }
+            if (midActionStatic.contains("actionstatic_view_" + cid) && midActionStatic.contains("actionstatic_click_" + cid)) {
+              featureMap.put("actionstatic_ctr", RankExtractorFeature_20240530.calDiv(
+                midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0),
+                midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0)
+              ))
+            }
+            if (midActionStatic.contains("actionstatic_view_" + cid) && midActionStatic.contains("actionstatic_conver_" + cid)) {
+              featureMap.put("actionstatic_ctcvr", RankExtractorFeature_20240530.calDiv(
+                midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0),
+                midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0)
+              ))
+            }
+            if (midActionStatic.contains("actionstatic_conver_" + cid) && midActionStatic.contains("actionstatic_click_" + cid)) {
+              featureMap.put("actionstatic_cvr", RankExtractorFeature_20240530.calDiv(
+                midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0),
+                midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0)
+              ))
+            }
+
+            val e1: JSONObject = if (record.isNull("e1_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("e1_feature"))
+            val e2: JSONObject = if (record.isNull("e2_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("e2_feature"))
+            val title = b1.getOrDefault("cidtitle", "").toString
+            if (title.nonEmpty) {
+              for ((en, prefix1) <- List((e1, "e1"), (e2, "e2"))) {
+                for (prefix2 <- List("tags_3d", "tags_7d", "tags_14d")) {
+                  if (en.nonEmpty && en.containsKey(prefix2) && en.getString(prefix2).nonEmpty) {
+                    val (f1, f2, f3, f4) = funcC34567ForTags(en.getString(prefix2), title)
+                    featureMap.put(prefix1 + "_" + prefix2 + "_matchnum", f1)
+                    featureMap.put(prefix1 + "_" + prefix2 + "_maxscore", f3)
+                    featureMap.put(prefix1 + "_" + prefix2 + "_avgscore", f4)
+
+                  }
+                }
+              }
+            }
+
+            val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("d1_feature"))
+            val d2: JSONObject = if (record.isNull("d2_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("d2_feature"))
+            val d3: JSONObject = if (record.isNull("d3_feature")) new JSONObject() else
+              JSON.parseObject(record.getString("d3_feature"))
+
+            if (d1.nonEmpty) {
+              for (prefix <- List("3h", "6h", "12h", "1d", "3d", "7d")) {
+                val view = if (!d1.containsKey("ad_view_" + prefix)) 0D else d1.getIntValue("ad_view_" + prefix).toDouble
+                val click = if (!d1.containsKey("ad_click_" + prefix)) 0D else d1.getIntValue("ad_click_" + prefix).toDouble
+                val conver = if (!d1.containsKey("ad_conversion_" + prefix)) 0D else d1.getIntValue("ad_conversion_" + prefix).toDouble
+                val income = if (!d1.containsKey("ad_income_" + prefix)) 0D else d1.getIntValue("ad_income_" + prefix).toDouble
+                val cpc = if (click == 0) 0D else income / click
+                val f1 = RankExtractorFeature_20240530.divSmooth2(click, view, CTR_SMOOTH_BETA_FACTOR)
+                val f2 = RankExtractorFeature_20240530.divSmooth2(conver, view, CTCVR_SMOOTH_BETA_FACTOR)
+                val f3 = RankExtractorFeature_20240530.divSmooth2(conver, click, CVR_SMOOTH_BETA_FACTOR)
+                val f4 = conver
+                val f5 = RankExtractorFeature_20240530.divSmooth2(click, view, CTR_SMOOTH_BETA_FACTOR) * cpc * 1000
+                featureMap.put("d1_feature" + "_" + prefix + "_" + "ctr", f1)
+                featureMap.put("d1_feature" + "_" + prefix + "_" + "ctcvr", f2)
+                featureMap.put("d1_feature" + "_" + prefix + "_" + "cvr", f3)
+                featureMap.put("d1_feature" + "_" + prefix + "_" + "conver", f4)
+                featureMap.put("d1_feature" + "_" + prefix + "_" + "ecpm", f5)
+              }
+            }
+
+            val vidRankMaps = scala.collection.mutable.Map[String, scala.collection.immutable.Map[String, Double]]()
+            if (d2.nonEmpty) {
+              d2.foreach(r => {
+                val key = r._1
+                val value = d2.getString(key).split(",").map(r => {
+                  val rList = r.split(":")
+                  (rList(0), rList(2).toDouble)
+                }).toMap
+                vidRankMaps.put(key, value)
+              })
+            }
+            for (prefix1 <- List("ctr", "ctcvr", "ecpm")) {
+              for (prefix2 <- List("1d", "3d", "7d", "14d")) {
+                if (vidRankMaps.contains(prefix1 + "_" + prefix2)) {
+                  val rank = vidRankMaps(prefix1 + "_" + prefix2).getOrDefault(cid, 0.0)
+                  if (rank >= 1.0) {
+                    featureMap.put("vid_rank_" + prefix1 + "_" + prefix2, 1.0 / rank)
+                  }
+                }
+              }
+            }
+
+            if (d3.nonEmpty) {
+              val vTitle = d3.getString("title")
+              val score = Similarity.conceptSimilarity(title, vTitle)
+              featureMap.put("ctitle_vtitle_similarity", score);
+            }
+
+            if (c1.containsKey("user_has_conver_1y") && c1.getInteger("user_has_conver_1y") != null) {
+              featureMap.put("user_has_conver_1y", c1.getInteger("user_has_conver_1y"))
+            }
+            val h1: JSONObject = if (!mateFeature.containsKey("alg_mid_feature_adver_action")) new JSONObject() else
+              mateFeature.getJSONObject("alg_mid_feature_adver_action")
+            val h2: JSONObject = if (!mateFeature.containsKey("alg_mid_feature_sku_action")) new JSONObject() else
+              mateFeature.getJSONObject("alg_mid_feature_sku_action")
+
+            // Time windows and their corresponding prefixes
+            val timeDimensions = Seq("3d", "7d", "30d")
+            for (dimension <- timeDimensions) {
+              if (h1.containsKey(dimension) && h1.getString(dimension).nonEmpty) {
+                val action = h1.getString(dimension).split(",")
+                if (action.length >= 3) {
+                  featureMap.put(s"user_adverid_view_${dimension}", action(0))
+                  featureMap.put(s"user_adverid_click_${dimension}", action(1))
+                  featureMap.put(s"user_adverid_conver_${dimension}", action(2))
+                }
+              }
+              if (h2.containsKey(dimension) && h2.getString(dimension).nonEmpty) {
+                val action = h2.getString(dimension).split(",")
+                if (action.length >= 3) {
+                  featureMap.put(s"user_skuid_view_${dimension}", action(0))
+                  featureMap.put(s"user_skuid_click_${dimension}", action(1))
+                  featureMap.put(s"user_skuid_conver_${dimension}", action(2))
+                }
+              }
+            }
+
+            /*
+            Ad features
+              sparse: cid adid adverid targeting_conversion
+
+              cpa --> 1 feature
+              per adverid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr conver ecpm --> 30 features
+              per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
+              geo // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
+              app // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
+              phone brand // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
+              OS: no data
+              week // per cid: 7d 14d x ctr ctcvr cvr ecpm conver --> 10 features
+              hour // per cid: 7d 14d x ctr ctcvr cvr ecpm conver --> 10 features
+
+            User features
+              title tags of the user's historical clicks/conversions over 3d 7d 14d, matched against the cid title: match count / max score / avg score --> 18 features
+              user history over 14d: views / clicks / conversions / income; ctr cvr ctcvr ecpm --> 8 features
+
+              user-to-cid (ui) features --> 10 features
+                1 / time since the user last viewed this cid
+                1 / time since the user last clicked this cid
+                1 / time since the user last converted on this cid
+                how many times the user viewed this cid
+                how many times the user clicked this cid
+                how many times the user converted on this cid
+                how much the user spent on this cid
+                the user's ctr ctcvr cvr for this cid
+
+            Video features
+              sim-score-1/-2 between title and cid: no data
+              vid // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
+              vid // per cid: 1d 3d 7d 14d x reciprocal rank of ctr ctcvr ecpm --> 12 features
+
+             */
+
+
+            // 4. Process label info
+            val labels = new JSONObject
+            for (labelKey <- List("ad_is_click", "ad_is_conversion")) {
+              if (!record.isNull(labelKey)) {
+                labels.put(labelKey, record.getString(labelKey))
+              }
+            }
+            // 5. Build the log key header
+            val mid = record.getString("mid")
+            val headvideoid = record.getString("headvideoid")
+            val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
+            val labelKey = labels.toString()
+            val featureKey = featureMap.toString()
+            // 6. Concatenate the fields and save
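+            // Final row format: "apptype,mid,cid,ts,headvideoid" <TAB> label JSON <TAB> feature JSON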
+            logKey + "\t" + labelKey + "\t" + featureKey
+          })
+
+        // 4. Save data to HDFS
+        val savePartition = dt + hh
+        val hdfsPath = savePath + "/" + savePartition
+        if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+          println("删除路径并开始数据写入:" + hdfsPath)
+          MyHdfsUtils.delete_hdfs_path(hdfsPath)
+          odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        } else {
+          println("路径不合法,无法写入:" + hdfsPath)
+        }
+      }
+
+    }
+  }
+
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+
+  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
+    // match count, matched words, highest semantic similarity score, average semantic similarity score
+    val tagsList = tags.split(",")
+    var d1 = 0.0
+    val d2 = new ArrayBuffer[String]()
+    var d3 = 0.0
+    var d4 = 0.0
+    for (tag <- tagsList) {
+      if (title.contains(tag)) {
+        d1 = d1 + 1.0
+        d2.add(tag)
+      }
+      val score = Similarity.conceptSimilarity(tag, title)
+      d3 = if (score > d3) score else d3
+      d4 = d4 + score
+    }
+    d4 = if (tagsList.nonEmpty) d4 / tagsList.size else d4
+    (d1, d2.mkString(","), d3, d4)
+  }
+}