#!/bin/sh set -x # 0 全局变量/参数 originDataSavePath=/dw/recommend/model/31_ad_sample_data_v3_auto bucketFeatureSavePath=/dw/recommend/model/33_ad_train_data_v3_auto model_name=model_bkb8_v3 today="$(date +%Y%m%d)" today_early_1="$(date -d '1 days ago' +%Y%m%d)" today_early_2="$(date -d '2 days ago' +%Y%m%d)" LAST_MODEL_HOME=/root/zhaohp/model_online MODEL_PATH=/root/zhaohp/recommend-emr-dataprocess/model PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop FM_HOME=/root/sunmingze/alphaFM OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo max_hour=17 max_minute=00 export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8 export PATH=$SPARK_HOME/bin:$PATH export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf export JAVA_HOME=/usr/lib/jvm/java-1.8.0 start_time=$(date "+%Y-%m-%d %H:%M:%S") elapsed=0 LOG_PREFIX=广告模型自动更新任务 # 1 判断依赖的数据表是否生产完成 source /root/anaconda3/bin/activate py37 while true; do python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${today} --hh 10) step_end_time=$(date "+%Y-%m-%d %H:%M:%S") elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) if [ "$python_return_code" -eq 0 ]; then break fi echo "Python程序返回非0值,等待五分钟后再次调用。" sleep 300 current_hour=$(date +%H) current_minute=$(date +%M) if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then msg="大数据数据生产校验失败, 分区: ${today}10" echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed" /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi done echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed" # 2 原始特征生成 step_start_time=$(date "+%Y-%m-%d %H:%M:%S") /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \ --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \ --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \ ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \ tablePart:64 repartition:16 \ beginStr:${today_early_1}00 endStr:${today}10 \ savePath:${originDataSavePath} \ table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \ idDefaultValue:0.01 step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="Spark原始样本生产任务执行失败" echo "$LOG_PREFIX -- 原始样本生产 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo "$LOG_PREFIX -- 原始样本生产 -- Spark原始样本生产任务执行成功: 耗时 $step_elapsed" # 3 特征分桶 step_start_time=$(date "+%Y-%m-%d %H:%M:%S") /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \ --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \ --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \ ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \ beginStr:${today_early_1} endStr:${today} repartition:100 \ filterNames:adid_,targeting_conversion_ \ readPath:${originDataSavePath} \ savePath:${bucketFeatureSavePath} step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="Spark特征分桶处理任务执行失败" echo "$LOG_PREFIX -- 特征分桶处理任务 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py ${msg} exit 1 fi echo "$LOG_PREFIX -- 特征分桶处理任务 -- spark特征分桶处理执行成功: 耗时 $step_elapsed" # 4 模型训练 step_start_time=$(date "+%Y-%m-%d %H:%M:%S") $HADOOP fs -text ${bucketFeatureSavePath}/${today_early_1}/* | ${FM_HOME}/bin/fm_train -m ${MODEL_PATH}/${model_name}_${today_early_1}.txt -dim 1,1,8 -im ${LAST_MODEL_HOME}/model_online.txt -core 8 step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg "模型训练失败" echo "$LOG_PREFIX -- 原始样本生产 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo "$LOG_PREFIX -- 原始样本生产 -- 模型训练完成: 耗时 $step_elapsed" # 5 对比AUC step5_start_time=$(date "+%Y-%m-%d %H:%M:%S") # 5.1 计算线上模型的AUC step_start_time=$(date "+%Y-%m-%d %H:%M:%S") $HADOOP fs -text ${bucketFeatureSavePath}/${today}/* | ${FM_HOME}/bin/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_online.txt online_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_online.txt | /root/sunmingze/AUC/AUC` step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="线上模型AUC计算失败" echo "$LOG_PREFIX -- 线上模型AUC计算 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo "$LOG_PREFIX -- 线上模型AUC计算 -- 线上模型AUC计算完成: 耗时 $step_elapsed" # 5.2 计算新模型的AUC step_start_time=$(date "+%Y-%m-%d %H:%M:%S") $HADOOP fs -text ${bucketFeatureSavePath}/${today}/* | ${FM_HOME}/bin/fm_predict -m ${MODEL_PATH}/${model_name}_${today_early_1}.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_new.txt new_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_new.txt | /root/sunmingze/AUC/AUC` step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="新模型AUC计算失败" echo "$LOG_PREFIX -- 新模型AUC计算 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo "$LOG_PREFIX -- 新模型AUC计算 -- 新模型AUC计算完成: 耗时 $step_elapsed" echo "AUC比对: 线上模型的AUC: ${online_auc}, 新模型的AUC: ${new_auc}" # 5.3 计算新模型与线上模型的AUC差值的绝对值 auc_diff=$(echo "$online_auc - $new_auc" | bc -l) auc_diff_abs=$(echo "sqrt(($auc_diff)^2)" | bc -l) step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step5_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step5_start_time"))) # 5.4 如果差值的绝对值小于0.005且新模型的AUC大于0.73, 则更新模型 if (( $(echo "${online_auc} <= ${new_auc}" | bc -l) )); then msg="新模型优于线上模型 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc}" echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed" elif (( $(echo "$auc_diff_abs < 0.005" | bc -l) )) && (( $(echo "$new_auc >= 0.73" | bc -l) )); then msg="新模型与线上模型差值小于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs" echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed" else msg="新模型与线上模型差值大于等于阈值0.005或新模型的AUC小于0.73 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff" echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi # 5.5 使用前一天线上模型和前一天的新模型对前一天的数据进行预测并计算AUC yesterday_online_model=${LAST_MODEL_HOME}/model_online.txt # 5.5.1 判断model_online文件的生成时间,如果是昨天生成的则表示模型有更新 # ${MODEL_PATH}/${model_name}_${today_early_1}.txt 和 ${LAST_MODEL_HOME}/model_online_$(date +\%Y\%m\%d).txt file_creation_date=$(stat -c %Y "$yesterday_online_model") file_creation_date_format=$(date -d "@$file_creation_date" +%Y%m%d) if [ "$file_creation_date_format" == "$today_early_1" ]; then yesterday_online_model=${LAST_MODEL_HOME}/model_online_${today_early_1}.txt fi # 5.5.2 使用昨天的线上模型,进行预测 echo "前一天的线上模型路径: $yesterday_online_model" $HADOOP fs -text ${bucketFeatureSavePath}/${today_early_1}/* | ${FM_HOME}/bin/fm_predict -m "$yesterday_online_model" -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today_early_1}_online_all.txt yesterday_online_auc=`cat ${PREDICT_PATH}/${model_name}_${today_early_1}_online_all.txt | /root/sunmingze/AUC/AUC` # 5.5.3 使用昨天的新模型,进行预测 $HADOOP fs -text ${bucketFeatureSavePath}/${today_early_1}/* | ${FM_HOME}/bin/fm_predict -m ${MODEL_PATH}/${model_name}_${today_early_2}.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today_early_1}_new_all.txt yesterday_new_auc=`cat ${PREDICT_PATH}/${model_name}_${today_early_1}_new_all.txt | /root/sunmingze/AUC/AUC` # 6 模型格式转换 step_start_time=$(date "+%Y-%m-%d %H:%M:%S") change_txt_path=${MODEL_PATH}/${model_name}_${today_early_1}_change.txt cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt | awk -F " " '{ if (NR == 1) { print $1"\t"$2 } else { split($0, fields, " "); OFS="\t"; line="" for (i = 1; i <= 10 && i <= length(fields); i++) { line = (line ? line "\t" : "") fields[i]; } print line } }' > "$change_txt_path" step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="新模型文件格式转换失败" echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo -e "$LOG_PREFIX -- 模型文件格式转换 -- 转换后的路径为 [$change_txt_path]: 耗时 $step_elapsed" # 7 模型文件上传OSS step_start_time=$(date "+%Y-%m-%d %H:%M:%S") online_model_path=${OSS_PATH}/${model_name}.txt $HADOOP fs -test -e ${online_model_path} if [ $? -eq 0 ]; then echo "删除已存在的OSS模型文件" $HADOOP fs -rm -r -skipTrash ${online_model_path} fi $HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path} step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="广告模型文件至OSS失败, OSS模型文件路径: $online_model_path" echo -e "$LOG_PREFIX -- 模型文件推送至OSS -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo -e "$LOG_PREFIX -- 模型文件推送至OSS -- 广告模型文件至OSS成功, OSS模型文件路径 $online_model_path: 耗时 $step_elapsed" # 8 本地保存最新的线上使用的模型,用于下一次的AUC验证 step_start_time=$(date "+%Y-%m-%d %H:%M:%S") # 将之前的线上模型进行备份,表示从上一个备份时间到当前时间内,使用的线上模型都是此文件 # 假设当前是07-11,上一次备份时间为07-07。备份之后表示从07-07下午至07-11上午线上使用的模型文件都是model_online_20240711.txt cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_${today}.txt cp -f ${MODEL_PATH}/${model_name}_${today_early_1}.txt ${LAST_MODEL_HOME}/model_online.txt step_end_time=$(date "+%Y-%m-%d %H:%M:%S") step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time"))) if [ $? -ne 0 ]; then msg="模型备份失败" echo -e "$LOG_PREFIX -- 模型备份 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed" exit 1 fi echo -e "$LOG_PREFIX -- 模型备份 -- 模型备份完成: 耗时 $step_elapsed" # 9 任务完成通知 step_end_time=$(date "+%Y-%m-%d %H:%M:%S") msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc \n\t - AUC差值: $auc_diff \n\t - 模型上传路径: $online_model_path" echo -e "$LOG_PREFIX -- 模型更新完成 -- $msg: 耗时 $step_elapsed" elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time"))) /root/anaconda3/bin/python ad/ad_monitor_util.py --level info --msg "$msg" --start "$start_time" --elapsed "$elapsed" # 15 15 * * * cd /root/zhaohp/recommend-emr-dataprocess && /bin/sh ./ad/01_ad_model_update_everyday.sh > logs/01_update_eventday_$(date +\%Y-\%m-\%d_\%H-\%M).log 2>&1