  1. #!/bin/sh
  2. set -x
  3. export PATH=$SPARK_HOME/bin:$PATH
  4. export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  5. export JAVA_HOME=/usr/lib/jvm/java-1.8.0
  6. sh_path=$(cd $(dirname $0); pwd)
  7. source ${sh_path}/00_common.sh
  8. source /root/anaconda3/bin/activate py37
  9. # 全局常量
  10. HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
  11. TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
  12. BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
  13. TABLE=alg_recsys_ad_sample_all
  14. # 特征文件名
  15. feature_file=20240703_ad_feature_name.txt
  16. # 模型本地临时保存路径
  17. model_local_home=/root/zhaohp/XGB/
  18. # 模型HDFS保存路径,测试时修改为其他路径,避免影响线上
  19. MODEL_PATH=/dw/recommend/model/35_ad_model
  20. # 预测结果保存路径,测试时修改为其他路径,避免影响线上
  21. PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data
  22. # 模型OSS保存路径,测试时修改为其他路径,避免影响线上
  23. MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
  24. # 线上模型名,测试时修改为其他模型名,避免影响线上
  25. model_name=model_xgb_351_1000_v2
  26. # 本地保存HDFS模型路径文件,测试时修改为其他模型名,避免影响线上
  27. model_path_file=${model_local_home}/online_model_path.txt
  28. # 获取当前是星期几,1表示星期一
  29. current_day_of_week="$(date +"%u")"
  30. # 任务开始时间
  31. start_time=$(date +%s)
  32. # 前一天
  33. today_early_1="$(date -d '1 days ago' +%Y%m%d)"
  34. # 线上模型在HDFS中的路径
  35. online_model_path=`cat ${model_path_file}`
  36. # 训练用的数据路径
  37. train_data_path=""
  38. # 评估用的数据路径
  39. predict_date_path=""
  40. #评估结果保存路径
  41. new_model_predict_result_path=""
  42. # 模型保存路径
  43. model_save_path=""
  44. # 评测结果保存路径,后续需要根据此文件评估是否要更新模型
  45. predict_analyse_file_path=""
  46. # 保存模型评估的分析结果
  47. old_incr_rate_avg=0
  48. new_incr_rate_avg=0
  49. old_incr_rate_list=""
  50. new_incr_rate_list=""
  51. # 校验命令的退出码
  52. check_run_status() {
  53. local status=$1
  54. local step_start_time=$2
  55. local step_name=$3
  56. local step_end_time=$(date +%s)
  57. local step_elapsed=$(($step_end_time - $step_start_time))
  58. if [ $status -ne 0 ]; then
  59. echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
  60. local elapsed=$(($step_end_time - $start_time))
  61. /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  62. exit 1
  63. else
  64. echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
  65. fi
  66. }
