#!/bin/sh set -x export PATH=$SPARK_HOME/bin:$PATH export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf export JAVA_HOME=/usr/lib/jvm/java-1.8.0 sh_path=$(cd $(dirname $0); pwd) source ${sh_path}/00_common.sh source /root/anaconda3/bin/activate py37 # 全局常量 LOG_PREFIX=广告模型训练任务 HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4 BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4 TABLE=alg_recsys_ad_sample_all # 特征文件名 feature_file=20240703_ad_feature_name.txt # 模型本地临时保存路径 model_local_home=/root/zhaohp/XGB/ # 模型更新的天数 TRAIN_UPDATE_DAY_OF_WEEK=("星期一" "星期三" "星期五") # 模型HDFS保存路径,测试时修改为其他路径,避免影响线上 MODEL_PATH=/dw/recommend/model/35_ad_model # 预测结果保存路径,测试时修改为其他路径,避免影响线上 PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data # 模型OSS保存路径,测试时修改为其他路径,避免影响线上 MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/ # 线上模型名,测试时修改为其他模型名,避免影响线上 model_name=model_xgb_351_1000_v2 # 线上校准文件名 OSS_CALIBRATION_FILE_NAME=model_xgb_351_1000_v2_calibration # 用于存放一些临时的文件 PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache # 本地保存HDFS模型路径文件,测试时修改为其他模型名,避免影响线上 model_path_file=${model_local_home}/online_model_path.txt # 获取当前是星期几,1表示星期一 current_day_of_week=$(get_day_of_week) # 任务开始时间 start_time=$(date +%s) # 前一天 today_early_1="$(date -d '1 days ago' +%Y%m%d)" # 线上模型在HDFS中的路径 online_model_path=`cat ${model_path_file}` # 训练用的数据路径 train_data_path="" # 评估用的数据路径 predict_date_path="" #评估结果保存路径 new_model_predict_result_path="" # 模型保存路径 model_save_path="" # 评测结果保存路径,后续需要根据此文件评估是否要更新模型 predict_analyse_file_path="" # 校准文件保存路径 calibration_file_path="" # 模型训练信息保存路径 model_train_info_file_path="" # 保存模型评估的分析结果 old_incr_rate_avg=0 new_incr_rate_avg=0 train_info_write_file() { if [ $# -ne 2 ]; then echo "训练信息写入文件失败,参数个数不等于0" exit 1 fi echo "${1}: ${2}" >> ${model_train_info_file_path} } init() { declare -a date_keys=() local count=1 local current_data="$(date -d '2 days ago' +%Y%m%d)" # 循环获取前 n 天的非节日日期 while [[ ${count} -le 7 ]]; do date_key=$(date -d "${current_data}" +%Y%m%d) # 判断是否是节日,并拼接训练数据路径 if [ $(is_not_holidays ${date_key}) -eq 1 ]; then # 将 date_key 放入数组 date_keys+=("${date_key}") if [[ -z ${train_data_path} ]]; then train_data_path="${BUCKET_FEATURE_PATH}/${date_key}" else train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}" fi count=$((count + 1)) else echo "日期: ${date_key}是节日,跳过" fi current_data=$(date -d "${current_data} -1 day" +%Y%m%d) done last_index=$((${#date_keys[@]} - 1)) train_first_day=${date_keys[$last_index]} train_last_day=${date_keys[0]} model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4} model_train_info_file_path=${model_local_home}/model_train_info/${model_name}_${train_first_day: -4}_${train_last_day: -4}.txt predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1} new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${train_first_day: -4}_${train_last_day: -4} online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${online_model_path: -9} predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_351_1000_analyse.txt calibration_file_path=${model_local_home}/${OSS_CALIBRATION_FILE_NAME}.txt echo "init param train_data_path: ${train_data_path}" echo "init param predict_date_path: ${predict_date_path}" echo "init param new_model_predict_result_path: ${new_model_predict_result_path}" echo "init param online_model_predict_result_path: ${online_model_predict_result_path}" echo "init param model_save_path: ${model_save_path}" echo "init param online_model_path: ${online_model_path}" echo "init param feature_file: ${feature_file}" echo "init param model_name: ${model_name}" echo "init param model_local_home: ${model_local_home}" echo "init param model_oss_path: ${MODEL_OSS_PATH}" echo "init param predict_analyse_file_path: ${predict_analyse_file_path}" echo "init param calibration_file_path: ${calibration_file_path}" echo "init param current_day_of_week: ${current_day_of_week}" echo "init param model_train_info_file_path: ${model_train_info_file_path}" echo "当前Python环境安装的Python版本: $(python --version)" echo "当前Python环境安装的三方包: $(python -m pip list)" train_info_write_file "模型名" ${model_name} train_info_write_file "开始时间" ${start_time} train_info_write_file "python版本" $(python --version) train_info_write_file "训练数据路径" ${train_data_path} train_info_write_file "验证数据路径" ${predict_date_path} train_info_write_file "新模型路径" ${model_save_path} train_info_write_file "新模型验证结果保存路径" ${new_model_predict_result_path} train_info_write_file "线上模型路径" ${online_model_path} train_info_write_file "线上模型验证结果保存路径" ${online_model_predict_result_path} train_info_write_file "模型验证结果分析文件路径" ${predict_analyse_file_path} train_info_write_file "是否测试" "是" } check_run_status_v2() { if [ $# -ne 4 ]; then echo "校验状态码函数异常: 参数个数不等于4" train_info_write_file "广告模型自动更新任务结果" "异常结束" train_info_write_file "广告模型自动更新失败原因" "校验状态码函数异常: 参数个数不等于4" exit 1 fi local status=$1 local step_name=$2 local step_start_time=$3 local failed_reason=$4 local step_end_time=$(date +%s) local step_elapsed=$(($step_end_time - $step_start_time)) train_info_write_file ${step_name}耗时 ${elapsed} if [ ${status} -ne 0 ]; then train_info_write_file ${step_name} "失败" train_info_write_file "广告模型自动更新任务结果" "失败" train_info_write_file "广告模型自动更新失败原因" "${failed_reason}" train_info_write_file "结束时间" ${step_end_time} /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --tif $model_train_info_file_path} exit 1 else train_info_write_file ${step_name} "成功" fi } # 校验大数据任务是否执行完成 check_ad_hive() { local step_start_time=$(date +%s) local max_hour=05 local max_minute=30 local elapsed=0 local result=0 while true; do local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23) elapsed=$(($(date +%s) - ${step_start_time})) if [ "${python_return_code}" -eq 0 ]; then break fi echo "Python程序返回非0值,等待五分钟后再次调用。" sleep 300 local current_hour=$(date +%H) local current_minute=$(date +%M) if (( ${current_hour} > ${max_hour} || ( ${current_hour} == ${max_hour} && ${current_minute} >= ${max_minute} ) )); then result=1 break fi done check_run_status_v2 ${result} ${step_start_time} "大数据数据生产校验任务" "大数据数据生产校验失败, 分区: ${today_early_1},请检查!" echo "${LOG_PREFIX} -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 ${elapsed}" } origin_data() { ( source ${sh_path}/25_xgb_make_data_origin_bucket.sh make_origin_data_v2 ) } bucket_feature() { ( source ${sh_path}/25_xgb_make_data_origin_bucket.sh make_bucket_feature_v2 ) } xgb_train() { local step_start_time=$(date +%s) /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \ --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \ --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors 31 \ --conf spark.yarn.executor.memoryoverhead=2048 \ --conf spark.shuffle.service.enabled=true \ --conf spark.shuffle.service.port=7337 \ --conf spark.shuffle.consolidateFiles=true \ --conf spark.shuffle.manager=sort \ --conf spark.storage.memoryFraction=0.4 \ --conf spark.shuffle.memoryFraction=0.5 \ --conf spark.default.parallelism=200 \ /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \ featureFile:20240703_ad_feature_name.txt \ trainPath:${train_data_path} \ testPath:${predict_date_path} \ savePath:${new_model_predict_result_path} \ modelPath:${model_save_path} \ eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20 local return_code=$? check_run_status_v2 ${return_code} "XGB模型训练任务" ${step_start_time} "XGB模型训练失败" } calc_model_predict() { local count=0 local max_line=10 local old_total_diff=0 local new_total_diff=0 while read -r line && [ ${count} -lt ${max_line} ]; do # 使用 ! 取反判断,只有当行中不包含 "cid" 时才执行继续的逻辑 if [[ "${line}" == *"cid"* ]]; then continue fi read -a numbers <<< "${line}" old_abs_score=$( echo "${numbers[6]} * ((${numbers[6]} >= 0) * 2 - 1)" | bc -l ) new_abs_score=$( echo "${numbers[7]} * ((${numbers[7]} >= 0) * 2 - 1)" | bc -l ) old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l ) new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l ) count=$((${count} + 1)) done < "${predict_analyse_file_path}" local calc_top_code=$? old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l ) local old_rate_avg_code=$? new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l ) local new_rate_avg_code=$? if [ ${calc_top_code} -ne 0 ] || [ ${old_rate_avg_code} -ne 0 ] || [ ${new_rate_avg_code} -ne 0 ]; then check_run_status_v2 1 "计算新老模型Top10差异" ${step_start_time} "计算新老模型Top10差异异常" fi echo "老模型素材打分相对误差abs后的均值: ${old_incr_rate_avg}" echo "新模型素材打分相对误差abs后的均值: ${new_incr_rate_avg}" } calc_auc() { local old_auc=`cat ${PREDICT_CACHE_PATH}/old_1.txt | /root/sunmingze/AUC/AUC` train_info_write_file "老模型AUC" "${old_auc}" local new_auc=`cat ${PREDICT_CACHE_PATH}/new_1.txt | /root/sunmingze/AUC/AUC` train_info_write_file "新模型AUC" "${new_auc}" } model_predict() { # 线上模型评估最新的数据 local step_start_time=$(date +%s) /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \ --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \ --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 30 \ --conf spark.yarn.executor.memoryoverhead=1024 \ --conf spark.shuffle.service.enabled=true \ --conf spark.shuffle.service.port=7337 \ --conf spark.shuffle.consolidateFiles=true \ --conf spark.shuffle.manager=sort \ --conf spark.storage.memoryFraction=0.4 \ --conf spark.shuffle.memoryFraction=0.5 \ --conf spark.default.parallelism=200 \ /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \ featureFile:20240703_ad_feature_name.txt \ testPath:${predict_date_path} \ savePath:${online_model_predict_result_path} \ modelPath:${online_model_path} local return_code=$? # 结果分析 local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path}) if [ ${return_code} -ne 0 ] || [ ${python_return_code} -ne 0 ] || [ $? -ne 0 ]; then check_run_status_v2 1 "线上模型评估昨天数据任务" ${step_start_time} "线上模型评估昨天数据任务异常" else check_run_status_v2 0 "线上模型评估昨天数据任务" ${step_start_time} "线上模型评估昨天数据任务异常" fi } model_predict_analyse_check() { local step_start_time=$(date +%s) calc_model_predict calc_auc if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then check_run_status_v2 1 "判断模型素材打分相对误差abs后的均值" ${step_start_time} "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查" exit 1 fi # 对比两个模型的差异 score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l ) if (( $(echo "${score_diff} > 0.050000" | bc -l ) ));then echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查" check_run_status_v2 1 "计算新老模型相对误差abs后的均值差异" ${step_start_time} "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05" exit 1 fi } model_upload_oss() { local step_start_time=$(date +%s) ( cd ${model_local_home} ${HADOOP} fs -get ${model_save_path} ${model_name} if [ ! -d ${model_name} ]; then echo "从HDFS下载模型失败" check_run_status_v2 1 "HDFS下载模型任务" ${step_start_time} "HDFS下载模型失败" exit 1 fi tar -czvf ${model_name}.tar.gz -C ${model_name} . rm -rf ${model_name}.tar.gz.crc # 从OSS中移除模型文件和校准文件 ${HADOOP} fs -rm -r -skipTrash ${MODEL_OSS_PATH}/${model_name}.tar.gz ${MODEL_OSS_PATH}/${OSS_CALIBRATION_FILE_NAME}.txt # 将模型文件和校准文件推送到OSS上 ${HADOOP} fs -put ${model_name}.tar.gz ${OSS_CALIBRATION_FILE_NAME}.txt ${MODEL_OSS_PATH} local return_code=$? check_run_status_v2 ${return_code} "模型上传OSS任务" ${step_start_time} "模型上传OSS失败" echo ${model_save_path} > ${model_path_file} # rm -f ./${model_name}.tar.gz rm -rf ./${model_name} rm -rf ${OSS_CALIBRATION_FILE_NAME}.txt ) local return_code=$? check_run_status_v2 ${return_code} "模型上传OSS任务" ${step_start_time} "模型上传OSS失败" local step_end_time=$(date +%s) local elapsed=$((${step_end_time} - ${start_time})) echo -e "${LOG_PREFIX} -- 模型更新完成 -- 模型更新成功: 耗时 ${elapsed}" train_info_write_file "结束时间" $(date +%s) train_info_write_file "广告模型自动更新任务结果" "完成" } # 主方法 main() { init check_ad_hive origin_data bucket_feature if [[ ${TRAIN_UPDATE_DAY_OF_WEEK[@]} =~ ${current_day_of_week} ]]; then echo "当前${current_day_of_week},开始训练并更新模型" xgb_train model_predict model_predict_analyse_check model_upload_oss else echo "当前是周一,周三或周五,不更新模型" fi } main