Browse Source

feat:模型更新加快

zhaohaipeng 10 months ago
parent
commit
3481624655
2 changed files with 310 additions and 2 deletions
  1. 2 2
      ad/01_ad_model_update_everyday.sh
  2. 308 0
      ad/02_ad_model_update_ twice_daily.sh

+ 2 - 2
ad/01_ad_model_update_everyday.sh

@@ -175,7 +175,7 @@ elif (( $(echo "$auc_diff_abs < 0.005" | bc -l) )) && (( $(echo "$new_auc >= 0.7
     echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed"
 
 else
-    msg="新模型与线上模型差值大于等于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
+    msg="新模型与线上模型差值大于等于阈值0.005或新模型的AUC小于0.73 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff"
     echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed"
     elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
     /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
@@ -280,7 +280,7 @@ echo -e "$LOG_PREFIX -- 模型备份 -- 模型备份完成: 耗时 $step_elapsed
 
 # 9 任务完成通知
 step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
-msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc  \n\t - AUC差值: $auc_diff_abs \n\t - 模型上传路径: $online_model_path"
+msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc  \n\t - AUC差值: $auc_diff \n\t - 模型上传路径: $online_model_path"
 echo -e "$LOG_PREFIX -- 模型更新完成 -- $msg: 耗时 $step_elapsed"
 elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
 /root/anaconda3/bin/python ad/ad_monitor_util.py --level info --msg "$msg" --start "$start_time" --elapsed "$elapsed"

+ 308 - 0
ad/02_ad_model_update_ twice_daily.sh

@@ -0,0 +1,308 @@
+#!/bin/sh
+set -x
+
+
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+source /root/anaconda3/bin/activate py37
+
+# 全局常量
+originDataSavePath=/dw/recommend/model/31_ad_sample_data_v3_auto
+bucketFeatureSavePathHome=/dw/recommend/model/33_ad_train_data_v3_auto
+model_name=model_bkb8_v3
+LAST_MODEL_HOME=/root/zhaohp/model_online
+MODEL_HOME=/root/zhaohp/recommend-emr-dataprocess/model
+PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+FM_HOME=/root/sunmingze/alphaFM
+OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo
+
+today="$(date +%Y%m%d)"
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+
+start_time=$(date "+%Y-%m-%d %H:%M:%S")
+elapsed=0
+LOG_PREFIX=广告模型自动更新任务
+
+# 训练和预测数据分区
+train_begin_str=''
+train_end_str=''
+predict_begin_str=''
+predict_end_str=''
+
+# HDFS保存数据的目录
+trainBucketFeaturePath=${bucketFeatureSavePathHome}
+predictBucketFeaturePath=${bucketFeatureSavePathHome}
+
+local_model_file_path=${MODEL_HOME}/${model_name}.txt
+local_change_model_file_path=${MODEL_HOME}/${model_name}_change.txt
+
+# 全局初始化
+global_init() {
+    # 获取当前小时,确定需要使用的数据分区范围
+    local current_hour = $(date +%H)
+    if [ $current_hour -lt 08]; then
+        train_begin_str=${today_early_1}14
+        train_end_str=${today_early_1}21
+        predict_begin_str=${today_early_1}22
+        predict_end_str=${today_early_1}23
+
+        trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/train
+        predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/predict
+
+        local_model_file_path=${MODEL_HOME}/${train_end_str}.txt
+        local_change_model_file_path=${MODEL_HOME}/${train_end_str}_change.txt
+
+    elif [ $current_hour -ge 20 ]; then 
+        train_begin_str=${today_early_1}22
+        train_end_str=${today}13
+        predict_begin_str=${today}14
+        predict_end_str=${today}15
+
+        local_model_file_path=${MODEL_HOME}/${train_end_str}.txt
+        local_change_model_file_path=${MODEL_HOME}/${train_end_str}_change.txt
+
+    fi
+
+    # 删除HDFS目录,保证本次任务运行时目录干净
+    $HADOOP fs -rm -r -skipTrash ${trainBucketFeaturePath}
+    $HADOOP fs -rm -r -skipTrash ${predictBucketFeaturePath}
+
+}
+
+# 校验命令的退出码
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+
+    local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
+    local step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time")))
+
+    if [ $status -ne 0 ]; then
+        echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
+        local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
+        /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+        exit 1
+    else
+        echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
+    fi
+}
+
+# 校验大数据任务是否执行完成
+check_ad_hive() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    while true; do
+        local python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${predict_end_str:0:8} --hh ${predict_end_str:9:10})
+
+        local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
+        local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time")))
+        if [ "$python_return_code" -eq 0 ]; then
+            break
+        fi
+        echo "Python程序返回非0值,等待五分钟后再次调用。"
+        sleep 300
+        local current_hour=$(date +%H)
+        local current_minute=$(date +%M)
+        if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
+            local msg="大数据数据生产校验失败, 分区: ${today}10"
+            echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
+            /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+            exit 1
+        fi
+    done
+    echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
+
+}
+
+# 原始特征生产
+make_origin_data() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
+    --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
+    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+    tablePart:64 repartition:16 \
+    beginStr:${train_begin_str} endStr:${predict_end_str} \
+    savePath:${originDataSavePath} \
+    table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
+    idDefaultValue:0.01
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "Spark原始样本生产任务"
+
+}
+
+# 特征分桶,训练用的数据和预测用的数据分不同的目录
+make_bucket_feature() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    # 训练用的数据
+    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \
+    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
+    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+    beginStr:${train_begin_str} endStr:${train_end_str} repartition:100 \
+    filterNames:adid_,targeting_conversion_ \
+    readPath:${originDataSavePath} \
+    savePath:${trainBucketFeaturePath}
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "Spark特征分桶任务: 训练数据分桶"
+    
+    # 预测用的数据
+    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \
+    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
+    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+    beginStr:${predict_begin_str} endStr:${predict_end_str} repartition:100 \
+    filterNames:adid_,targeting_conversion_ \
+    readPath:${originDataSavePath} \
+    savePath:${predictBucketFeaturePath}
+
+    return_code=$?
+    check_run_status $return_code $step_start_time "Spark特征分桶任务: 预测数据分桶"
+}
+
+# 模型训练
+model_train() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    $HADOOP fs -text ${trainBucketFeaturePath}/* | ${FM_HOME}/bin/fm_train -m ${local_model_file_path} -dim 1,1,8  -im ${LAST_MODEL_HOME}/model_online.txt -core 8
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "模型训练"
+}
+
+# AUC对比
+auc_compare() {
+    local step5_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+
+    # 5.1 计算线上模型的AUC
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    $HADOOP fs -text ${predictBucketFeaturePath}/* | ${FM_HOME}/bin/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt
+    online_auc=`cat ${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt | /root/sunmingze/AUC/AUC`
+    
+    local return_code=$?
+    check_run_status $return_code $step_start_time "线上模型AUC计算"
+
+    # 5.2 计算新模型的AUC
+    step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    $HADOOP fs -text ${predictBucketFeaturePath}/* | ${FM_HOME}/bin/fm_predict -m ${local_model_file_path} -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt
+    new_auc=`cat ${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt | /root/sunmingze/AUC/AUC`
+
+    return_code=$?
+    check_run_status $return_code $step_start_time "新模型的AUC计算"
+
+    echo "AUC比对: 线上模型的AUC: ${online_auc}, 新模型的AUC: ${new_auc}"
+
+    # 5.3 计算新模型与线上模型的AUC差值的绝对值
+    auc_diff=$(echo "$online_auc - $new_auc" | bc -l)
+    local auc_diff_abs=$(echo "sqrt(($auc_diff)^2)" | bc -l)
+
+    local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
+    local step5_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step5_start_time")))
+    # 5.4 如果差值的绝对值小于0.005且新模型的AUC大于0.73, 则更新模型
+    if (( $(echo "${online_auc} <= ${new_auc}" | bc -l) )); then
+        local msg="新模型优于线上模型 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc}"
+        echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
+
+    elif (( $(echo "$auc_diff_abs < 0.005" | bc -l) )) && (( $(echo "$new_auc >= 0.73" | bc -l) )); then
+        local msg="新模型与线上模型差值小于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
+        echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
+
+    else
+        local msg="新模型与线上模型差值大于等于阈值0.005或新模型的AUC小于0.73 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff"
+        echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
+        local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
+        /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+        exit 1
+    fi
+}
+
+# 模型格式转换
+model_to_online_format() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    cat ${local_model_file_path} |
+    awk -F " " '{
+        if (NR == 1) {
+            print $1"\t"$2
+        } else {
+            split($0, fields, " ");
+            OFS="\t";
+            line=""
+            for (i = 1; i <= 10 && i <= length(fields); i++) {
+                line = (line ? line "\t" : "") fields[i];
+            }
+            print line
+        }
+    }' > ${local_change_model_file_path}
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "模型格式转换"
+
+}
+
+# 模型文件上传OSS
+model_upload_oss() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    local online_model_path=${OSS_PATH}/${model_name}.txt
+    $HADOOP fs -test -e ${online_model_path}
+    if [ $? -eq 0 ]; then
+        echo "删除已存在的OSS模型文件"
+        $HADOOP fs -rm -r -skipTrash ${online_model_path}
+    fi
+    $HADOOP fs -put ${local_change_model_file_path} ${online_model_path}
+    
+    local return_code=$?
+    check_run_status $return_code $step_start_time "模型文件上传OSS"
+    
+}
+
+# 模型文件本地备份
+model_local_back() {
+    local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+    # 将之前的线上模型进行备份,表示从上一个备份时间到当前时间内,使用的线上模型都是此文件
+    # 假设当前是07-11,上一次备份时间为07-07。备份之后表示从07-07下午至07-11上午线上使用的模型文件都是model_online_20240711.txt
+    cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_${train_end_str}.txt
+    cp -f ${local_model_file_path} ${LAST_MODEL_HOME}/model_online.txt
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "模型备份"
+}
+
+# 任务完成通知
+success_inform() {
+    local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
+    local msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc  \n\t - AUC差值: $auc_diff \n\t - 模型上传路径: $online_model_path"
+    echo -e "$LOG_PREFIX -- 模型更新完成 -- $msg: 耗时 $step_elapsed"
+    local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
+    /root/anaconda3/bin/python ad/ad_monitor_util.py --level info --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+}
+
+main() {
+
+    global_init
+
+    check_ad_hive
+
+    make_origin_data
+
+    make_bucket_feature
+
+    model_train
+
+    auc_compare
+
+    model_to_online_format
+
+    model_upload_oss
+
+    model_local_back
+
+    success_inform
+}
+
+
+main