# Initialise the date-dependent paths used by the rest of the pipeline.
# Starting 2 days ago and walking backwards, collects up to 6 non-holiday
# dates as training partitions, then derives the model / prediction /
# analysis paths from them.
# Writes globals: train_data_path, predict_date_path, model_save_path,
# new_model_predict_result_path, online_model_predict_result_path,
# predict_analyse_file_path.
init() {
  declare -a date_keys=()
  local count=1
  local current_data="$(date -d '2 days ago' +%Y%m%d)"
  # Loop over previous days, keeping only non-holiday dates
  while [[ $count -lt 7 ]]; do
    date_key=$(date -d "$current_data" +%Y%m%d)
    # is_not_holidays comes from 00_common.sh; 1 means "not a holiday"
    if [ $(is_not_holidays $date_key) -eq 1 ]; then
      # Remember the date (newest first)
      date_keys+=("$date_key")
      # Prepend the partition so train_data_path ends up oldest-first
      if [[ -z ${train_data_path} ]]; then
        train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
      else
        train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
      fi
      count=$((count + 1))
    else
      echo "日期: ${date_key}是节日,跳过"
    fi
    current_data=$(date -d "$current_data -1 day" +%Y%m%d)
  done
  # date_keys is newest-first, so the last element is the earliest training day
  last_index=$((${#date_keys[@]} - 1))
  train_first_day=${date_keys[$last_index]}
  train_last_day=${date_keys[0]}
  # "${var: -4}" keeps the last 4 chars (MMDD) of a YYYYMMDD date
  model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
  predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
  new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${train_first_day: -4}_${train_last_day: -4}
  # "${online_model_path: -9}" takes the trailing "MMDD_MMDD" suffix of the
  # online model path — assumes that path follows the naming scheme above
  online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${online_model_path: -9}
  predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_351_1000_analyse.txt
  echo "init param train_data_path: ${train_data_path}"
  echo "init param predict_date_path: ${predict_date_path}"
  echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
  echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
  echo "init param model_save_path: ${model_save_path}"
  echo "init param online_model_path: ${online_model_path}"
  echo "init param feature_file: ${feature_file}"
  echo "init param model_name: ${model_name}"
  echo "init param model_local_home: ${model_local_home}"
  echo "init param model_oss_path: ${MODEL_OSS_PATH}"
  echo "init param predict_analyse_file_path: ${predict_analyse_file_path}"
  echo "init param current_day_of_week: ${current_day_of_week}"
  echo "当前Python环境安装的Python版本: $(python --version)"
  echo "当前Python环境安装的三方包: $(python -m pip list)"
}
  112. # 校验大数据任务是否执行完成
  113. check_ad_hive() {
  114. local step_start_time=$(date +%s)
  115. local max_hour=05
  116. local max_minute=30
  117. local elapsed=0
  118. while true; do
  119. local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)
  120. elapsed=$(($(date +%s) - $step_start_time))
  121. if [ "$python_return_code" -eq 0 ]; then
  122. break
  123. fi
  124. echo "Python程序返回非0值,等待五分钟后再次调用。"
  125. sleep 300
  126. local current_hour=$(date +%H)
  127. local current_minute=$(date +%M)
  128. if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
  129. local msg="大数据数据生产校验失败, 分区: ${today_early_1}"
  130. echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
  131. /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  132. exit 1
  133. fi
  134. done
  135. echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
  136. }
  137. origin_data() {
  138. (
  139. source ${sh_path}/25_xgb_make_data_origin_bucket.sh
  140. make_origin_data
  141. )
  142. }
  143. bucket_feature() {
  144. (
  145. source ${sh_path}/25_xgb_make_data_origin_bucket.sh
  146. make_bucket_feature
  147. )
  148. }
  149. xgb_train() {
  150. local step_start_time=$(date +%s)
  151. /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
  152. --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
  153. --master yarn --driver-memory 6G --executor-memory 9G --executor-cores 1 --num-executors 31 \
  154. --conf spark.yarn.executor.memoryoverhead=1000 \
  155. --conf spark.shuffle.service.enabled=true \
  156. --conf spark.shuffle.service.port=7337 \
  157. --conf spark.shuffle.consolidateFiles=true \
  158. --conf spark.shuffle.manager=sort \
  159. --conf spark.storage.memoryFraction=0.4 \
  160. --conf spark.shuffle.memoryFraction=0.5 \
  161. --conf spark.default.parallelism=200 \
  162. /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
  163. featureFile:20240703_ad_feature_name.txt \
  164. trainPath:${train_data_path} \
  165. testPath:${predict_date_path} \
  166. savePath:${new_model_predict_result_path} \
  167. modelPath:${model_save_path} \
  168. eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20
  169. local return_code=$?
  170. check_run_status $return_code $step_start_time "XGB模型训练任务"
  171. }
  172. calc_model_predict() {
  173. local count=0
  174. local max_line=10
  175. local old_total_diff=0
  176. local new_total_diff=0
  177. while read -r line && [ ${count} -lt ${max_line} ]; do
  178. # 使用 ! 取反判断,只有当行中不包含 "cid" 时才执行继续的逻辑
  179. if [[ "${line}" == *"cid"* ]]; then
  180. continue
  181. fi
  182. read -a numbers <<< "${line}"
  183. if [[ -z ${old_incr_rate_list} ]];then
  184. old_incr_rate_list="${numbers[6]}"
  185. new_incr_rate_list="${numbers[7]}"
  186. else
  187. old_incr_rate_list="${old_incr_rate_list};${numbers[6]}"
  188. new_incr_rate_list="${new_incr_rate_list};${numbers[7]}"
  189. fi
  190. old_total_diff=$( echo "${old_total_diff} + ${numbers[6]}" | bc -l )
  191. new_total_diff=$( echo "${new_total_diff} + ${numbers[7]}" | bc -l )
  192. count=$((${count} + 1))
  193. done < "${predict_analyse_file_path}"
  194. local return_code=$?
  195. check_run_status $return_code $step_start_time "计算Top10差异"
  196. old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
  197. check_run_status $? $step_start_time "计算老模型Top10差异"
  198. new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
  199. check_run_status $? $step_start_time "计算新模型Top10差异"
  200. echo "老模型Top10差异平均值: ${old_incr_rate_avg}"
  201. echo "老模型Top10差异列表: ${old_incr_rate_list}"
  202. echo "新模型Top10差异平均值: ${new_incr_rate_avg}"
  203. echo "新模型Top10差异列表: ${new_incr_rate_list}"
  204. }
  205. model_predict() {
  206. # 线上模型评估最新的数据
  207. local step_start_time=$(date +%s)
  208. /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
  209. --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
  210. --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 30 \
  211. --conf spark.yarn.executor.memoryoverhead=1024 \
  212. --conf spark.shuffle.service.enabled=true \
  213. --conf spark.shuffle.service.port=7337 \
  214. --conf spark.shuffle.consolidateFiles=true \
  215. --conf spark.shuffle.manager=sort \
  216. --conf spark.storage.memoryFraction=0.4 \
  217. --conf spark.shuffle.memoryFraction=0.5 \
  218. --conf spark.default.parallelism=200 \
  219. /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
  220. featureFile:20240703_ad_feature_name.txt \
  221. testPath:${predict_date_path} \
  222. savePath:${online_model_predict_result_path} \
  223. modelPath:${online_model_path}
  224. local return_code=$?
  225. check_run_status $return_code $step_start_time "线上模型评估${predict_date_path: -8}的数据"
  226. # 结果分析
  227. local python_return_code=$(python ${sh_path}/model_predict_analyse.py -p ${online_model_predict_result_path} ${new_model_predict_result_path} -f ${predict_analyse_file_path})
  228. check_run_status $python_return_code $step_start_time "线上模型评估${predict_date_path: -8}的数据"
  229. calc_model_predict
  230. if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l ) ));then
  231. check_run_status 1 $step_start_time "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
  232. echo "线上模型评估${predict_date_path: -8}的数据,绝对误差大于0.1,请检查"
  233. exit 1
  234. fi
  235. }
