02_ad_model_update_twice_daily_v2.sh 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #!/bin/sh
  2. set -x
  3. export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
  4. export PATH=$SPARK_HOME/bin:$PATH
  5. export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  6. export JAVA_HOME=/usr/lib/jvm/java-1.8.0
  7. source /root/anaconda3/bin/activate py37
  8. # 全局常量
  9. originDataSavePath=/dw/recommend/model/31_ad_sample_data_v3_auto_test
  10. bucketFeatureSavePathHome=/dw/recommend/model/33_ad_train_data_v3_auto_test
  11. model_name=model_bkb8_v3_test
  12. LAST_MODEL_HOME=/root/zhaohp/model_online_test
  13. MODEL_HOME=/root/zhaohp/recommend-emr-dataprocess/model
  14. OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model
  15. PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict
  16. HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
  17. FM_HOME=/root/sunmingze/alphaFM
  18. today="$(date +%Y%m%d)"
  19. today_early_1="$(date -d '1 days ago' +%Y%m%d)"
  20. start_time=$(date +%s)
  21. elapsed=0
  22. LOG_PREFIX=广告模型自动更新任务
  23. # 训练和预测数据分区
  24. train_begin_str=''
  25. train_end_str=''
  26. predict_begin_str=''
  27. predict_end_str=''
  28. # HDFS保存数据的目录
  29. trainBucketFeaturePath=${bucketFeatureSavePathHome}
  30. predictBucketFeaturePath=${bucketFeatureSavePathHome}
  31. local_model_file_path=${MODEL_HOME}/${model_name}.txt
  32. local_change_model_file_path=${MODEL_HOME}/${model_name}_change.txt
  33. max_hour=21
  34. max_minute=20
  35. # 全局初始化
  36. global_init() {
  37. # 获取当前小时,确定需要使用的数据分区范围
  38. local current_hour="$(date +%H)"
  39. # if [ $current_hour -lt 08 ]; then
  40. train_begin_str=${today_early_1}14
  41. train_end_str=${today_early_1}21
  42. predict_begin_str=${today_early_1}22
  43. predict_end_str=${today_early_1}23
  44. trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/train
  45. predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/predict
  46. local_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}.txt
  47. local_change_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}_change.txt
  48. max_hour=12
  49. # elif [ $current_hour -ge 20 ]; then
  50. # train_begin_str=${today_early_1}22
  51. # train_end_str=${today}13
  52. # predict_begin_str=${today}14
  53. # predict_end_str=${today}15
  54. # trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/train
  55. # predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/predict
  56. # local_model_file_path=${MODEL_HOME}/${train_end_str}.txt
  57. # local_change_model_file_path=${MODEL_HOME}/${train_end_str}_change.txt
  58. # max_hour=21
  59. # else
  60. # echo "当前时间段异常: 退出任务"
  61. # exit 1
  62. # fi
  63. # 删除HDFS目录,保证本次任务运行时目录干净
  64. $HADOOP fs -rm -r -skipTrash ${trainBucketFeaturePath}
  65. $HADOOP fs -rm -r -skipTrash ${predictBucketFeaturePath}
  66. echo "全局变量初始化化: "
  67. echo " train_begin_str=${train_begin_str}"
  68. echo " train_end_str=${train_end_str}"
  69. echo " predict_begin_str=${predict_begin_str}"
  70. echo " predict_end_str=${predict_end_str}"
  71. echo " originDataSavePath=${originDataSavePath}"
  72. echo " trainBucketFeaturePath=${trainBucketFeaturePath}"
  73. echo " predictBucketFeaturePath=${predictBucketFeaturePath}"
  74. echo " local_model_file_path=${local_model_file_path}"
  75. echo " local_change_model_file_path=${local_change_model_file_path}"
  76. echo " max_hour=${max_hour}"
  77. }
  78. # 校验命令的退出码
  79. check_run_status() {
  80. local status=$1
  81. local step_start_time=$2
  82. local step_name=$3
  83. local step_end_time=$(date +%s)
  84. local step_elapsed=$(($step_end_time - $step_start_time))
  85. if [ $status -ne 0 ]; then
  86. echo "$LOG_PREFIX -- ${step_name}失败: 耗时 $step_elapsed"
  87. local elapsed=$(($step_end_time - $start_time))
  88. # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  89. exit 1
  90. else
  91. echo "$LOG_PREFIX -- ${step_name}成功: 耗时 $step_elapsed"
  92. fi
  93. }
  94. # 校验大数据任务是否执行完成
  95. check_ad_hive() {
  96. local step_start_time=$(date +%s)
  97. while true; do
  98. local python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${predict_end_str:0:8} --hh ${predict_end_str:8:10})
  99. local step_end_time=$(date +%s)
  100. local elapsed=$(($step_end_time - $step_start_time))
  101. if [ "$python_return_code" -eq 0 ]; then
  102. break
  103. fi
  104. echo "Python程序返回非0值,等待五分钟后再次调用。"
  105. sleep 300
  106. local current_hour=$(date +%H)
  107. local current_minute=$(date +%M)
  108. if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
  109. local msg="大数据数据生产校验失败, 分区: ${today}10"
  110. echo -e "$LOG_PREFIX -- 大数据数据生产校验 -- ${msg}: 耗时 $elapsed"
  111. # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
  112. exit 1
  113. fi
  114. done
  115. echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
  116. }
  117. # 原始特征生产
  118. make_origin_data() {
  119. local step_start_time=$(date +%s)
  120. /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
  121. --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
  122. --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
  123. ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
  124. tablePart:64 repartition:16 \
  125. beginStr:${train_begin_str} endStr:${predict_end_str} \
  126. savePath:${originDataSavePath} \
  127. table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
  128. idDefaultValue:0.01
  129. local return_code=$?
  130. check_run_status $return_code $step_start_time "Spark原始样本生产任务"
  131. }
  132. # 特征分桶,训练用的数据和预测用的数据分不同的目录
  133. make_bucket_feature() {
  134. local step_start_time=$(date +%s)
  135. # 训练用的数据
  136. /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
  137. --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
  138. --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
  139. ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
  140. beginStr:${train_begin_str} endStr:${train_end_str} repartition:100 \
  141. filterNames:adid_,targeting_conversion_ \
  142. readPath:${originDataSavePath} \
  143. savePath:${trainBucketFeaturePath}
  144. local return_code=$?
  145. check_run_status $return_code $step_start_time "Spark特征分桶任务: 训练数据分桶"
  146. # 预测用的数据
  147. /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
  148. --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
  149. --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
  150. ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
  151. beginStr:${predict_begin_str} endStr:${predict_end_str} repartition:100 \
  152. filterNames:adid_,targeting_conversion_ \
  153. readPath:${originDataSavePath} \
  154. savePath:${predictBucketFeaturePath}
  155. return_code=$?
  156. check_run_status $return_code $step_start_time "Spark特征分桶任务: 预测数据分桶"
  157. }
  158. main() {
  159. global_init
  160. check_ad_hive
  161. make_origin_data
  162. make_bucket_feature
  163. }
  164. main