
feat: add ad data backfill script

zhaohaipeng 5 months ago
parent commit
d30fcc96a8
4 changed files with 145 additions and 103 deletions
  1. ad/01_ad_model_update.sh (+30 -29)
  2. ad/24_supplementary_data.sh (+99 -0)
  3. ad/25_ad_data_make.sh (+0 -70)
  4. ad/25_xgb_make_data_origin_bucket.sh (+16 -4)

+ 30 - 29
ad/01_ad_model_update.sh

@@ -12,6 +12,7 @@ source /root/anaconda3/bin/activate py37
 
 
 # Global constants
+LOG_PREFIX="Ad model training task"
 HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
 TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
 BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
@@ -77,13 +78,13 @@ check_run_status() {
     local step_end_time=$(date +%s)
     local step_elapsed=$(($step_end_time - $step_start_time))
 
-    if [ $status -ne 0 ]; then
-        echo "$LOG_PREFIX -- ${step_name} failed: elapsed $step_elapsed"
+    if [ ${status} -ne 0 ]; then
+        echo "${LOG_PREFIX} -- ${step_name} failed: elapsed ${step_elapsed}"
         local elapsed=$(($step_end_time - $start_time))
-        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" --top10 "${top10_msg}"
+        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
         exit 1
     else
-        echo "$LOG_PREFIX -- ${step_name} succeeded: elapsed $step_elapsed"
+        echo "${LOG_PREFIX} -- ${step_name} succeeded: elapsed ${step_elapsed}"
     fi
 }
 
@@ -96,7 +97,7 @@ send_success_upload_msg(){
   msg+="\n\t - Model path in OSS: ${MODEL_OSS_PATH}/${model_name}.tar.gz"
 
   local step_end_time=$(date +%s)
-  local elapsed=$(($step_end_time - $start_time))
+  local elapsed=$((${step_end_time} - ${start_time}))
 
   /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level info --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
 }
@@ -107,13 +108,13 @@ init() {
   local count=1
   local current_data="$(date -d '2 days ago' +%Y%m%d)"
   # Loop over the previous n days and collect non-holiday dates
-  while [[ $count -le 7 ]]; do
-    date_key=$(date -d "$current_data" +%Y%m%d)
+  while [[ ${count} -le 7 ]]; do
+    date_key=$(date -d "${current_data}" +%Y%m%d)
     # Skip holidays and build up the training data paths
-    if [ $(is_not_holidays $date_key) -eq 1 ]; then
+    if [ $(is_not_holidays ${date_key}) -eq 1 ]; then
 
       # Append date_key to the array
-      date_keys+=("$date_key")
+      date_keys+=("${date_key}")
 
       if [[ -z ${train_data_path} ]]; then
         train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
@@ -124,7 +125,7 @@ init() {
     else
       echo "日期: ${date_key}是节日,跳过"
     fi
-    current_data=$(date -d "$current_data -1 day" +%Y%m%d)
+    current_data=$(date -d "${current_data} -1 day" +%Y%m%d)
   done
 
   last_index=$((${#date_keys[@]} - 1))
@@ -165,22 +166,22 @@ check_ad_hive() {
   while true; do
       local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
 
-      elapsed=$(($(date +%s) - $step_start_time))
-      if [ "$python_return_code" -eq 0 ]; then
+      elapsed=$(($(date +%s) - ${step_start_time}))
+      if [ "${python_return_code}" -eq 0 ]; then
           break
       fi
       echo "Python程序返回非0值,等待五分钟后再次调用。"
       sleep 300
       local current_hour=$(date +%H)
       local current_minute=$(date +%M)
-      if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
+      if (( ${current_hour} > ${max_hour} || ( ${current_hour} == ${max_hour} && ${current_minute} >= ${max_minute} ) )); then
           local msg="大数据数据生产校验失败, 分区: ${today_early_1}"
-          echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
-          /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+          echo -e "${LOG_PREFIX} -- 大数据数据生产校验 -- ${msg}: 耗时 ${elapsed}"
+          /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}"
           exit 1
       fi
   done
-  echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
+  echo "${LOG_PREFIX} -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 ${elapsed}"
 }
 
 origin_data() {
@@ -220,7 +221,7 @@ xgb_train() {
   eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20
 
   local return_code=$?
-  check_run_status $return_code $step_start_time "XGB model training task" "XGB model training failed"
+  check_run_status ${return_code} ${step_start_time} "XGB model training task" "XGB model training failed"
 }
 
 calc_model_predict() {
@@ -259,14 +260,14 @@ calc_model_predict() {
   done < "${predict_analyse_file_path}"
 
   local return_code=$?
-  check_run_status $return_code $step_start_time "Top10 diff computation" "Top10 diff computation failed"
+  check_run_status ${return_code} ${step_start_time} "Top10 diff computation" "Top10 diff computation failed"
 
   old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
-  check_run_status $? $step_start_time "Old model Top10 diff computation" "Old model Top10 diff computation failed"
+  check_run_status $? ${step_start_time} "Old model Top10 diff computation" "Old model Top10 diff computation failed"
 
 
   new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
-  check_run_status $? $step_start_time "New model Top10 diff computation" "New model Top10 diff computation failed"
+  check_run_status $? ${step_start_time} "New model Top10 diff computation" "New model Top10 diff computation failed"
 
   echo "Old model Top10 diff average: ${old_incr_rate_avg}"
   echo "New model Top10 diff average: ${new_incr_rate_avg}"
@@ -298,17 +299,17 @@ model_predict() {
   modelPath:${online_model_path}
 
   local return_code=$?
-  check_run_status $return_code $step_start_time "Online model evaluation of ${predict_date_path: -8} data" "Online model evaluation of ${predict_date_path: -8} data failed"
+  check_run_status ${return_code} ${step_start_time} "Online model evaluation of ${predict_date_path: -8} data" "Online model evaluation of ${predict_date_path: -8} data failed"
 
   # Result analysis
   local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})
-  check_run_status $python_return_code $step_start_time "Analysis of online model evaluation of ${predict_date_path: -8} data" "Analysis of online model evaluation of ${predict_date_path: -8} data failed"
+  check_run_status ${python_return_code} ${step_start_time} "Analysis of online model evaluation of ${predict_date_path: -8} data" "Analysis of online model evaluation of ${predict_date_path: -8} data failed"
 
   calc_model_predict
 
   if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then
     echo "Online model evaluation of ${predict_date_path: -8} data: absolute error exceeds 0.1, please check"
-    check_run_status 1 $step_start_time "${predict_date_path: -8} data: absolute error exceeds 0.1" "Online model evaluation of ${predict_date_path: -8} data: absolute error exceeds 0.1, please check"
+    check_run_status 1 ${step_start_time} "${predict_date_path: -8} data: absolute error exceeds 0.1" "Online model evaluation of ${predict_date_path: -8} data: absolute error exceeds 0.1, please check"
     exit 1
   fi 
 
@@ -317,7 +318,7 @@ model_predict() {
   score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l )
   if (( $(echo "${score_diff} > 0.050000" | bc -l ) ));then 
     echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查"
-    check_run_status 1 $step_start_time "两个模型评估${predict_date_path: -8}的数据" "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
+    check_run_status 1 ${step_start_time} "两个模型评估${predict_date_path: -8}的数据" "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
     exit 1
   fi 
 
@@ -332,7 +333,7 @@ model_upload_oss() {
     ${HADOOP} fs -get ${model_save_path} ${model_name}
     if [ ! -d ${model_name} ]; then
       echo "从HDFS下载模型失败"
-      check_run_status 1 $step_start_time "HDFS下载模型任务" "HDFS下载模型失败" 
+      check_run_status 1 ${step_start_time} "HDFS下载模型任务" "HDFS下载模型失败" 
       exit 1 
     fi
 
@@ -346,7 +347,7 @@ model_upload_oss() {
     # Push the model and calibration files to OSS
     ${HADOOP} fs -put ${model_name}.tar.gz ${OSS_CALIBRATION_FILE_NAME}.txt ${MODEL_OSS_PATH}
     local return_code=$?
-    check_run_status $return_code $step_start_time "Model upload to OSS task" "Model upload to OSS failed"
+    check_run_status ${return_code} ${step_start_time} "Model upload to OSS task" "Model upload to OSS failed"
 
     echo ${model_save_path} > ${model_path_file}
 
@@ -357,11 +358,11 @@ model_upload_oss() {
   )
 
   local return_code=$?
-  check_run_status $return_code $step_start_time "Model upload to OSS task" "Model upload to OSS failed"
+  check_run_status ${return_code} ${step_start_time} "Model upload to OSS task" "Model upload to OSS failed"
 
   local step_end_time=$(date +%s)
-  local elapsed=$(($step_end_time - $start_time))
-  echo -e "$LOG_PREFIX -- model update finished -- model update succeeded: elapsed $elapsed"
+  local elapsed=$((${step_end_time} - ${start_time}))
+  echo -e "${LOG_PREFIX} -- model update finished -- model update succeeded: elapsed ${elapsed}"
   
   send_success_upload_msg
 }
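
Note: the "$var" to "${var}" sweep above is purely cosmetic; braces only delimit the variable name and do not prevent word-splitting, which only quoting does. A minimal sketch of the difference (hypothetical values, not from the script):

    status="0 1"            # imagine a malformed status value
    [ ${status} -ne 0 ]     # splits into [ 0 1 -ne 0 ] -> "too many arguments"
    [ "${status}" -ne 0 ]   # stays one word and fails cleanly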

+ 99 - 0
ad/24_supplementary_data.sh

@@ -0,0 +1,99 @@
+#!/bin/sh
+set -x
+
+# Ad data backfill script; edit {today_early_1} to backfill a single day's data
+
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+sh_path=$(cd $(dirname $0); pwd)
+source ${sh_path}/00_common.sh
+
+source /root/anaconda3/bin/activate py37
+
+
+# Global constants
+LOG_PREFIX="Ad model training task"
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
+BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
+TABLE=alg_recsys_ad_sample_all
+
+# Job start time
+start_time=$(date +%s)
+# The previous day
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+
+# Check a command's exit code
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+    local msg=$4
+
+    local step_end_time=$(date +%s)
+    local step_elapsed=$((${step_end_time} - ${step_start_time}))
+
+    if [ ${status} -ne 0 ]; then
+        echo "${LOG_PREFIX} -- ${step_name} failed: elapsed ${step_elapsed}"
+        local elapsed=$((${step_end_time} - ${start_time}))
+        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}" --top10 "${top10_msg}"
+        exit 1
+    else
+        echo "${LOG_PREFIX} -- ${step_name} succeeded: elapsed ${step_elapsed}"
+    fi
+}
+
+# Check whether the big-data job has finished
+check_ad_hive() {
+  local step_start_time=$(date +%s)
+  local max_hour=05
+  local max_minute=30
+  local elapsed=0
+  while true; do
+      local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
+
+      elapsed=$(($(date +%s) - ${step_start_time}))
+      if [ "${python_return_code}" -eq 0 ]; then
+          break
+      fi
+      echo "Python程序返回非0值,等待五分钟后再次调用。"
+      sleep 300
+      local current_hour=$(date +%H)
+      local current_minute=$(date +%M)
+      if (( ${current_hour} > ${max_hour} || ( ${current_hour} == ${max_hour} && ${current_minute} >= ${max_minute} ) )); then
+          local msg="Big-data production check failed, partition: ${today_early_1}"
+          echo -e "${LOG_PREFIX} -- big-data production check -- ${msg}: elapsed ${elapsed}"
+          /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}"
+          exit 1
+      fi
+  done
+  echo "${LOG_PREFIX} -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
+}
+
+origin_data() {
+  (
+    source ${sh_path}/25_xgb_make_data_origin_bucket.sh
+    make_origin_data
+  )
+}
+
+bucket_feature() {
+  (
+    source ${sh_path}/25_xgb_make_data_origin_bucket.sh
+    make_bucket_feature
+  )
+}
+
+# Main
+main() {
+  check_ad_hive
+
+  origin_data
+
+  bucket_feature
+}
+
+
+main
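
Note: check_ad_hive compares ${current_hour} (from date +%H) with max_hour=05 inside (( )), and bash parses zero-padded numbers there as octal, so at 08:xx or 09:xx the test aborts with "value too great for base". A minimal sketch of the usual base-10 guard, assuming the script runs under bash (this fix is not part of the commit):

    current_hour=$(date +%H)   # e.g. 08
    max_hour=05
    if (( 10#${current_hour} > 10#${max_hour} )); then
        echo "past the deadline"
    fi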

+ 0 - 70
ad/25_ad_data_make.sh

@@ -1,70 +0,0 @@
-#!/bin/sh
-
-# Ad data production
-
-set -x 
-
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-source /root/anaconda3/bin/activate py37
-
-# Global constants
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-FM_HOME=/root/sunmingze/alphaFM
-
-TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
-BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
-TABLE=alg_recsys_ad_sample_all
-
-today="$(date +%Y%m%d)"
-today_early_1="$(date -d '1 days ago' +%Y%m%d)"
-
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:${today_early_1}00 endStr:${today_early_1}12 \
-savePath:${TRAIN_PATH} \
-table:${TABLE} \
-filterHours:00,01,02,03,04,05,06,07 \
-idDefaultValue:0.1 &
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:${today_early_1}13 endStr:${today_early_1}18 \
-savePath:${TRAIN_PATH} \
-table:${TABLE} \
-filterHours:00,01,02,03,04,05,06,07 \
-idDefaultValue:0.1 &
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:${today_early_1}19 endStr:${today_early_1}23 \
-savePath:${TRAIN_PATH} \
-table:${TABLE} \
-filterHours:00,01,02,03,04,05,06,07 \
-idDefaultValue:0.1 &
-
-wait
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_33_bucketData_20240718 \
---master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-beginStr:${today_early_1} endStr:${today_early_1} repartition:100 \
-filterNames:_4h_,_5h_,adid_,targeting_conversion_ \
-readPath:${TRAIN_PATH} \
-savePath:${BUCKET_FEATURE_PATH}
-
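
Note: nothing is lost by this deletion; the three spark-submit windows (00~12, 13~18, 19~23) and the bucketing job now live as make_origin_data and make_bucket_feature in 25_xgb_make_data_origin_bucket.sh, which the calling scripts source inside a subshell so the helper's definitions do not leak into the caller. A minimal sketch of that pattern (helpers.sh and helper_func are placeholders):

    run_step() {
      (
        source ./helpers.sh   # definitions exist only inside this subshell
        helper_func
      )
    }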

+ 16 - 4
ad/25_xgb_make_data_origin_bucket.sh

@@ -26,6 +26,7 @@ make_origin_data() {
   table:${TABLE} \
   filterHours:00,01,02,03,04,05,06,07 \
   idDefaultValue:0.1 &
+  local task1=$!
 
   /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
   --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
@@ -37,6 +38,7 @@ make_origin_data() {
   table:${TABLE} \
   filterHours:00,01,02,03,04,05,06,07 \
   idDefaultValue:0.1 &
+  local task2=$!
 
   /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
   --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
@@ -48,11 +50,21 @@ make_origin_data() {
   table:${TABLE} \
   filterHours:00,01,02,03,04,05,06,07 \
   idDefaultValue:0.1 &
+  local task3=$!
 
-  wait
+  wait ${task1}
+  local task1_return_code=$?
 
-  local return_code=$?
-  check_run_status $return_code $step_start_time "Spark raw sample production task"
+  wait ${task2}
+  local task2_return_code=$?
+
+  wait ${task3}
+  local task3_return_code=$?
+
+
+  check_run_status ${task1_return_code} ${step_start_time} "Spark raw sample production task: producing hours 00~12 failed"
+  check_run_status ${task2_return_code} ${step_start_time} "Spark raw sample production task: producing hours 13~18 failed"
+  check_run_status ${task3_return_code} ${step_start_time} "Spark raw sample production task: producing hours 19~23 failed"
 }
 
 
@@ -71,5 +83,5 @@ make_bucket_feature() {
   savePath:${BUCKET_FEATURE_PATH}
 
   local return_code=$?
-  check_run_status $return_code $step_start_time "Spark feature bucketing task"
+  check_run_status ${return_code} ${step_start_time} "Spark feature bucketing task"
 }
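
Note: the per-task wait above fixes a real blind spot in the old code: a bare wait always exits 0, so "local return_code=$?" after it could never detect a failed Spark job. Capturing each PID with $! and waiting on it individually surfaces every job's status; a minimal standalone sketch (job_a and job_b are placeholders):

    job_a & pid_a=$!
    job_b & pid_b=$!

    wait "${pid_a}"; status_a=$?
    wait "${pid_b}"; status_b=$?

    echo "job_a exited ${status_a}, job_b exited ${status_b}"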