# Package the newly trained model and publish it to OSS, then record its HDFS
# path locally so the next run evaluates against it. Finally send a success
# report via ad_monitor_util.py.
model_upload_oss() {
  local step_start_time=$(date +%s)
  # Subshell: cd is confined here and any exit 1 below only leaves the subshell
  (
    cd ${model_local_home}
    # Pull the trained model directory down from HDFS
    ${HADOOP} fs -get ${model_save_path} ${model_name}
    if [ ! -d ${model_name} ]; then
      echo "从HDFS下载模型失败"
      check_run_status 1 $step_start_time "HDFS下载模型任务"
      exit 1
    fi
    # Pack the directory contents (not the directory itself) into a tarball
    tar -czvf ${model_name}.tar.gz -C ${model_name} .
    # Drop the CRC sidecar file — presumably left by the HDFS get; confirm
    rm -rf ${model_name}.tar.gz.crc
    # Replace the previous tarball on OSS: remove first, then upload
    ${HADOOP} fs -rm -r -skipTrash ${MODEL_OSS_PATH}/${model_name}.tar.gz
    ${HADOOP} fs -put ${model_name}.tar.gz ${MODEL_OSS_PATH}
    local return_code=$?
    check_run_status $return_code $step_start_time "模型上传OSS任务"
    # Record which HDFS model is now live
    echo ${model_save_path} > ${model_path_file}
    # Local cleanup
    rm -f ./${model_name}.tar.gz
    rm -rf ./${model_name}
  )
  # Re-check in the parent shell: check_run_status's exit 1 inside the
  # subshell only terminated the subshell, so propagate its status here
  local return_code=$?
  check_run_status $return_code $step_start_time "模型上传OSS任务"
  local step_end_time=$(date +%s)
  local elapsed=$(($step_end_time - $start_time))
  echo -e "$LOG_PREFIX -- 模型更新完成 -- 模型更新成功: 耗时 $elapsed"
  local msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天Top差异平均值: ${old_incr_rate_avg} \n\t - 前一天线上模型全天Top差异: ${old_incr_rate_list} \n\t - 前一天新模型全天Top10差异平均值: ${new_incr_rate_avg} \n\t - 前一天新模型全天Top差异: ${new_incr_rate_list} \n\t - 模型上传路径: ${MODEL_OSS_PATH}/${model_name}.tar.gz"
  /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --level info --msg "${msg}" --start "${start_time}" --elapsed "${elapsed}"
}
  264. # 主方法
  265. main() {
  266. init
  267. check_ad_hive
  268. origin_data
  269. bucket_feature
  270. if [ "${current_day_of_week}" -eq 2 ] || [ "${current_day_of_week}" -eq 5 ]; then
  271. echo "当前是周二或周五,开始训练并更新模型"
  272. xgb_train
  273. model_predict
  274. model_upload_oss
  275. else
  276. echo "今天不是周二也不是周五,不更新模型"
  277. fi
  278. }
  279. main