01_ad_model_update.sh

#!/bin/bash
set -x
export JAVA_HOME=/usr/lib/jvm/java-1.8.0
export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
# SPARK_HOME must be exported before it is used in PATH.
export PATH=$SPARK_HOME/bin:$PATH
export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf

sh_path=$(dirname "$0")
source ${sh_path}/00_common.sh
source /root/anaconda3/bin/activate py37

# Global constants
HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4_test
BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4_test
MODEL_PATH=/dw/recommend/model/35_ad_model_test
PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data_test
TABLE=alg_recsys_ad_sample_all
# Feature list file name
feature_file=20240703_ad_feature_name.txt
# OSS path for saving the model; point it elsewhere when testing to avoid touching production
MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/model/
# Online model name; use a different name when testing to avoid touching production
model_name=model_xgb_351_1000_v2_test
# Local file that records the HDFS path of the model currently online
model_path_file=/root/zhaohp/XGB/online_model_path_test.txt
today_early_1="$(date -d '1 days ago' +%Y%m%d)"
# Training data paths (comma-separated)
train_data_path=""
# Evaluation data path
predict_date_path=""
# Where the new model's evaluation results are saved
new_model_predict_result_path=""
# Where the trained model is saved in HDFS
model_save_path=""
# Local scratch directory for the model
model_local_path=/root/zhaohp/XGB
# Job start time
start_time=$(date +%s)
# HDFS path of the model currently online
online_model_path=$(cat ${model_path_file})

# Check a step's exit code: log the elapsed time, and abort the run on failure.
check_run_status() {
    local status=$1
    local step_start_time=$2
    local step_name=$3

    local step_end_time=$(date +%s)
    local step_elapsed=$(($step_end_time - $step_start_time))

    if [ $status -ne 0 ]; then
        echo "$LOG_PREFIX -- ${step_name} failed: elapsed ${step_elapsed}s"
        local elapsed=$(($step_end_time - $start_time))
        # /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
        exit 1
    else
        echo "$LOG_PREFIX -- ${step_name} succeeded: elapsed ${step_elapsed}s"
    fi
}
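
# Usage sketch inside a step function (hypothetical step, for illustration):
#   local step_start_time=$(date +%s)
#   some_command
#   check_run_status $? $step_start_time "some step name"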

init() {
    declare -a date_keys=()
    local count=1
    local current_date="$(date -d '2 days ago' +%Y%m%d)"

    # Walk backwards day by day, collecting the 6 most recent non-holiday dates
    while [[ $count -lt 7 ]]; do
        date_key=$(date -d "$current_date" +%Y%m%d)
        # Skip holidays; otherwise prepend this date's path to the training data list
        if [ $(is_not_holidays $date_key) -eq 1 ]; then
            # Record the date (newest first)
            date_keys+=("$date_key")
            if [[ -z ${train_data_path} ]]; then
                train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
            else
                train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
            fi
            count=$((count + 1))
        else
            echo "Date ${date_key} is a holiday, skipping"
        fi
        current_date=$(date -d "$current_date -1 day" +%Y%m%d)
    done

    # date_keys is ordered newest first, so the last element is the earliest training day
    last_index=$((${#date_keys[@]} - 1))
    train_first_day=${date_keys[$last_index]}
    train_last_day=${date_keys[0]}

    model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
    predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
    new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${train_first_day: -4}_${train_last_day: -4}
    online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${online_model_path: -9}

    echo "init param train_data_path: ${train_data_path}"
    echo "init param predict_date_path: ${predict_date_path}"
    echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
    echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
    echo "init param model_save_path: ${model_save_path}"
    echo "init param online_model_path: ${online_model_path}"
    echo "init param feature_file: ${feature_file}"
    echo "init param model_name: ${model_name}"
    echo "init param model_local_path: ${model_local_path}"
    echo "init param model_oss_path: ${MODEL_OSS_PATH}"
    echo "Python version in the current environment: $(python --version)"
    echo "Third-party packages in the current environment: $(python -m pip list)"
}
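
# Illustration of the paths init() computes (assuming a hypothetical run on
# 20240801 with no holidays skipped):
#   today_early_1   = 20240731
#   training dates  = 20240725 .. 20240730 (6 days, comma-joined into train_data_path)
#   model_save_path = ${MODEL_PATH}/model_xgb_351_1000_v2_test_0725_0730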

# Wait until the upstream Hive data for the partition is ready
check_ad_hive() {
    local step_start_time=$(date +%s)
    # Give up at 05:30
    local max_hour=5
    local max_minute=30
    local elapsed=0
    while true; do
        local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
        elapsed=$(($(date +%s) - $step_start_time))
        if [ "$python_return_code" -eq 0 ]; then
            break
        fi
        echo "The Python check returned non-zero; retrying in five minutes."
        sleep 300
        # Force base 10: date +%H yields zero-padded values like 08/09, which
        # (( )) would otherwise reject as invalid octal numbers.
        local current_hour=$((10#$(date +%H)))
        local current_minute=$((10#$(date +%M)))
        if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
            local msg="upstream data readiness check failed, partition: ${today_early_1}"
            echo -e "$LOG_PREFIX -- upstream data readiness check -- ${msg}: elapsed ${elapsed}s"
            /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
            exit 1
        fi
    done
    echo "$LOG_PREFIX -- upstream data readiness check -- passed: elapsed ${elapsed}s"
}
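
# Manual spot-check of the readiness probe (same flags the loop uses; the
# utility prints a status code, and the loop treats a printed 0 as "ready"):
#   python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition 20240731 --hh 23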

# Generate raw samples from the Hive table
make_origin_data() {
    local step_start_time=$(date +%s)
    local pids=()

    # Split the partition into three hour ranges (00-12, 13-18, 19-23) and
    # generate them in parallel; all three write to ${TRAIN_PATH}.
    ${SPARK_HOME}/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
        --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
        --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
        ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
        tablePart:64 repartition:32 \
        beginStr:${today_early_1}00 endStr:${today_early_1}12 \
        savePath:${TRAIN_PATH} \
        table:${TABLE} \
        filterHours:00,01,02,03,04,05,06,07 \
        idDefaultValue:0.1 &
    pids+=($!)

    ${SPARK_HOME}/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
        --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
        --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
        ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
        tablePart:64 repartition:32 \
        beginStr:${today_early_1}13 endStr:${today_early_1}18 \
        savePath:${TRAIN_PATH} \
        table:${TABLE} \
        filterHours:00,01,02,03,04,05,06,07 \
        idDefaultValue:0.1 &
    pids+=($!)

    ${SPARK_HOME}/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
        --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718 \
        --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
        ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
        tablePart:64 repartition:32 \
        beginStr:${today_early_1}19 endStr:${today_early_1}23 \
        savePath:${TRAIN_PATH} \
        table:${TABLE} \
        filterHours:00,01,02,03,04,05,06,07 \
        idDefaultValue:0.1 &
    pids+=($!)

    # A bare `wait` always exits 0, so wait on each job and aggregate failures.
    local return_code=0
    local pid
    for pid in "${pids[@]}"; do
        wait "$pid" || return_code=1
    done
    check_run_status $return_code $step_start_time "Spark raw-sample generation job"
}
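
# Optional sanity check once the three jobs finish (assumes the job writes
# its output under the save path; adjust to the job's actual layout):
#   ${HADOOP} fs -ls ${TRAIN_PATH}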

# Bucketize features for training
make_bucket_feature() {
    local step_start_time=$(date +%s)
    ${SPARK_HOME}/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
        --class com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_33_bucketData_20240718 \
        --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
        ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
        beginStr:${today_early_1} endStr:${today_early_1} repartition:100 \
        filterNames:_4h_,_5h_,adid_,targeting_conversion_ \
        readPath:${TRAIN_PATH} \
        savePath:${BUCKET_FEATURE_PATH}

    local return_code=$?
    check_run_status $return_code $step_start_time "Spark feature bucketing job"
}

# Train the XGBoost model on Spark 3
xgb_train() {
    local step_start_time=$(date +%s)
    /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
        --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
        --master yarn --driver-memory 6G --executor-memory 9G --executor-cores 1 --num-executors 31 \
        --conf spark.yarn.executor.memoryoverhead=1000 \
        --conf spark.shuffle.service.enabled=true \
        --conf spark.shuffle.service.port=7337 \
        --conf spark.shuffle.consolidateFiles=true \
        --conf spark.shuffle.manager=sort \
        --conf spark.storage.memoryFraction=0.4 \
        --conf spark.shuffle.memoryFraction=0.5 \
        --conf spark.default.parallelism=200 \
        /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
        featureFile:${feature_file} \
        trainPath:${train_data_path} \
        testPath:${predict_date_path} \
        savePath:${new_model_predict_result_path} \
        modelPath:${model_save_path} \
        eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20

    local return_code=$?
    check_run_status $return_code $step_start_time "XGBoost model training job"
}
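
# Trainer arguments above, assuming they map onto the standard XGBoost
# parameters of the same names:
#   eta        - learning rate (shrinkage applied per boosting round)
#   gamma      - minimum loss reduction required to make a further split
#   max_depth  - maximum tree depth
#   num_round  - number of boosting rounds
#   num_worker / repartition - Spark-side parallelism for distributed training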

# Evaluate the online model on the newest data and compare it against the new model
model_predict() {
    local step_start_time=$(date +%s)
    /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
        --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
        --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 30 \
        --conf spark.yarn.executor.memoryoverhead=1024 \
        --conf spark.shuffle.service.enabled=true \
        --conf spark.shuffle.service.port=7337 \
        --conf spark.shuffle.consolidateFiles=true \
        --conf spark.shuffle.manager=sort \
        --conf spark.storage.memoryFraction=0.4 \
        --conf spark.shuffle.memoryFraction=0.5 \
        --conf spark.default.parallelism=200 \
        /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
        featureFile:${feature_file} \
        testPath:${predict_date_path} \
        savePath:${online_model_predict_result_path} \
        modelPath:${online_model_path}

    local return_code=$?
    check_run_status $return_code $step_start_time "online-model evaluation on data of ${predict_date_path: -8}"

    # Gate the rollout on the mean absolute difference between the online and
    # new models' predictions.
    local mean_abs_diff=$(python ${sh_path}/model_predict_analyse.py -p ${online_model_predict_result_path} ${new_model_predict_result_path})
    if (( $(echo "${mean_abs_diff} > 0.000400" | bc -l) )); then
        echo "Online-model evaluation on data of ${predict_date_path: -8}: mean absolute difference exceeds 0.000400, please investigate"
        # check_run_status reports the failure and exits the script
        check_run_status 1 $step_start_time "online-model evaluation on data of ${predict_date_path: -8} (mean absolute difference > 0.000400)"
    fi
}
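
# How the bc guard behaves (bc -l prints 1 for a true relation and 0 for a
# false one, which (( )) then evaluates as the condition):
#   echo "0.000412 > 0.000400" | bc -l   # -> 1, gate trips
#   echo "0.000399 > 0.000400" | bc -l   # -> 0, gate passes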

# Package the trained model and publish it to OSS
model_upload_oss() {
    local step_start_time=$(date +%s)
    cd ${model_local_path}

    # Pull the newly trained model down from HDFS
    ${HADOOP} fs -get ${model_save_path} ./${model_name}
    if [ ! -d ./${model_name} ]; then
        echo "Failed to download the model from HDFS"
        check_run_status 1 $step_start_time "download model from HDFS"
    fi

    # Package the model and replace the previous archive in OSS
    tar -czvf ${model_name}.tar.gz -C ${model_name} .
    # Clean up any stale local checksum file
    rm -rf .${model_name}.tar.gz.crc
    ${HADOOP} fs -rm -r -skipTrash ${MODEL_OSS_PATH}/${model_name}.tar.gz
    ${HADOOP} fs -put ${model_name}.tar.gz ${MODEL_OSS_PATH}
    local return_code=$?
    check_run_status $return_code $step_start_time "model upload to OSS"

    # Record the HDFS path of the model now serving online
    echo ${model_save_path} > ${model_path_file}

    local step_end_time=$(date +%s)
    local elapsed=$(($step_end_time - $start_time))
    echo -e "$LOG_PREFIX -- model update complete -- model update succeeded: elapsed ${elapsed}s"
    /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level info --msg "model update succeeded" --start "$start_time" --elapsed "$elapsed"
}
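
# Optional check that the archive landed in OSS (the bucket is addressed
# through the Hadoop OSS connector, as in the -put above):
#   ${HADOOP} fs -ls ${MODEL_OSS_PATH}/${model_name}.tar.gz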

# Entry point: run the daily pipeline end to end
main() {
    init
    check_ad_hive
    make_origin_data
    make_bucket_feature
    xgb_train
    model_predict
    model_upload_oss
}

main