#!/bin/bash
#
# Twice-daily automatic update of the ad model.
#
# Pipeline (see main): wait for the upstream table partition, produce the
# origin samples with Spark, bucket the features for training and prediction,
# and (currently disabled in main) train the model, compare its AUC with the
# online model, convert it to the online format, upload it and back it up.
#
# The script uses bash-only features (source, (( )), ${var:off:len},
# PIPESTATUS), so it must run under bash rather than a POSIX /bin/sh.
#
# Launch example:
#   nohup ./ad/02_ad_model_update_twice_daily.sh > logs/02_twice_daily.log 2>&1 &

set -x

export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
export PATH="$SPARK_HOME/bin:$PATH"
export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
export JAVA_HOME=/usr/lib/jvm/java-1.8.0

source /root/anaconda3/bin/activate py37

# Global constants
originDataSavePath=/dw/recommend/model/31_ad_sample_data_v4_auto
bucketFeatureSavePathHome=/dw/recommend/model/33_ad_train_data_v4_auto
model_name=model_bkb8_v4
LAST_MODEL_HOME=/root/zhaohp/model_online
MODEL_HOME=/root/zhaohp/recommend-emr-dataprocess/model
OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model
PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict
HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
FM_HOME=/root/sunmingze/alphaFM

today="$(date +%Y%m%d)"
today_early_1="$(date -d '1 days ago' +%Y%m%d)"

start_time=$(date +%s)
elapsed=0
LOG_PREFIX=广告模型自动更新任务

# Partition range (yyyymmddHH) of the training and prediction data;
# filled in by global_init.
train_begin_str=''
train_end_str=''
predict_begin_str=''
predict_end_str=''

# HDFS directories holding the bucketed data; refined by global_init.
trainBucketFeaturePath=${bucketFeatureSavePathHome}
predictBucketFeaturePath=${bucketFeatureSavePathHome}

local_model_file_path=${MODEL_HOME}/${model_name}.txt
local_change_model_file_path=${MODEL_HOME}/${model_name}_change.txt

# Deadline (hour / minute of day) after which check_ad_hive gives up.
max_hour=21
max_minute=20

#######################################
# Global initialisation: derives the data partition ranges, the HDFS/local
# paths and the wait deadline from the current hour, then removes the HDFS
# output directories of this run.
# Globals:   writes train_*/predict_* ranges, *BucketFeaturePath,
#            local_model_file_path, local_change_model_file_path, max_hour
# Exits:     1 when started between 07:00 and 15:59
#######################################
global_init() {
  # date prints a zero-padded hour; force base 10 so that "08"/"09" are not
  # parsed as (invalid) octal numbers.
  local current_hour
  current_hour=$((10#$(date +%H)))

  # Day whose sub-directory receives the bucketed data of this run.
  local data_day

  if ((current_hour <= 6)); then
    data_day=${today_early_1}
    train_begin_str=${today_early_1}08
    train_end_str=${today_early_1}21
    predict_begin_str=${today_early_1}22
    predict_end_str=${today_early_1}23
    max_hour=08
  elif ((current_hour >= 16)); then
    data_day=${today}
    train_begin_str=${today_early_1}22
    train_end_str=${today}13
    predict_begin_str=${today}14
    predict_end_str=${today}15
    max_hour=21
  else
    echo "当前时间段异常: 退出任务"
    exit 1
  fi

  trainBucketFeaturePath=${bucketFeatureSavePathHome}/${data_day}/train
  predictBucketFeaturePath=${bucketFeatureSavePathHome}/${data_day}/predict
  local_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}.txt
  local_change_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}_change.txt

  # Remove the HDFS directories so this run starts from a clean state.
  # The exit status is deliberately not checked: the directories may simply
  # not exist yet. ":?" aborts instead of running "rm -r" on an empty path.
  "$HADOOP" fs -rm -r -skipTrash "${trainBucketFeaturePath:?}"
  "$HADOOP" fs -rm -r -skipTrash "${predictBucketFeaturePath:?}"

  echo "全局变量初始化化: "
  echo " train_begin_str=${train_begin_str}"
  echo " train_end_str=${train_end_str}"
  echo " predict_begin_str=${predict_begin_str}"
  echo " predict_end_str=${predict_end_str}"
  echo " originDataSavePath=${originDataSavePath}"
  echo " trainBucketFeaturePath=${trainBucketFeaturePath}"
  echo " predictBucketFeaturePath=${predictBucketFeaturePath}"
  echo " local_model_file_path=${local_model_file_path}"
  echo " local_change_model_file_path=${local_change_model_file_path}"
  echo " max_hour=${max_hour}"
}

