#!/bin/bash
#
# Ad XGBoost model pipeline (test variant).
#
# Defines the steps: wait for the upstream Hive partition, train the XGB model
# on Spark, upload the trained model to OSS, and run the prediction analysis.
# Only the steps enabled in main() are executed.
#
# Requires bash: the script uses `source`, `[[ ]]`, `(( ))` and `local`.
# Expects ${sh_path}/00_common.sh to provide is_not_holidays (used by init);
# LOG_PREFIX is read but not defined in this file.
set -x

source /root/anaconda3/bin/activate py37

sh_path=$(dirname "$0")
source "${sh_path}/00_common.sh"

export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
export PATH=$SPARK_HOME/bin:$PATH
export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
export JAVA_HOME=/usr/lib/jvm/java-1.8.0

# Global constants
HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4/
BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4/
MODEL_PATH=/dw/recommend/model/35_ad_model_test/
PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data_test/
TABLE=alg_recsys_ad_sample_all
MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/

today_early_1="$(date -d '1 days ago' +%Y%m%d)"
feature_file=20240703_ad_feature_name.txt

# Name of the online model
model_name=model_xgb_351_1000_v2_test
# Comma-separated training data paths (filled in by init)
train_data_path=""
# Evaluation data path (filled in by init)
predict_date_path=""
# Where the evaluation result is saved (filled in by init)
predict_result_path=""
# Where the model is saved (filled in by init)
model_save_path=""
# Local temporary directory for the model
model_local_path=/root/zhaohp/XGB

# Task start time (epoch seconds)
start_time=$(date +%s)

#######################################
# Report the outcome of a step and abort the script when it failed.
# Globals:   LOG_PREFIX (read), start_time (read)
# Arguments: $1 - exit status of the step
#            $2 - step start time (epoch seconds)
#            $3 - step name used in the log line
# Outputs:   one log line on stdout
# Returns:   exits the shell with 1 when $1 is non-zero
#######################################
check_run_status() {
  local status=$1
  local step_start_time=$2
  local step_name=$3

  local step_end_time
  step_end_time=$(date +%s)
  local step_elapsed=$((step_end_time - step_start_time))

  if [ "$status" -ne 0 ]; then
    echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
    local elapsed=$((step_end_time - start_time))
    # /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
  else
    echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
  fi
}

#######################################
# Compute the data/model paths used by the later steps.
# Collects 7 non-holiday dates, walking backwards from 2 days ago, and joins
# their bucket-feature paths (oldest first) into train_data_path.
# Globals:   BUCKET_FEATURE_PATH, MODEL_PATH, PREDICT_RESULT_SAVE_PATH,
#            today_early_1, model_name (read);
#            predict_date_path, model_save_path, predict_result_path,
#            train_data_path, date_key (written)
# Outputs:   the resolved settings on stdout
#######################################
init() {
  predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
  # `date -d +%Y%m%d` would treat the format as a date string and fail,
  # leaving the suffix empty; plain `date +FORMAT` yields today's date.
  model_save_path=${MODEL_PATH}/${model_name}_$(date +%Y%m%d)
  predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000

  local count=1
  local current_data
  current_data="$(date -d '2 days ago' +%Y%m%d)"

  # Walk backwards one day at a time until 7 non-holiday dates are collected.
  while [[ $count -lt 8 ]]; do
    date_key=$(date -d "$current_data" +%Y%m%d)
    # is_not_holidays comes from 00_common.sh; it is expected to print 1 for
    # a non-holiday date.
    if [ "$(is_not_holidays "$date_key")" -eq 1 ]; then
      if [[ -z ${train_data_path} ]]; then
        train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
      else
        train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
      fi
      count=$((count + 1))
    else
      echo "日期: ${date_key}是节日,跳过"
    fi
    current_data=$(date -d "$current_data -1 day" +%Y%m%d)
  done

  echo "train_data_path: ${train_data_path}"
  echo "predict_date_path: ${predict_date_path}"
  echo "predict_result_path: ${predict_result_path}"
  echo "model_save_path: ${model_save_path}"
  echo "feature_file: ${feature_file}"
  echo "model_name: ${model_name}"
  echo "model_local_path: ${model_local_path}"
  echo "model_oss_path: ${MODEL_OSS_PATH}"
}

