01_ad_model_update_20241202.sh 15 KB

#!/bin/bash
# The script relies on bash features (arrays, [[ ]], (( )), <<<), so it must
# run under bash rather than a POSIX sh.
set -x

export PATH=$SPARK_HOME/bin:$PATH
export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
export JAVA_HOME=/usr/lib/jvm/java-1.8.0

sh_path=$(cd $(dirname $0); pwd)
source ${sh_path}/00_common.sh
source /root/anaconda3/bin/activate py37

# Global constants
LOG_PREFIX=广告模型训练任务
HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
TRAIN_PATH=/dw/recommend/model/31_ad_sample_data_v4
BUCKET_FEATURE_PATH=/dw/recommend/model/33_ad_train_data_v4
TABLE=alg_recsys_ad_sample_all
# Feature file name
feature_file=20240703_ad_feature_name.txt
# Local temporary directory for models
model_local_home=/root/zhaohp/XGB/
# Days of the week on which the model is retrained
TRAIN_UPDATE_DAY_OF_WEEK=("星期一" "星期三" "星期五")
# HDFS path for saving models; point elsewhere when testing to avoid touching production
MODEL_PATH=/dw/recommend/model/35_ad_model
# Path for saving prediction results; point elsewhere when testing to avoid touching production
PREDICT_RESULT_SAVE_PATH=/dw/recommend/model/34_ad_predict_data
# OSS path for saving models; point elsewhere when testing to avoid touching production
MODEL_OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
# Online model name; use a different name when testing to avoid touching production
model_name=model_xgb_351_1000_v2
# Online calibration file name
OSS_CALIBRATION_FILE_NAME=model_xgb_351_1000_v2_calibration
# Directory for temporary files
PREDICT_CACHE_PATH=/root/zhaohp/XGB/predict_cache
# Local file holding the HDFS path of the online model; use a different file when testing to avoid touching production
model_path_file=${model_local_home}/online_model_path.txt

# Current day of the week, as returned by get_day_of_week from 00_common.sh
# (matched against TRAIN_UPDATE_DAY_OF_WEEK in main)
current_day_of_week=$(get_day_of_week)
# Task start time
start_time=$(date +%s)
# Yesterday, as YYYYMMDD
today_early_1="$(date -d '1 days ago' +%Y%m%d)"
# HDFS path of the current online model
online_model_path=$(cat ${model_path_file})
# Training data paths (comma-separated)
train_data_path=""
# Evaluation data path
predict_date_path=""
# Path for saving the new model's evaluation results
new_model_predict_result_path=""
# Model save path
model_save_path=""
# Path of the evaluation analysis file; used later to decide whether to promote the model
predict_analyse_file_path=""
# Calibration file path
calibration_file_path=""
# Path of the model training info file
model_train_info_file_path=""
# Mean abs relative error of the old / new model, filled in by the analysis step
old_incr_rate_avg=0
new_incr_rate_avg=0

train_info_write_file() {
    if [ $# -ne 2 ]; then
        echo "训练信息写入文件失败,参数个数不等于2"
        exit 1
    fi
    echo "${1}: ${2}" >> ${model_train_info_file_path}
}
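
# Usage sketch (illustrative): train_info_write_file "模型名" "${model_name}"
# appends the line "模型名: <value>" to ${model_train_info_file_path}; that file
# is later handed to ad_monitor_util.py when a step fails.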

init() {
    declare -a date_keys=()
    local count=1
    local current_data="$(date -d '2 days ago' +%Y%m%d)"

    # Walk backwards to collect the 7 most recent non-holiday dates
    while [[ ${count} -le 7 ]]; do
        date_key=$(date -d "${current_data}" +%Y%m%d)
        # Skip holidays; otherwise prepend this date's partition to the training data list
        if [ $(is_not_holidays ${date_key}) -eq 1 ]; then
            # Record the date
            date_keys+=("${date_key}")

            if [[ -z ${train_data_path} ]]; then
                train_data_path="${BUCKET_FEATURE_PATH}/${date_key}"
            else
                train_data_path="${BUCKET_FEATURE_PATH}/${date_key},${train_data_path}"
            fi
            count=$((count + 1))
        else
            echo "日期: ${date_key}是节日,跳过"
        fi
        current_data=$(date -d "${current_data} -1 day" +%Y%m%d)
    done

    last_index=$((${#date_keys[@]} - 1))
    train_first_day=${date_keys[$last_index]}
    train_last_day=${date_keys[0]}

    model_save_path=${MODEL_PATH}/${model_name}_${train_first_day: -4}_${train_last_day: -4}
    model_train_info_file_path=${model_local_home}/model_train_info/${model_name}_${train_first_day: -4}_${train_last_day: -4}.txt
    predict_date_path=${BUCKET_FEATURE_PATH}/${today_early_1}
    new_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${train_first_day: -4}_${train_last_day: -4}
    online_model_predict_result_path=${PREDICT_RESULT_SAVE_PATH}/${today_early_1}_351_1000_${online_model_path: -9}
    predict_analyse_file_path=${model_local_home}/predict_analyse_file/${today_early_1}_351_1000_analyse.txt
    calibration_file_path=${model_local_home}/${OSS_CALIBRATION_FILE_NAME}.txt

    echo "init param train_data_path: ${train_data_path}"
    echo "init param predict_date_path: ${predict_date_path}"
    echo "init param new_model_predict_result_path: ${new_model_predict_result_path}"
    echo "init param online_model_predict_result_path: ${online_model_predict_result_path}"
    echo "init param model_save_path: ${model_save_path}"
    echo "init param online_model_path: ${online_model_path}"
    echo "init param feature_file: ${feature_file}"
    echo "init param model_name: ${model_name}"
    echo "init param model_local_home: ${model_local_home}"
    echo "init param model_oss_path: ${MODEL_OSS_PATH}"
    echo "init param predict_analyse_file_path: ${predict_analyse_file_path}"
    echo "init param calibration_file_path: ${calibration_file_path}"
    echo "init param current_day_of_week: ${current_day_of_week}"
    echo "init param model_train_info_file_path: ${model_train_info_file_path}"

    echo "当前Python环境安装的Python版本: $(python --version)"
    echo "当前Python环境安装的三方包: $(python -m pip list)"

    train_info_write_file "模型名" "${model_name}"
    train_info_write_file "开始时间" "${start_time}"
    # Quoted so that the multi-word version string stays a single argument
    train_info_write_file "python版本" "$(python --version)"
    train_info_write_file "训练数据路径" "${train_data_path}"
    train_info_write_file "验证数据路径" "${predict_date_path}"
    train_info_write_file "新模型路径" "${model_save_path}"
    train_info_write_file "新模型验证结果保存路径" "${new_model_predict_result_path}"
    train_info_write_file "线上模型路径" "${online_model_path}"
    train_info_write_file "线上模型验证结果保存路径" "${online_model_predict_result_path}"
    train_info_write_file "模型验证结果分析文件路径" "${predict_analyse_file_path}"
    train_info_write_file "是否测试" "是"
}
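
# Illustrative naming (hypothetical dates): with train_first_day=20241125 and
# train_last_day=20241201, "${train_first_day: -4}" keeps the MMDD suffix, so
# model_save_path becomes ${MODEL_PATH}/model_xgb_351_1000_v2_1125_1201.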

check_run_status_v2() {
    if [ $# -ne 4 ]; then
        echo "校验状态码函数异常: 参数个数不等于4"
        train_info_write_file "广告模型自动更新任务结果" "异常结束"
        train_info_write_file "广告模型自动更新失败原因" "校验状态码函数异常: 参数个数不等于4"
        exit 1
    fi
    local status=$1
    local step_name=$2
    local step_start_time=$3
    local failed_reason=$4

    local step_end_time=$(date +%s)
    local step_elapsed=$((step_end_time - step_start_time))
    train_info_write_file "${step_name}耗时" "${step_elapsed}"

    if [ ${status} -ne 0 ]; then
        train_info_write_file "${step_name}" "失败"
        train_info_write_file "广告模型自动更新任务结果" "失败"
        train_info_write_file "广告模型自动更新失败原因" "${failed_reason}"
        train_info_write_file "结束时间" "${step_end_time}"
        /root/anaconda3/bin/python ${sh_path}/ad_monitor_util.py --tif ${model_train_info_file_path}
        exit 1
    else
        train_info_write_file "${step_name}" "成功"
    fi
}
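
# Usage sketch (illustrative): each step ends with
#   check_run_status_v2 $? "步骤名" ${step_start_time} "失败原因"
# which records the step's elapsed time and outcome in the train-info file; on
# a non-zero status it notifies via ad_monitor_util.py and aborts the script.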

# Check that the upstream big-data (Hive) job has finished
check_ad_hive() {
    local step_start_time=$(date +%s)
    # Give up at 05:30
    local max_hour=05
    local max_minute=30
    local elapsed=0
    local result=0
    while true; do
        local python_return_code=$(python ${sh_path}/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh 23)

        elapsed=$(($(date +%s) - ${step_start_time}))
        if [ "${python_return_code}" -eq 0 ]; then
            break
        fi
        echo "Python程序返回非0值,等待五分钟后再次调用。"
        sleep 300
        local current_hour=$(date +%H)
        local current_minute=$(date +%M)
        # Compare in base 10: %H/%M are zero-padded and would otherwise be parsed as octal
        if (( 10#${current_hour} > 10#${max_hour} || ( 10#${current_hour} == 10#${max_hour} && 10#${current_minute} >= 10#${max_minute} ) )); then
            result=1
            break
        fi
    done
    check_run_status_v2 ${result} "大数据数据生产校验任务" ${step_start_time} "大数据数据生产校验失败, 分区: ${today_early_1},请检查!"
    echo "${LOG_PREFIX} -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 ${elapsed}"
}

origin_data() {
    (
        source ${sh_path}/25_xgb_make_data_origin_bucket.sh
        make_origin_data_v2
    )
}

bucket_feature() {
    (
        source ${sh_path}/25_xgb_make_data_origin_bucket.sh
        make_bucket_feature_v2
    )
}
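
# Both helpers source 25_xgb_make_data_origin_bucket.sh inside a subshell so
# its variables and functions do not leak into this script's scope.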

xgb_train() {
    local step_start_time=$(date +%s)
    /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
        --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
        --master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors 31 \
        --conf spark.yarn.executor.memoryoverhead=2048 \
        --conf spark.shuffle.service.enabled=true \
        --conf spark.shuffle.service.port=7337 \
        --conf spark.shuffle.consolidateFiles=true \
        --conf spark.shuffle.manager=sort \
        --conf spark.storage.memoryFraction=0.4 \
        --conf spark.shuffle.memoryFraction=0.5 \
        --conf spark.default.parallelism=200 \
        /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
        featureFile:20240703_ad_feature_name.txt \
        trainPath:${train_data_path} \
        testPath:${predict_date_path} \
        savePath:${new_model_predict_result_path} \
        modelPath:${model_save_path} \
        eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:30 repartition:20

    local return_code=$?
    check_run_status_v2 ${return_code} "XGB模型训练任务" ${step_start_time} "XGB模型训练失败"
}
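
# The trailing key:value arguments are the trainer's hyperparameters; assuming
# the standard XGBoost meanings: eta (learning rate) 0.01, gamma 0.0 (minimum
# split loss), max_depth 5, num_round 1000 boosting rounds, 30 workers, and 20
# output partitions.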

calc_model_predict() {
    local count=0
    local max_line=10
    local old_total_diff=0
    local new_total_diff=0
    # step_start_time is inherited from the caller (model_predict_analyse_check)
    # through bash's dynamic scoping
    while read -r line && [ ${count} -lt ${max_line} ]; do
        # Skip the header row (the line containing "cid")
        if [[ "${line}" == *"cid"* ]]; then
            continue
        fi

        read -a numbers <<< "${line}"

        # abs(x): multiply by 1 when x >= 0, by -1 otherwise
        old_abs_score=$( echo "${numbers[6]} * ((${numbers[6]} >= 0) * 2 - 1)" | bc -l )
        new_abs_score=$( echo "${numbers[7]} * ((${numbers[7]} >= 0) * 2 - 1)" | bc -l )
        old_total_diff=$( echo "${old_total_diff} + ${old_abs_score}" | bc -l )
        new_total_diff=$( echo "${new_total_diff} + ${new_abs_score}" | bc -l )

        count=$((count + 1))
    done < "${predict_analyse_file_path}"
    local calc_top_code=$?

    old_incr_rate_avg=$( echo "scale=6; ${old_total_diff} / ${count}" | bc -l )
    local old_rate_avg_code=$?
    new_incr_rate_avg=$( echo "scale=6; ${new_total_diff} / ${count}" | bc -l )
    local new_rate_avg_code=$?

    if [ ${calc_top_code} -ne 0 ] || [ ${old_rate_avg_code} -ne 0 ] || [ ${new_rate_avg_code} -ne 0 ]; then
        check_run_status_v2 1 "计算新老模型Top10差异" ${step_start_time} "计算新老模型Top10差异异常"
    fi

    echo "老模型素材打分相对误差abs后的均值: ${old_incr_rate_avg}"
    echo "新模型素材打分相对误差abs后的均值: ${new_incr_rate_avg}"
}
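
# Assumed layout of ${predict_analyse_file_path}, inferred from the parsing
# above: one whitespace-separated row per cid, with the old model's relative
# error in column 7 and the new model's in column 8; the header row containing
# "cid" is skipped and only the first 10 data rows enter the averages.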

calc_auc() {
    local old_auc=$(cat ${PREDICT_CACHE_PATH}/old_1.txt | /root/sunmingze/AUC/AUC)
    train_info_write_file "老模型AUC" "${old_auc}"
    local new_auc=$(cat ${PREDICT_CACHE_PATH}/new_1.txt | /root/sunmingze/AUC/AUC)
    train_info_write_file "新模型AUC" "${new_auc}"
}
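
# Assumption: the AUC binary reads "label score" pairs from stdin and prints a
# single AUC value; old_1.txt and new_1.txt are expected to be produced into
# ${PREDICT_CACHE_PATH} by the evaluation step (e.g. model_predict_analyse.py).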

model_predict() {
    # Evaluate the latest data with the online model
    local step_start_time=$(date +%s)
    /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
        --class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_hdfsfile_20240813 \
        --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 30 \
        --conf spark.yarn.executor.memoryoverhead=1024 \
        --conf spark.shuffle.service.enabled=true \
        --conf spark.shuffle.service.port=7337 \
        --conf spark.shuffle.consolidateFiles=true \
        --conf spark.shuffle.manager=sort \
        --conf spark.storage.memoryFraction=0.4 \
        --conf spark.shuffle.memoryFraction=0.5 \
        --conf spark.default.parallelism=200 \
        /root/zhangbo/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
        featureFile:20240703_ad_feature_name.txt \
        testPath:${predict_date_path} \
        savePath:${online_model_predict_result_path} \
        modelPath:${online_model_path}

    local return_code=$?

    # Analyse and compare the two models' predictions
    local python_return_code=$(python ${sh_path}/model_predict_analyse.py -op ${online_model_predict_result_path} -np ${new_model_predict_result_path} -af ${predict_analyse_file_path} -cf ${calibration_file_path})

    if [ ${return_code} -ne 0 ] || [ ${python_return_code} -ne 0 ]; then
        check_run_status_v2 1 "线上模型评估昨天数据任务" ${step_start_time} "线上模型评估昨天数据任务异常"
    else
        check_run_status_v2 0 "线上模型评估昨天数据任务" ${step_start_time} "线上模型评估昨天数据任务异常"
    fi
}
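
# Assumption (inferred from the -op/-np/-af/-cf flags, not documented here):
# model_predict_analyse.py compares the online and new predictions, writes the
# per-cid comparison to ${predict_analyse_file_path} and the calibration table
# to ${calibration_file_path}, and prints a status code on stdout, following
# the same convention as ad_utils.py above.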

model_predict_analyse_check() {
    local step_start_time=$(date +%s)
    calc_model_predict
    calc_auc

    if (( $(echo "${new_incr_rate_avg} > 0.100000" | bc -l) )); then
        check_run_status_v2 1 "判断模型素材打分相对误差abs后的均值" ${step_start_time} "线上模型评估${predict_date_path: -8}的数据,相对误差大于0.1,请检查"
        exit 1
    fi

    # Compare the two models' mean errors
    score_diff=$( echo "${new_incr_rate_avg} - ${old_incr_rate_avg}" | bc -l )
    if (( $(echo "${score_diff} > 0.050000" | bc -l) )); then
        echo "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查"
        check_run_status_v2 1 "计算新老模型相对误差abs后的均值差异" ${step_start_time} "两个模型评估${predict_date_path: -8}的数据,两个模型分数差异为: ${score_diff}, 大于0.05"
        exit 1
    fi
}
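
# Gate summary: the new model is rejected if its mean abs relative error on
# yesterday's data exceeds 0.1, or if it exceeds the online model's mean by
# more than 0.05; either condition aborts the run before the OSS upload.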

model_upload_oss() {
    local step_start_time=$(date +%s)
    (
        cd ${model_local_home}

        ${HADOOP} fs -get ${model_save_path} ${model_name}
        if [ ! -d ${model_name} ]; then
            echo "从HDFS下载模型失败"
            # check_run_status_v2 exits, but only from this subshell; the outer
            # check below sees the non-zero status as well
            check_run_status_v2 1 "HDFS下载模型任务" ${step_start_time} "HDFS下载模型失败"
            exit 1
        fi

        tar -czvf ${model_name}.tar.gz -C ${model_name} .
        rm -rf ${model_name}.tar.gz.crc

        # Remove the old model archive and calibration file from OSS
        ${HADOOP} fs -rm -r -skipTrash ${MODEL_OSS_PATH}/${model_name}.tar.gz ${MODEL_OSS_PATH}/${OSS_CALIBRATION_FILE_NAME}.txt

        # Push the new model archive and calibration file to OSS
        ${HADOOP} fs -put ${model_name}.tar.gz ${OSS_CALIBRATION_FILE_NAME}.txt ${MODEL_OSS_PATH}
        local return_code=$?
        check_run_status_v2 ${return_code} "模型上传OSS任务" ${step_start_time} "模型上传OSS失败"

        # Record the new model's HDFS path as the online model
        echo ${model_save_path} > ${model_path_file}

        # Clean up local temporary files
        rm -f ./${model_name}.tar.gz
        rm -rf ./${model_name}
        rm -rf ${OSS_CALIBRATION_FILE_NAME}.txt
    )
    local return_code=$?
    check_run_status_v2 ${return_code} "模型上传OSS任务" ${step_start_time} "模型上传OSS失败"

    local step_end_time=$(date +%s)
    local elapsed=$((step_end_time - start_time))
    echo -e "${LOG_PREFIX} -- 模型更新完成 -- 模型更新成功: 耗时 ${elapsed}"
    train_info_write_file "结束时间" "$(date +%s)"
    train_info_write_file "广告模型自动更新任务结果" "完成"
}
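
# Promotion works through ${model_path_file}: model_upload_oss writes the new
# model_save_path into it, and the next run reads it back as online_model_path,
# making the freshly uploaded model the comparison baseline.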

# Entry point
main() {
    init
    check_ad_hive
    origin_data
    bucket_feature

    if [[ "${TRAIN_UPDATE_DAY_OF_WEEK[*]}" =~ ${current_day_of_week} ]]; then
        echo "当前${current_day_of_week},开始训练并更新模型"
        xgb_train
        model_predict
        model_predict_analyse_check
        model_upload_oss
    else
        echo "当前${current_day_of_week},不是周一,周三或周五,不更新模型"
    fi
}

main