# 01_ad_model_update.sh
  1. #!/bin/sh
  2. set -x
  3. source /root/anaconda3/bin/activate py37
  4. sh_path=$(dirname $0)
  5. source ${sh_path}/00_common.sh
  6. export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
  7. export PATH=$SPARK_HOME/bin:$PATH
  8. export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  9. export JAVA_HOME=/usr/lib/jvm/java-1.8.0
  10. # 全局常量
  11. HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
  12. TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4/
  13. BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4/
  14. MODEL_PATH=/dw/recommend/model/35_ad_model_test/
  15. PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data_test/
  16. TABLE=alg_recsys_ad_sample_all
  17. MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
  18. today_early_1="$(date -d '1 days ago' +%Y%m%d)"
  19. feature_file=20240703_ad_feature_name.txt
  20. # 线上模型名
  21. model_name=model_xgb_351_1000_v2_test
  22. # 训练用的数据路径
  23. train_data_path=""
  24. # 评估用的数据路径
  25. predict_date_path=""
  26. #评估结果保存路径
  27. predict_result_path=""
  28. # 模型保存路径
  29. model_save_path=""
  30. # 模型本地临时保存路径
  31. model_local_path=/root/zhaohp/XGB
  32. # 任务开始时间
  33. start_time=$(date +%s)
  34. # 校验命令的退出码
  35. check_run_status() {
  36. local status=$1
  37. local step_start_time=$2
  38. local step_name=$3
  39. local step_end_time=$(date +%s)
  40. local step_elapsed=$(($step_end_time - $step_start_time))
  41. if [ $status -ne 0 ]; then
  42. echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
  43. local elapsed=$(($step_end_time - $start_time))
  44. # /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  45. exit 1
  46. else
  47. echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
  48. fi
  49. }
  50. init() {
  51. predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
  52. model_save_path=${MODEL_PATH}/${model_name}_$(date -d +%Y%m%d)
  53. predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000
  54. local count=1
  55. local current_data="$(date -d '2 days ago' +%Y%m%d)"
  56. # 循环获取前 n 天的非节日日期
  57. while [[ $count -lt 8 ]]; do
  58. date_key=$(date -d "$current_data" +%Y%m%d)
  59. # 判断是否是节日,并拼接训练数据路径
  60. if [ $(is_not_holidays $date_key) -eq 1 ]; then
  61. if [[ -z ${train_data_path} ]]; then
  62. train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
  63. else
  64. train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
  65. fi
  66. count=$((count + 1))
  67. else
  68. echo "日期: ${date_key}是节日,跳过"
  69. fi
  70. current_data=$(date -d "$current_data -1 day" +%Y%m%d)
  71. done
  72. echo "train_data_path: ${train_data_path}"
  73. echo "predict_date_path: ${predict_date_path}"
  74. echo "predict_result_path: ${predict_result_path}"
  75. echo "model_save_path: ${model_save_path}"
  76. echo "feature_file: ${feature_file}"
  77. echo "model_name: ${model_name}"
  78. echo "model_local_path: ${model_local_path}"
  79. echo "model_oss_path: ${MODEL_OSS_PATH}"
  80. }
  81. # 校验大数据任务是否执行完成
  82. check_ad_hive() {
  83. local step_start_time=$(date +%s)
  84. local max_hour=05
  85. local max_minute=30
  86. local elapsed=0
  87. while true; do
  88. local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
  89. elapsed=$(($(date +%s) - $step_start_time))
  90. if [ "$python_return_code" -eq 0 ]; then
  91. break
  92. fi
  93. echo "Python程序返回非0值,等待五分钟后再次调用。"
  94. sleep 300
  95. local current_hour=$(date +%H)
  96. local current_minute=$(date +%M)
  97. if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
  98. local msg="大数据数据生产校验失败, 分区: ${today_early_1}"
  99. echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
  100. /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  101. exit 1
  102. fi
  103. done
  104. echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
  105. }
# Submit the XGBoost training job to YARN via spark-submit.
# Reads globals set by init(): train_data_path, predict_date_path,
# predict_result_path, model_save_path. Aborts via check_run_status on failure.
# NOTE(review): invokes the Spark 3 distribution directly instead of
# $SPARK_HOME (which points at Spark 2) — presumably intentional for this
# job; confirm.
xgb_train() {
  local step_start_time=$(date +%s)
  /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
    --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
    --master yarn --driver-memory 6G --executor-memory 9G --executor-cores 1 --num-executors 31 \
    --conf spark.yarn.executor.memoryoverhead=1000 \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.shuffle.service.port=7337 \
    --conf spark.shuffle.consolidateFiles=true \
    --conf spark.shuffle.manager=sort \
    --conf spark.storage.memoryFraction=0.4 \
    --conf spark.shuffle.memoryFraction=0.5 \
    --conf spark.default.parallelism=200 \
    /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
    featureFile:20240703_ad_feature_name.txt \
    trainPath:${train_data_path} \
    testPath:${predict_date_path} \
    savePath:${predict_result_path} \
    modelPath:${model_save_path} \
    eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20
  # Capture the spark-submit exit status and report/abort accordingly.
  local return_code=$?
  check_run_status $return_code $step_start_time "XGB模型训练任务"
}
  129. model_upload_oss() {
  130. cd ${model_local_path}
  131. $hadoop fs -get ${model_save_path} ./${model_name}
  132. if [ ! -d ./${model_name} ]; then
  133. echo "从HDFS下载模型失败"
  134. check_run_status 1 $step_start_time "XGB模型训练任务"
  135. exit 1
  136. fi
  137. tar -czvf ${model_name}.tar.gz -C ${model_name} .
  138. rm -rf .${model_name}.tar.gz.crc
  139. $hadoop fs -rm -r -skipTrash ${MODEL_OSS_PATH}/${model_name}.tar.gz
  140. $hadoop fs -put ${model_name}.tar.gz ${MODEL_OSS_PATH}
  141. check_run_status $return_code $step_start_time "模型上传OSS"
  142. }
  143. model_predict() {
  144. local python_return_code=$(python ${sh_path}/model_predict_analyse.py)
  145. }
# Main pipeline entry point.
# NOTE(review): the init / hive-check / train / upload stages are currently
# commented out — only the prediction-analysis step runs. Presumably a
# temporary state for re-running analysis alone; confirm before relying on
# this script for a full model update.
main() {
  # init
  # check_ad_hive
  # xgb_train
  # model_upload_oss
  model_predict
}
main