#######################################
# Checks the exit code of a step, logs the outcome with the step duration and
# terminates the whole script when the step failed.
# Arguments: $1 - exit status of the step
#            $2 - step start time (epoch seconds)
#            $3 - step name used in the log line
# Exits:     1 when $1 is non-zero
#######################################
check_run_status() {
  local status=$1
  local step_start_time=$2
  local step_name=$3

  local step_end_time
  step_end_time=$(date +%s)
  local step_elapsed=$((step_end_time - step_start_time))

  if [ "$status" -eq 0 ]; then
    echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
    return 0
  fi

  echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
  # Total run time; only consumed by the (disabled) monitor call below.
  local elapsed=$((step_end_time - start_time))
  # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  exit 1
}

#######################################
# Waits until the upstream job has produced the partition named by
# predict_end_str, polling every five minutes.
# Globals:   reads predict_end_str, max_hour, max_minute
# Exits:     1 once the wall clock passes max_hour:max_minute while the
#            partition is still not ready
#######################################
check_ad_hive() {
  local step_start_time
  step_start_time=$(date +%s)

  local partition_day=${predict_end_str:0:8}
  local partition_hour=${predict_end_str:8:2}
  local python_return_code step_end_time elapsed
  local current_hour current_minute msg

  while true; do
    # NOTE(review): this captures what ad_utils.py prints on stdout, not its
    # exit status - presumably it prints 0 when the partition is ready; confirm.
    python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive \
      --partition "${partition_day}" --hh "${partition_hour}")

    step_end_time=$(date +%s)
    elapsed=$((step_end_time - step_start_time))

    if [ "$python_return_code" -eq 0 ]; then
      break
    fi

    echo "Python程序返回非0值,等待五分钟后再次调用。"
    sleep 300

    # Force base 10 everywhere: "08"/"09" (from date, or max_hour=08) would
    # otherwise abort the arithmetic with "value too great for base", the
    # condition would be false, and the loop would never time out.
    current_hour=$((10#$(date +%H)))
    current_minute=$((10#$(date +%M)))
    if ((current_hour > 10#$max_hour || (current_hour == 10#$max_hour && current_minute >= 10#$max_minute))); then
      # Report the partition that was actually checked.
      msg="大数据数据生产校验失败, 分区: ${predict_end_str}"
      echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
      # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
      exit 1
    fi
  done

  echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
}

#######################################
# Produces the origin samples with Spark for the whole range
# train_begin_str .. predict_end_str.
# Exits:     1 (through check_run_status) when the Spark job fails
#######################################
make_origin_data() {
  local step_start_time
  step_start_time=$(date +%s)

  "${SPARK_HOME}/bin/spark-class2" org.apache.spark.deploy.SparkSubmit \
    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
    --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
    tablePart:64 repartition:16 \
    "beginStr:${train_begin_str}" "endStr:${predict_end_str}" \
    "savePath:${originDataSavePath}" \
    table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
    idDefaultValue:0.01
  local return_code=$?

  check_run_status "$return_code" "$step_start_time" "Spark原始样本生产任务"
}

#######################################
# Buckets the features of the training data (Spark).
# Returns:   exit status of the Spark job
#######################################
make_train_bucket_feature() {
  "${SPARK_HOME}/bin/spark-class2" org.apache.spark.deploy.SparkSubmit \
    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
    "beginStr:${train_begin_str:0:8}" "endStr:${train_end_str:0:8}" repartition:100 \
    filterNames:adid_,targeting_conversion_ \
    "readPath:${originDataSavePath}" \
    "savePath:${trainBucketFeaturePath}"
}

#######################################
# Buckets the features of the prediction data (Spark).
# Returns:   exit status of the Spark job
#######################################
make_predict_bucket_feature() {
  "${SPARK_HOME}/bin/spark-class2" org.apache.spark.deploy.SparkSubmit \
    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
    "beginStr:${predict_begin_str:0:8}" "endStr:${predict_end_str:0:8}" repartition:100 \
    filterNames:adid_,targeting_conversion_ \
    "readPath:${originDataSavePath}" \
    "savePath:${predictBucketFeaturePath}"
}

#######################################
# Feature bucketing; training and prediction data go to different
# directories. The two jobs run one after the other, and both durations are
# measured from the start of this function.
# Exits:     1 (through check_run_status) when either job fails
#######################################
make_bucket_feature() {
  local step_start_time
  step_start_time=$(date +%s)

  # Training data
  make_train_bucket_feature
  local train_return_code=$?
  check_run_status "$train_return_code" "$step_start_time" "Spark特征分桶任务: 训练数据分桶"

  # Prediction data
  make_predict_bucket_feature
  local predict_return_code=$?
  check_run_status "$predict_return_code" "$step_start_time" "Spark特征分桶任务: 预测数据分桶"
}

#######################################
# Model training: streams the bucketed training data from HDFS into fm_train,
# starting from the current online model.
# Exits:     1 (through check_run_status) when fm_train fails
#######################################
model_train() {
  local step_start_time
  step_start_time=$(date +%s)

  # The glob is quoted so that hadoop, not the local shell, expands it.
  "$HADOOP" fs -text "${trainBucketFeaturePath}/*/*" |
    "${FM_HOME}/bin/fm_train" -m "${local_model_file_path}" -dim 1,1,8 \
      -im "${LAST_MODEL_HOME}/model_online.txt" -core 8
  local -a stage_status=("${PIPESTATUS[@]}")

  # As before, the step verdict is fm_train's status; a failing reader is
  # only reported, because it may mean the model saw truncated input.
  local return_code=${stage_status[1]}
  if ((stage_status[0] != 0)); then
    echo "$LOG_PREFIX -- WARNING: hadoop fs -text exited with ${stage_status[0]} while reading ${trainBucketFeaturePath}"
  fi

  check_run_status "$return_code" "$step_start_time" "模型训练"
}

#######################################
# Computes the AUC of the online model on the prediction data.
# Globals:   writes online_auc
# Returns:   exit status of the AUC tool
#######################################
calc_online_model_auc() {
  local predict_out=${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt

  "$HADOOP" fs -text "${predictBucketFeaturePath}/*/*" |
    "${FM_HOME}/bin/fm_predict" -m "${LAST_MODEL_HOME}/model_online.txt" -dim 8 -core 8 \
      -out "${predict_out}"
  online_auc=$(/root/sunmingze/AUC/AUC <"${predict_out}")
}

#######################################
# Computes the AUC of the newly trained model on the prediction data.
# Globals:   writes new_auc
# Returns:   exit status of the AUC tool
#######################################
calc_new_model_auc() {
  local predict_out=${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt

  "$HADOOP" fs -text "${predictBucketFeaturePath}/*/*" |
    "${FM_HOME}/bin/fm_predict" -m "${local_model_file_path}" -dim 8 -core 8 \
      -out "${predict_out}"
  new_auc=$(/root/sunmingze/AUC/AUC <"${predict_out}")
}

#######################################
# AUC comparison between the online model and the new model.
# The new model is accepted when it is at least as good as the online model,
# or when |online - new| < 0.005 and the new AUC is >= 0.73.
# Globals:   writes online_auc, new_auc (via the calc_* helpers), auc_diff
# Exits:     1 when an AUC computation fails or the new model is rejected
#######################################
auc_compare() {
  local step5_start_time
  step5_start_time=$(date +%s)

  # The calc_* helpers must run in the current shell: started with "&" they
  # run in a subshell and their online_auc / new_auc assignments are lost.

  # 5.1 AUC of the online model
  local step_start_time
  step_start_time=$(date +%s)
  calc_online_model_auc
  local return_code=$?
  check_run_status "$return_code" "$step_start_time" "线上模型AUC计算"

  # 5.2 AUC of the new model
  step_start_time=$(date +%s)
  calc_new_model_auc
  local new_return_code=$?
  check_run_status "$new_return_code" "$step_start_time" "新模型的AUC计算"

  echo "AUC比对: 线上模型的AUC: ${online_auc}, 新模型的AUC: ${new_auc}"

  # Without both values every bc expression below would be a syntax error.
  if [[ -z "$online_auc" || -z "$new_auc" ]]; then
    check_run_status 1 "$step5_start_time" "AUC比对"
  fi

  # 5.3 Absolute difference between the two AUCs
  auc_diff=$(bc -l <<<"$online_auc - $new_auc")
  local auc_diff_abs
  auc_diff_abs=$(bc -l <<<"sqrt(($auc_diff)^2)")

  local step_end_time
  step_end_time=$(date +%s)
  local step5_elapsed=$((step_end_time - step5_start_time))

  # 5.4 Decide whether the new model may replace the online one
  local msg
  if (($(bc -l <<<"${online_auc} <= ${new_auc}"))); then
    msg="新模型优于线上模型 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc}"
    echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
  elif (($(bc -l <<<"$auc_diff_abs < 0.005"))) && (($(bc -l <<<"$new_auc >= 0.73"))); then
    msg="新模型与线上模型差值小于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
    echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
  else
    msg="新模型与线上模型差值大于等于阈值0.005或新模型的AUC小于0.73 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff"
    echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
    # Total run time; only consumed by the (disabled) monitor call below.
    local elapsed=$((step_end_time - start_time))
    # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
  fi
}

#######################################
# Model format conversion: rewrites the trained model as tab-separated text.
# The first line keeps its first two fields; every other line keeps at most
# its first ten whitespace-separated fields.
# Globals:   reads local_model_file_path, writes local_change_model_file_path
# Exits:     1 (through check_run_status) when awk fails, e.g. because the
#            model file is missing
#######################################
model_to_online_format() {
  local step_start_time
  step_start_time=$(date +%s)

  # awk reads the file itself so that a missing model file fails the step
  # instead of silently producing an empty output file.
  awk '{
    if (NR == 1) {
      print $1 "\t" $2
    } else {
      n = split($0, fields, " ")
      if (n > 10) {
        n = 10
      }
      line = ""
      for (i = 1; i <= n; i++) {
        line = (i > 1 ? line "\t" : "") fields[i]
      }
      print line
    }
  }' "${local_model_file_path}" >"${local_change_model_file_path}"
  local return_code=$?

  check_run_status "$return_code" "$step_start_time" "模型格式转换"
}

#######################################
# Uploads the converted model file to OSS, replacing an existing one.
# Exits:     1 (through check_run_status) when the upload fails
#######################################
model_upload_oss() {
  local step_start_time
  step_start_time=$(date +%s)

  local online_model_path=${OSS_PATH}/${model_name}.txt

  if "$HADOOP" fs -test -e "${online_model_path}"; then
    echo "删除已存在的OSS模型文件"
    "$HADOOP" fs -rm -r -skipTrash "${online_model_path}"
  fi

  "$HADOOP" fs -put "${local_change_model_file_path}" "${online_model_path}"
  local return_code=$?

  check_run_status "$return_code" "$step_start_time" "模型文件上传OSS"
}

#######################################
# Local backup of the model files: copies the previous online model to
# model_online_<yyyymmddHH>.txt (the backup therefore is the model that was
# online from the previous backup until now), then installs the new model as
# model_online.txt.
# Exits:     1 (through check_run_status) when either copy fails
#######################################
model_local_back() {
  local step_start_time
  step_start_time=$(date +%s)

  local file_suffix
  file_suffix=$(date "+%Y%m%d%H")

  # Chain the copies: never overwrite the online model unless the backup of
  # the previous one succeeded.
  cp -f "${LAST_MODEL_HOME}/model_online.txt" "${LAST_MODEL_HOME}/model_online_${file_suffix}.txt" &&
    cp -f "${local_model_file_path}" "${LAST_MODEL_HOME}/model_online.txt"
  local return_code=$?

  check_run_status "$return_code" "$step_start_time" "模型备份"
}

#######################################
# Logs the completion summary of the whole task.
# Globals:   reads new_auc, online_auc, auc_diff, start_time.
#            yesterday_online_auc / yesterday_new_auc are not assigned
#            anywhere in this file and expand to empty unless the environment
#            provides them.
#######################################
success_inform() {
  local step_end_time
  step_end_time=$(date +%s)
  # Total run time of the task.
  local elapsed=$((step_end_time - start_time))
  # Same destination as computed in model_upload_oss (whose variable is local).
  local online_model_path=${OSS_PATH}/${model_name}.txt

  local msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc \n\t - AUC差值: $auc_diff \n\t - 模型上传路径: $online_model_path"
  echo -e "$LOG_PREFIX -- 模型更新完成 -- $msg: 耗时 $elapsed"
  # /root/anaconda3/bin/python ad/ad_monitor_util.py --level info --msg "$msg" --start "$start_time" --elapsed "$elapsed"
}

#######################################
# Entry point. Only data production is enabled; training and publishing
# steps are kept below, commented out.
#######################################
main() {
  global_init

  check_ad_hive

  make_origin_data

  make_bucket_feature

  # model_train

  # auc_compare

  # model_to_online_format

  # model_upload_oss

  # model_local_back

  # success_inform
}

main "$@"