#!/bin/bash
set -ex

# 0 Global variables / parameters
samplePath=/dw/recommend/model/00_sample_data/
savePath=/dw/recommend/model/04_str_data/
model_name=model_str_mid
today="$(date +%Y%m%d)"
today_early_1="$(date -d '1 days ago' +%Y%m%d)"
yesterday="$(date -d '1 days ago' +%Y%m%d)"
#today=20240129
#today_early_1=20240128
#yesterday=20240128

HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
MODEL_PATH="/root/zhangbo/recommend-emr-dataprocess/zhangbo/model/"
OSS_PATH="oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/video_str_model/"

export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
export PATH=$SPARK_HOME/bin:$PATH
export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
export JAVA_HOME=/usr/lib/jvm/java-1.8.0

# 0 Check whether the upstream table has finished being produced; wait until 12:00 at the latest
source /root/anaconda3/bin/activate py37
max_hour=11
max_minute=00
while true; do
    # Despite the variable name, this captures the stdout of utils.py; "0" means the partition is ready
    python_return_code=$(python utils.py --excute_program check_origin_hive --partition ${today_early_1})
    if [ $python_return_code -eq 0 ]; then
        echo "Python program returned 0, exiting the loop."
        break
    fi
    echo "Python program returned a non-zero value, retrying in five minutes."
    sleep 300
    current_hour=$(date +%H)
    current_minute=$(date +%M)
    # Force base-10 comparison: %H/%M may carry leading zeros, which bash arithmetic would otherwise treat as octal
    if (( 10#$current_hour > 10#$max_hour || (10#$current_hour == 10#$max_hour && 10#$current_minute >= 10#$max_minute) )); then
        echo "Maximum wait time reached, failing: ${current_hour}-${current_minute}"
        exit 1
    fi
done
#conda deactivate

# 1 Produce data
/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
--class com.aliyun.odps.spark.examples.makedata.makedata_06_originData \
--name every_day_origindata_${model_name}_${today} \
--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 16 \
/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
tablePart:32 savePath:${samplePath} beginStr:${today_early_1} endStr:${today_early_1}

if [ $? -ne 0 ]; then
    echo "Spark raw sample production job failed"
    exit 1
else
    echo "Spark raw sample production succeeded"
fi

/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
--class com.aliyun.odps.spark.examples.makedata.makedata_07_strData \
--name every_day_strdata_${model_name}_${today} \
--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 16 \
/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
readPath:${samplePath} savePath:${savePath} beginStr:${today_early_1} endStr:${today_early_1} featureVersion:v4 ifRepart:100

if [ $? -ne 0 ]; then
    echo "Spark training sample production job failed"
    exit 1
else
    echo "Spark training sample production succeeded"
fi

# 2 Load the previous round's model, train on this round's data, and save this round's model
end_date=${today}
loop_date=${yesterday}
while [[ "$loop_date" != "$end_date" ]]; do
    echo -------train ${loop_date}----------
    loop_date_model=$(date -d "$loop_date - 1 day" +%Y%m%d)
    # Incremental training: warm-start from the previous day's model (-im) and write this day's model (-m)
    $HADOOP fs -text ${savePath}/dt=${loop_date}/* | ${FM_TRAIN} -m ${MODEL_PATH}/${model_name}_${loop_date}.txt \
        -dim 1,1,8 -core 8 -im ${MODEL_PATH}/${model_name}_${loop_date_model}.txt
    if [ $? -ne 0 ]; then
        echo "Training failed"
        exit 1
    fi
    echo -------save ${MODEL_PATH}/${model_name}_${loop_date}.txt----------
    loop_date=$(date -d "$loop_date + 1 day" +%Y%m%d)
done

# 3 Convert the format of this round's model
# Drop the first line, then keep "feature\tweight" pairs whose second column is not the literal "0"
cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \
    | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' \
    > ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
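# Optional guard (a sketch, not part of the original flow): before uploading, it may be
# worth checking that the converted file is non-empty, since an empty upload would
# replace the serving model on OSS. Left commented out to keep the original behavior.
#if [ ! -s ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ]; then
#    echo "Converted model file is missing or empty, aborting upload"
#    exit 1
#fi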
# 4 Upload the converted model to OSS
online_model_path=${OSS_PATH}/${model_name}.txt
# Run the existence test inside the if-condition so a missing file does not trip `set -e`
if $HADOOP fs -test -e ${online_model_path}; then
    echo "File already exists, deleting it first."
    $HADOOP fs -rm -r ${online_model_path}
else
    echo "File does not exist"
fi
$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}

#nohup bash 05_update_everyday_str.sh > p.log 2>&1 &
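# Manual sanity check (a sketch, assuming the Hadoop OSS connector is configured as above;
# not part of the scheduled run): inspect the uploaded online model directly from OSS.
#   $HADOOP fs -ls ${OSS_PATH}/${model_name}.txt
#   $HADOOP fs -text ${OSS_PATH}/${model_name}.txt | head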