#######################################
# Tell whether a wall-clock time is at or past a deadline.
# Values are forced to base 10 so that zero-padded "08"/"09" (as printed by
# `date +%H` / `date +%M`) are not rejected as invalid octal numbers.
# Arguments: $1 - current hour, $2 - current minute,
#            $3 - deadline hour, $4 - deadline minute
# Returns:   0 when current time >= deadline, 1 otherwise
#######################################
_is_past_deadline() {
  local cur_h=$((10#$1))
  local cur_m=$((10#$2))
  local max_h=$((10#$3))
  local max_m=$((10#$4))
  ((cur_h > max_h || (cur_h == max_h && cur_m >= max_m)))
}

#######################################
# Wait until the upstream big-data job has produced yesterday's partition.
# Polls ad_utils.py every five minutes; gives up (alert + exit 1) once the
# clock reaches 05:30.
# Globals:   sh_path, today_early_1, start_time, LOG_PREFIX (read)
# Outputs:   progress lines on stdout
# Returns:   exits the shell with 1 on timeout
#######################################
check_ad_hive() {
  local step_start_time
  step_start_time=$(date +%s)
  local max_hour=05
  local max_minute=30
  local elapsed=0
  local python_return_code current_hour current_minute msg

  while true; do
    # The helper's stdout (not its exit status) is what gets compared to 0.
    python_return_code=$(python "${sh_path}/ad_utils.py" --excute_program check_ad_origin_hive --partition "${today_early_1}" --hh 23)
    elapsed=$(($(date +%s) - step_start_time))
    if [ "$python_return_code" -eq 0 ]; then
      break
    fi

    echo "Python程序返回非0值,等待五分钟后再次调用。"
    sleep 300

    current_hour=$(date +%H)
    current_minute=$(date +%M)
    if _is_past_deadline "$current_hour" "$current_minute" "$max_hour" "$max_minute"; then
      msg="大数据数据生产校验失败, 分区: ${today_early_1}"
      echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
      /root/anaconda3/bin/python "${sh_path}/ad_monitor_util.py" --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
      exit 1
    fi
  done

  echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
}

#######################################
# Submit the XGB training job to YARN and report its status.
# Globals:   feature_file, train_data_path, predict_date_path,
#            predict_result_path, model_save_path (read)
# Returns:   exits the shell with 1 (via check_run_status) when the job fails
#######################################
xgb_train() {
  local step_start_time
  step_start_time=$(date +%s)

  # NOTE(review): Spark configuration keys are case-sensitive; the documented
  # key is spelled "memoryOverhead", so the lower-case key below may be
  # ignored. Left unchanged because fixing it alters resource allocation.
  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
    --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
    --master yarn --driver-memory 6G --executor-memory 9G --executor-cores 1 --num-executors 31 \
    --conf spark.yarn.executor.memoryoverhead=1000 \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.shuffle.service.port=7337 \
    --conf spark.shuffle.consolidateFiles=true \
    --conf spark.shuffle.manager=sort \
    --conf spark.storage.memoryFraction=0.4 \
    --conf spark.shuffle.memoryFraction=0.5 \
    --conf spark.default.parallelism=200 \
    /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
    "featureFile:${feature_file}" \
    "trainPath:${train_data_path}" \
    "testPath:${predict_date_path}" \
    "savePath:${predict_result_path}" \
    "modelPath:${model_save_path}" \
    eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20

  local return_code=$?
  check_run_status "$return_code" "$step_start_time" "XGB模型训练任务"
}

#######################################
# Download the trained model from HDFS, pack it and upload it to OSS.
# Globals:   HADOOP, model_local_path, model_save_path, model_name,
#            MODEL_OSS_PATH (read)
# Returns:   exits the shell with 1 (via check_run_status) when changing
#            directory, downloading or uploading fails
#######################################
model_upload_oss() {
  local step_start_time
  step_start_time=$(date +%s)
  local return_code

  # Never continue in the wrong directory: the steps below create and delete
  # files relative to the current directory.
  cd "${model_local_path}" || check_run_status 1 "$step_start_time" "模型上传OSS"

  # NOTE(review): a ./${model_name} directory left over from an earlier run
  # makes the directory check below pass even if this download fails —
  # consider clearing it first.
  "$HADOOP" fs -get "${model_save_path}" "./${model_name}"
  if [ ! -d "./${model_name}" ]; then
    echo "从HDFS下载模型失败"
    check_run_status 1 "$step_start_time" "模型上传OSS"
    exit 1
  fi

  tar -czvf "${model_name}.tar.gz" -C "${model_name}" .
  rm -rf ".${model_name}.tar.gz.crc"

  # Removing the previous archive is best-effort: it may not exist yet.
  "$HADOOP" fs -rm -r -skipTrash "${MODEL_OSS_PATH}/${model_name}.tar.gz"
  "$HADOOP" fs -put "${model_name}.tar.gz" "${MODEL_OSS_PATH}"
  return_code=$?

  check_run_status "$return_code" "$step_start_time" "模型上传OSS"
}

#######################################
# Run the prediction analysis script.
# Its stdout is captured into a local variable and not used further.
# Globals:   sh_path (read)
#######################################
model_predict() {
  local python_return_code
  python_return_code=$(python "${sh_path}/model_predict_analyse.py")
}

# Entry point. Only the uncommented steps run.
main() {
  # init
  # check_ad_hive
  # xgb_train
  # model_upload_oss
  model_predict
}

main