#!/bin/sh set -x # 0 全局变量/参数 originDataSavePath=/dw/recommend/model/31_ad_sample_data_auto/ bucketFeatureSavePath=/dw/recommend/model/33_ad_train_data_nosparse_auto/ model_name=model_lr0 today="$(date +%Y%m%d)" today_early_1="$(date -d '1 days ago' +%Y%m%d)" MODEL_PATH=/root/zhaohp/recommend-emr-dataprocess/model PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop FM_HOME=/root/sunmingze/alphaFM OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model/ max_hour=16 max_minute=00 # 1 判断依赖的数据表是否生产完成 source /root/anaconda3/bin/activate py37 while true; do python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${today} --hh 10) if [ $python_return_code -eq 0 ]; then echo "Python程序返回0,退出循环。" break fi echo "Python程序返回非0值,等待五分钟后再次调用。" sleep 300 current_hour=$(date +%H) current_minute=$(date +%M) if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then echo "最长等待时间已到,失败:${current_hour}-${current_minute}" msg="广告特征数据校验失败,大数据分区没有数据: ${today}10" /root/anaconda3/bin/python ad/ad_monitor_util.py ${msg} exit 1 fi done # 2 原始特征生成 /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \ --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \ --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \ ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \ tablePart:64 repartition:32 \ beginStr:${today_early_1}08 endStr:${today}10 \ savePath:${originDataSavePath} \ table:alg_recsys_ad_sample_all_new if [ $? -ne 0 ]; then echo "Spark原始样本生产任务执行失败" msg="广告特征数据生成失败,Spark原始样本生产任务执行失败" /root/anaconda3/bin/python ad/ad_monitor_util.py ${msg} exit 1 else echo "spark原始样本生产执行成功" fi # 3 特征分桶 /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \ --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \ --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \ ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \ beginStr:${today_early_1} endStr:${today} repartition:400 \ filterNames:XXXXX \ bucketFileName:20240620_ad_bucket_249_fix.txt \ readPath:${originDataSavePath} \ savePath:${bucketFeatureSavePath} if [ $? -ne 0 ]; then echo "Spark特征分桶处理任务执行失败" msg="广告特征分桶失败,Spark特征分桶处理任务执行失败" /root/anaconda3/bin/python ad/ad_monitor_util.py ${msg} exit 1 else echo "spark特征分桶处理执行成功" fi # 4 模型训练 $HADOOP fs -text ${bucketFeatureSavePath}/${today_early_1}/* | ${FM_HOME}/fm_train -m ${MODEL_PATH}/${model_name}_${today_early_1}.txt -dim 1,1,0 -core 8 if [ $? -ne 0 ]; then echo "模型训练失败" /root/anaconda3/bin/python ad/ad_monitor_util.py "广告模型训练失败" exit 1 fi # 5 对比AUC # 5.1 校验今天10分区的数据是否生产完成 online_model=${MODEL_PATH}/model_online.txt $HADOOP fs -text ${bucketFeatureSavePath}/${today}/* | /root/sunmingze/alphaFM/bin/fm_predict -m ${MODEL_PATH}/${online_model} -dim 0 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_online.txt $HADOOP fs -text ${bucketFeatureSavePath}/${today}/* | /root/sunmingze/alphaFM/bin/fm_predict -m ${MODEL_PATH}/${model_name}_${today_early_1}.txt -dim 0 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_new.txt # shellcheck disable=SC2006 online_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_online.txt | /root/sunmingze/AUC/AUC` new_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_new.txt | /root/sunmingze/AUC/AUC` if [ "$online_auc" \< "$new_auc" ]; then echo "新模型优于线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}" /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型优于线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}" else echo "新模型不如线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}" /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型不如线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}" exit 1 fi # 6 模型格式转换 cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \ | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' \ > ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt # 7 模型文件上传OSS online_model_path=${OSS_PATH}/${model_name}.txt $HADOOP fs -test -e ${online_model_path} if [ $? -eq 0 ]; then echo "数据存在, 先删除。" $HADOOP fs -rm -r ${online_model_path} else echo "数据不存在" fi $HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path} if [ $? -eq 0 ]; then echo "推荐模型文件至OSS成功" else echo "推荐模型文件至OSS失败" /root/anaconda3/bin/python ad/ad_monitor_util.py "推荐模型文件至OSS失败" exit 1 fi # 7.3 本地保存最新的线上使用的模型,用于下一次的AUC验证 cp ${MODEL_PATH}/${model_name}_${today_early_1}.txt ${MODEL_PATH}/model_online.txt