02_ad_model_update_ twice_daily.sh 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #!/bin/sh
  2. set -x
  3. export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
  4. export PATH=$SPARK_HOME/bin:$PATH
  5. export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  6. export JAVA_HOME=/usr/lib/jvm/java-1.8.0
  7. source /root/anaconda3/bin/activate py37
  8. # 全局常量
  9. originDataSavePath=/dw/recommend/model/31_ad_sample_data_v3_auto_test
  10. bucketFeatureSavePathHome=/dw/recommend/model/33_ad_train_data_v3_auto_test
  11. model_name=model_bkb8_v3_test
  12. LAST_MODEL_HOME=/root/zhaohp/model_online_test
  13. MODEL_HOME=/root/zhaohp/recommend-emr-dataprocess/model
  14. OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model
  15. PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict
  16. HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
  17. FM_HOME=/root/sunmingze/alphaFM
  18. today="$(date +%Y%m%d)"
  19. today_early_1="$(date -d '1 days ago' +%Y%m%d)"
  20. start_time=$(date "+%Y-%m-%d %H:%M:%S")
  21. elapsed=0
  22. LOG_PREFIX=广告模型自动更新任务
  23. # 训练和预测数据分区
  24. train_begin_str=''
  25. train_end_str=''
  26. predict_begin_str=''
  27. predict_end_str=''
  28. # HDFS保存数据的目录
  29. trainBucketFeaturePath=${bucketFeatureSavePathHome}
  30. predictBucketFeaturePath=${bucketFeatureSavePathHome}
  31. local_model_file_path=${MODEL_HOME}/${model_name}.txt
  32. local_change_model_file_path=${MODEL_HOME}/${model_name}_change.txt
  33. # 全局初始化
  34. global_init() {
  35. # 获取当前小时,确定需要使用的数据分区范围
  36. local current_hour=$(date +%H)
  37. if [ $current_hour -lt 08 ]; then
  38. train_begin_str=${today_early_1}14
  39. train_end_str=${today_early_1}21
  40. predict_begin_str=${today_early_1}22
  41. predict_end_str=${today_early_1}23
  42. trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/train
  43. predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/predict
  44. local_model_file_path=${MODEL_HOME}/${train_end_str}.txt
  45. local_change_model_file_path=${MODEL_HOME}/${train_end_str}_change.txt
  46. elif [ $current_hour -ge 20 ]; then
  47. train_begin_str=${today_early_1}22
  48. train_end_str=${today}13
  49. predict_begin_str=${today}14
  50. predict_end_str=${today}15
  51. trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/train
  52. predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/predict
  53. local_model_file_path=${MODEL_HOME}/${train_end_str}.txt
  54. local_change_model_file_path=${MODEL_HOME}/${train_end_str}_change.txt
  55. else
  56. echo "当前时间段异常: 退出任务"
  57. exit 1
  58. fi
  59. # 删除HDFS目录,保证本次任务运行时目录干净
  60. $HADOOP fs -rm -r -skipTrash ${trainBucketFeaturePath}
  61. $HADOOP fs -rm -r -skipTrash ${predictBucketFeaturePath}
  62. echo "全局变量初始化化: "
  63. echo " train_begin_str=${train_begin_str}"
  64. echo " train_end_str=${train_end_str}"
  65. echo " predict_begin_str=${predict_begin_str}"
  66. echo " predict_end_str=${predict_end_str}"
  67. echo " trainBucketFeaturePath=${trainBucketFeaturePath}"
  68. echo " predictBucketFeaturePath=${predictBucketFeaturePath}"
  69. echo " local_model_file_path=${local_model_file_path}"
  70. echo " local_change_model_file_path=${local_change_model_file_path}"
  71. }
  72. # 校验命令的退出码
  73. check_run_status() {
  74. local status=$1
  75. local step_start_time=$2
  76. local step_name=$3
  77. local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
  78. local step_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time")))
  79. if [ $status -ne 0 ]; then
  80. echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
  81. local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
  82. /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  83. exit 1
  84. else
  85. echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
  86. fi
  87. }
  88. # 校验大数据任务是否执行完成
  89. check_ad_hive() {
  90. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  91. while true; do
  92. local python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${predict_end_str:0:8} --hh ${predict_end_str:9:10})
  93. local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
  94. local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step_start_time")))
  95. if [ "$python_return_code" -eq 0 ]; then
  96. break
  97. fi
  98. echo "Python程序返回非0值,等待五分钟后再次调用。"
  99. sleep 300
  100. local current_hour=$(date +%H)
  101. local current_minute=$(date +%M)
  102. if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
  103. local msg="大数据数据生产校验失败, 分区: ${today}10"
  104. echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
  105. /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  106. exit 1
  107. fi
  108. done
  109. echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
  110. }
  111. # 原始特征生产
  112. make_origin_data() {
  113. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  114. /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
  115. --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
  116. --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
  117. ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
  118. tablePart:64 repartition:16 \
  119. beginStr:${train_begin_str} endStr:${predict_end_str} \
  120. savePath:${originDataSavePath} \
  121. table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
  122. idDefaultValue:0.01
  123. local return_code=$?
  124. check_run_status $return_code $step_start_time "Spark原始样本生产任务"
  125. }
  126. # 特征分桶,训练用的数据和预测用的数据分不同的目录
  127. make_bucket_feature() {
  128. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  129. # 训练用的数据
  130. /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
  131. --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \
  132. --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
  133. ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
  134. beginStr:${train_begin_str} endStr:${train_end_str} repartition:100 \
  135. filterNames:adid_,targeting_conversion_ \
  136. readPath:${originDataSavePath} \
  137. savePath:${trainBucketFeaturePath}
  138. local return_code=$?
  139. check_run_status $return_code $step_start_time "Spark特征分桶任务: 训练数据分桶"
  140. # 预测用的数据
  141. /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
  142. --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \
  143. --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
  144. ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
  145. beginStr:${predict_begin_str} endStr:${predict_end_str} repartition:100 \
  146. filterNames:adid_,targeting_conversion_ \
  147. readPath:${originDataSavePath} \
  148. savePath:${predictBucketFeaturePath}
  149. return_code=$?
  150. check_run_status $return_code $step_start_time "Spark特征分桶任务: 预测数据分桶"
  151. }
  152. # 模型训练
  153. model_train() {
  154. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  155. $HADOOP fs -text ${trainBucketFeaturePath}/* | ${FM_HOME}/bin/fm_train -m ${local_model_file_path} -dim 1,1,8 -im ${LAST_MODEL_HOME}/model_online.txt -core 8
  156. local return_code=$?
  157. check_run_status $return_code $step_start_time "模型训练"
  158. }
  159. # AUC对比
  160. auc_compare() {
  161. local step5_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  162. # 5.1 计算线上模型的AUC
  163. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  164. $HADOOP fs -text ${predictBucketFeaturePath}/* | ${FM_HOME}/bin/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt
  165. online_auc=`cat ${PREDICT_PATH}/${model_name}_${train_end_str}_online.txt | /root/sunmingze/AUC/AUC`
  166. local return_code=$?
  167. check_run_status $return_code $step_start_time "线上模型AUC计算"
  168. # 5.2 计算新模型的AUC
  169. step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  170. $HADOOP fs -text ${predictBucketFeaturePath}/* | ${FM_HOME}/bin/fm_predict -m ${local_model_file_path} -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt
  171. new_auc=`cat ${PREDICT_PATH}/${model_name}_${train_end_str}_new.txt | /root/sunmingze/AUC/AUC`
  172. return_code=$?
  173. check_run_status $return_code $step_start_time "新模型的AUC计算"
  174. echo "AUC比对: 线上模型的AUC: ${online_auc}, 新模型的AUC: ${new_auc}"
  175. # 5.3 计算新模型与线上模型的AUC差值的绝对值
  176. auc_diff=$(echo "$online_auc - $new_auc" | bc -l)
  177. local auc_diff_abs=$(echo "sqrt(($auc_diff)^2)" | bc -l)
  178. local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
  179. local step5_elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$step5_start_time")))
  180. # 5.4 如果差值的绝对值小于0.005且新模型的AUC大于0.73, 则更新模型
  181. if (( $(echo "${online_auc} <= ${new_auc}" | bc -l) )); then
  182. local msg="新模型优于线上模型 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc}"
  183. echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
  184. elif (( $(echo "$auc_diff_abs < 0.005" | bc -l) )) && (( $(echo "$new_auc >= 0.73" | bc -l) )); then
  185. local msg="新模型与线上模型差值小于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
  186. echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
  187. else
  188. local msg="新模型与线上模型差值大于等于阈值0.005或新模型的AUC小于0.73 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff"
  189. echo -e "$LOG_PREFIX -- $msg: 耗时 $step5_elapsed"
  190. local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
  191. /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  192. exit 1
  193. fi
  194. }
  195. # 模型格式转换
  196. model_to_online_format() {
  197. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  198. cat ${local_model_file_path} |
  199. awk -F " " '{
  200. if (NR == 1) {
  201. print $1"\t"$2
  202. } else {
  203. split($0, fields, " ");
  204. OFS="\t";
  205. line=""
  206. for (i = 1; i <= 10 && i <= length(fields); i++) {
  207. line = (line ? line "\t" : "") fields[i];
  208. }
  209. print line
  210. }
  211. }' > ${local_change_model_file_path}
  212. local return_code=$?
  213. check_run_status $return_code $step_start_time "模型格式转换"
  214. }
  215. # 模型文件上传OSS
  216. model_upload_oss() {
  217. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  218. local online_model_path=${OSS_PATH}/${model_name}.txt
  219. $HADOOP fs -test -e ${online_model_path}
  220. if [ $? -eq 0 ]; then
  221. echo "删除已存在的OSS模型文件"
  222. $HADOOP fs -rm -r -skipTrash ${online_model_path}
  223. fi
  224. $HADOOP fs -put ${local_change_model_file_path} ${online_model_path}
  225. local return_code=$?
  226. check_run_status $return_code $step_start_time "模型文件上传OSS"
  227. }
  228. # 模型文件本地备份
  229. model_local_back() {
  230. local step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
  231. # 将之前的线上模型进行备份,表示从上一个备份时间到当前时间内,使用的线上模型都是此文件
  232. # 假设当前是07-11,上一次备份时间为07-07。备份之后表示从07-07下午至07-11上午线上使用的模型文件都是model_online_20240711.txt
  233. cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_${$(date "+%Y%m%d%H")}.txt
  234. cp -f ${local_model_file_path} ${LAST_MODEL_HOME}/model_online.txt
  235. local return_code=$?
  236. check_run_status $return_code $step_start_time "模型备份"
  237. }
  238. # 任务完成通知
  239. success_inform() {
  240. local step_end_time=$(date "+%Y-%m-%d %H:%M:%S")
  241. local msg="\n\t - 广告模型文件更新完成 \n\t - 前一天线上模型全天数据AUC: $yesterday_online_auc \n\t - 前一天新模型全天数据AUC: $yesterday_new_auc \n\t - 新模型AUC: $new_auc \n\t - 线上模型AUC: $online_auc \n\t - AUC差值: $auc_diff \n\t - 模型上传路径: $online_model_path"
  242. echo -e "$LOG_PREFIX -- 模型更新完成 -- $msg: 耗时 $step_elapsed"
  243. local elapsed=$(($(date +%s -d "$step_end_time") - $(date +%s -d "$start_time")))
  244. /root/anaconda3/bin/python ad/ad_monitor_util.py --level info --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  245. }
  246. main() {
  247. global_init
  248. check_ad_hive
  249. make_origin_data
  250. make_bucket_feature
  251. model_train
  252. auc_compare
  253. model_to_online_format
  254. model_upload_oss
  255. model_local_back
  256. success_inform
  257. }
  258. main