05_update_everyday_2model.sh 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  #!/bin/bash
  # Daily model update job. This header sets the shell options, the shared
  # paths/dates and the Spark/Hadoop environment used by every later step.
  # bash (not plain sh) is required: the script uses [[ ]], (( )) and `source`.
  #   -e: abort on the first unhandled failing command
  #   -x: trace every command into the log
  set -ex

  # 0. Global variables / parameters
  samplePath=/dw/recommend/model/10_sample_data_v3/
  savePath=/dw/recommend/model/12_ros_data_v3/
  model_name=model_jerry

  # Read the clock once and derive the previous day from that single value, so
  # the dates cannot disagree when the script happens to start at midnight.
  today="$(date +%Y%m%d)"
  today_early_1="$(date -d "${today} - 1 day" +%Y%m%d)"
  yesterday="${today_early_1}"

  HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
  FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
  MODEL_PATH="/root/zhangbo/recommend-emr-dataprocess/zhangbo/model/"
  OSS_PATH="oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/"

  export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
  export PATH="${SPARK_HOME}/bin:${PATH}"
  export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  export JAVA_HOME=/usr/lib/jvm/java-1.8.0
  # 0. Wait until the upstream table partition has been produced, polling every
  #    five minutes and giving up once the wall clock reaches 11:00.
  source /root/anaconda3/bin/activate py37
  max_hour=11
  max_minute=00
  while true; do
    # The check result is read from utils.py's stdout (0 means "ready").
    # NOTE(review): with `set -e` active, a non-zero *exit status* from python
    # aborts the whole script here instead of retrying - confirm that is intended.
    python_return_code=$(python utils.py --excute_program check_hive --partition "${today_early_1}" --project loghubods --table alg_recsys_view_sample_v3)
    # Quoted so that empty output is a failed comparison, not a syntax error.
    if [ "${python_return_code}" -eq 0 ]; then
      echo "Python程序返回0,退出循环。"
      break
    fi
    echo "Python程序返回非0值,等待五分钟后再次调用。"
    sleep 300
    current_hour=$(date +%H)
    current_minute=$(date +%M)
    # date zero-pads its output ("08", "09"); inside (( )) a leading zero means
    # octal, so those values raise "value too great for base" and the deadline
    # test would be skipped. The 10# prefix forces decimal interpretation.
    if (( 10#${current_hour} > 10#${max_hour} || (10#${current_hour} == 10#${max_hour} && 10#${current_minute} >= 10#${max_minute}) )); then
      echo "最长等待时间已到,失败:${current_hour}-${current_minute}"
      exit 1
    fi
  done
  #conda deactivate
  # 1. Produce data: build the raw samples for yesterday's partition.
  # The job runs inside the `if` condition because, with `set -e` active, a bare
  # failing command would abort the script before `$?` could be inspected (and
  # testing `$? -eq 1` would miss every other non-zero exit code).
  if ! /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
    --class com.aliyun.odps.spark.examples.makedata.makedata_10_originData_v3 \
    --name "every_day_origindata_${model_name}_${today}" \
    --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
    /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
    tablePart:32 "savePath:${samplePath}" "beginStr:${today_early_1}" "endStr:${today_early_1}"; then
    echo "Spark原始样本生产任务执行失败"
    exit 1
  fi
  echo "spark原始样本生产执行成功"
  # Build the "ros" training samples from the raw samples of the same day.
  # Run inside the `if` condition so that any non-zero exit code reaches the
  # error message; a separate `$?` check is unreachable under `set -e`.
  if ! /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
    --class com.aliyun.odps.spark.examples.makedata.makedata_12_rosData_v3 \
    --name "makedata_12_rosData_v3_${model_name}_${today}" \
    --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
    /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
    "readPath:${samplePath}" "savePath:${savePath}" "beginStr:${today_early_1}" "endStr:${today_early_1}" ifRepart:10; then
    echo "Spark训练样本-生产任务执行失败-ros"
    exit 1
  fi
  echo "spark训练样本-生产执行成功-ros"
  # 2. Load the previous model, train on this round's data, save this round's model.
  #    One iteration per day in [yesterday, today); each day starts from the
  #    model file of the day before it (-im) and writes its own model file (-m).
  end_date=${today}
  loop_date=${yesterday}
  while [[ "${loop_date}" != "${end_date}" ]]; do
    echo "-------train ${loop_date}----------"
    loop_date_model=$(date -d "${loop_date} - 1 day" +%Y%m%d)
    # The pipeline runs in a subshell with pipefail so that a failing
    # `hadoop fs -text` is reported too (not only fm_train), without changing
    # the options of the surrounding script. Keeping it inside the `if`
    # condition lets every non-zero status reach the error message.
    # The glob is quoted on purpose: it is expanded by hadoop, not by the shell.
    if ! (
      set -o pipefail
      "${HADOOP}" fs -text "${savePath}/dt=${loop_date}/*" \
        | "${FM_TRAIN}" -m "${MODEL_PATH}/${model_name}_${loop_date}.txt" \
          -dim 0,1,0 -core 8 -im "${MODEL_PATH}/${model_name}_${loop_date_model}.txt"
    ); then
      echo "训练失败"
      exit 1
    fi
    echo "-------save ${MODEL_PATH}/${model_name}_${loop_date}.txt----------"
    loop_date=$(date -d "${loop_date} + 1 day" +%Y%m%d)
  done
  # 3. Convert this round's model into the format that gets uploaded.

  #######################################
  # Skips the first line of a model file and, for every remaining line whose
  # second field is not the string "0", writes "<field1><TAB><field2>".
  # (The first line is presumably the bias/header row - confirm against the
  # fm_train output format.)
  # Arguments: $1 - model file to read
  #            $2 - converted file to write
  # Returns:   awk's exit status; non-zero when the model file cannot be read,
  #            so a missing model can no longer turn into an empty upload.
  #######################################
  convert_model_for_online() {
    awk 'NR > 1 && $2 != "0" { print $1 "\t" $2 }' "$1" > "$2"
  }

  convert_model_for_online \
    "${MODEL_PATH}/${model_name}_${today_early_1}.txt" \
    "${MODEL_PATH}/${model_name}_${today_early_1}_change.txt"
  # 4. Upload the converted model to OSS, replacing any previous copy.
  online_model_path=${OSS_PATH}/${model_name}.txt
  # The existence test must be the `if` condition itself: with `set -e` active,
  # a bare `hadoop fs -test -e` that reports "missing" would abort the script,
  # so the very first upload of a model could never succeed.
  if "${HADOOP}" fs -test -e "${online_model_path}"; then
    echo "数据存在, 先删除。"
    "${HADOOP}" fs -rm -r "${online_model_path}"
  else
    echo "数据不存在"
  fi
  "${HADOOP}" fs -put "${MODEL_PATH}/${model_name}_${today_early_1}_change.txt" "${online_model_path}"
  # 5. Produce the "str" training data. From here on savePath and model_name
  #    point at the second model, so the following steps operate on model_tom.
  savePath=/dw/recommend/model/11_str_data_v3/
  model_name=model_tom
  # Run inside the `if` condition so that any non-zero exit code reaches the
  # error message; a separate `$?` check is unreachable under `set -e`.
  if ! /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
    --class com.aliyun.odps.spark.examples.makedata.makedata_11_strData_v3 \
    --name "makedata_11_strData_v3_${model_name}_${today}" \
    --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 64 \
    /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
    "readPath:${samplePath}" "savePath:${savePath}" "beginStr:${today_early_1}" "endStr:${today_early_1}" ifRepart:100; then
    echo "Spark训练样本-生产任务执行失败-str"
    exit 1
  fi
  echo "spark训练样本-生产执行成功-str"
  # 6. Load the previous model, train on this round's data, save this round's model.
  #    One iteration per day in [yesterday, today); each day starts from the
  #    model file of the day before it (-im) and writes its own model file (-m).
  end_date=${today}
  loop_date=${yesterday}
  while [[ "${loop_date}" != "${end_date}" ]]; do
    echo "-------train ${loop_date}----------"
    loop_date_model=$(date -d "${loop_date} - 1 day" +%Y%m%d)
    # The pipeline runs in a subshell with pipefail so that a failing
    # `hadoop fs -text` is reported too (not only fm_train), without changing
    # the options of the surrounding script. Keeping it inside the `if`
    # condition lets every non-zero status reach the error message.
    # The glob is quoted on purpose: it is expanded by hadoop, not by the shell.
    if ! (
      set -o pipefail
      "${HADOOP}" fs -text "${savePath}/dt=${loop_date}/*" \
        | "${FM_TRAIN}" -m "${MODEL_PATH}/${model_name}_${loop_date}.txt" \
          -dim 0,1,0 -core 8 -im "${MODEL_PATH}/${model_name}_${loop_date_model}.txt"
    ); then
      echo "训练失败"
      exit 1
    fi
    echo "-------save ${MODEL_PATH}/${model_name}_${loop_date}.txt----------"
    loop_date=$(date -d "${loop_date} + 1 day" +%Y%m%d)
  done
  # 7. Convert this round's model into the format that gets uploaded.

  #######################################
  # Skips the first line of a model file and, for every remaining line whose
  # second field is not the string "0", writes "<field1><TAB><field2>".
  # (The first line is presumably the bias/header row - confirm against the
  # fm_train output format.)
  # Arguments: $1 - model file to read
  #            $2 - converted file to write
  # Returns:   awk's exit status; non-zero when the model file cannot be read,
  #            so a missing model can no longer turn into an empty upload.
  #######################################
  convert_model_for_online() {
    awk 'NR > 1 && $2 != "0" { print $1 "\t" $2 }' "$1" > "$2"
  }

  convert_model_for_online \
    "${MODEL_PATH}/${model_name}_${today_early_1}.txt" \
    "${MODEL_PATH}/${model_name}_${today_early_1}_change.txt"
  # 8. Upload the converted model to OSS, replacing any previous copy.
  online_model_path=${OSS_PATH}/${model_name}.txt
  # The existence test must be the `if` condition itself: with `set -e` active,
  # a bare `hadoop fs -test -e` that reports "missing" would abort the script,
  # so the very first upload of a model could never succeed.
  if "${HADOOP}" fs -test -e "${online_model_path}"; then
    echo "数据存在, 先删除。"
    "${HADOOP}" fs -rm -r "${online_model_path}"
  else
    echo "数据不存在"
  fi
  "${HADOOP}" fs -put "${MODEL_PATH}/${model_name}_${today_early_1}_change.txt" "${online_model_path}"
  # Usage (background run, output logged to p5.log; bash is required for [[ ]] and (( ))):
  #   nohup bash 05_update_everyday_2model.sh > p5.log 2>&1 &