zhangbo пре 1 година
родитељ
комит
39da0cc9cc
2 измењених фајлова са 103 додато и 0 уклоњено
  1. 79 0
      zhangbo/06_update_everyday_feature.sh
  2. 24 0
      zhangbo/utils.py

+ 79 - 0
zhangbo/06_update_everyday_feature.sh

@@ -0,0 +1,79 @@
+#!/bin/sh
+set -ex
+# 0 全局变量/参数
+today="$(date +%Y%m%d)"
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+yesterday="$(date -d '1 days ago' +%Y%m%d)"
+
+HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
+
+source activate py37
+max_hour=11
+max_minute=00
+
+# 0 判断上游表是否生产完成,最长等待到12点
+while true; do
+  python_return_code=$(python utils.py --excute_program check_item_hive --partition ${today_early_1})
+  if [ $python_return_code -eq 0 ]; then
+    echo "Python程序返回0,退出循环。"
+    break
+  fi
+  echo "Python程序返回非0值,等待五分钟后再次调用。"
+  sleep 300
+  current_hour=$(date +%H)
+  current_minute=$(date +%M)
+  if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
+    echo "最长等待时间已到,失败:${current_hour}-${current_minute}"
+    exit 1
+  fi
+done
+# 1 item 生产数据
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_08_item2redis \
+--name makedata_08_item2redis_${model_name}_${today} \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 16 \
+/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+date:${today_early_1} tablePart:32 expireDay:4 ifDebug:False \
+ifVideo:True ifWriteRedis:True savePathVideo:/dw/recommend/model/feature/video
+
+if [ $? -eq 1 ]; then
+    echo "item写入redis执行失败"
+    exit 1
+else
+    echo "item写入redis执行成功"
+fi
+
+# 2 检查user上游表
+while true; do
+  python_return_code=$(python utils.py --excute_program check_user_hive --partition ${today_early_1})
+  if [ $python_return_code -eq 0 ]; then
+    echo "Python程序返回0,退出循环。"
+    break
+  fi
+  echo "Python程序返回非0值,等待五分钟后再次调用。"
+  sleep 300
+  current_hour=$(date +%H)
+  current_minute=$(date +%M)
+  if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
+    echo "最长等待时间已到,失败:${current_hour}-${current_minute}"
+    exit 1
+  fi
+done
+# 3 user 生产数据
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_09_user2redis \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+--conf spark.yarn.executor.memoryoverhead=128 \
+/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+date:${today_early_1} tablePart:32 expireDay:3 ifDebug:False \
+ifUser:True ifDeleteRedisUser:False ifWriteRedisUser:True sampleRate:1.0 midDays:6 \
+savePathUser:/dw/recommend/model/feature/user/
+
+if [ $? -eq 1 ]; then
+    echo "user写入redis执行失败"
+    exit 1
+else
+    echo "user写入redis执行成功"
+fi
+
+#nohup sh 06_update_everyday_feature.sh > p.log 2>&1 &

+ 24 - 0
zhangbo/utils.py

@@ -45,6 +45,26 @@ def check_origin_hive(args):
         exit(1)
     else:
         print("0")
+def check_item_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_video_info"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+def check_user_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_user_info"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
 
 
 if __name__ == '__main__':
@@ -54,6 +74,10 @@ if __name__ == '__main__':
     args = parser.parse_args()
     if args.excute_program == "check_origin_hive":
         check_origin_hive(args)
+    if args.excute_program == "check_item_hive":
+        check_item_hive(args)
+    if args.excute_program == "check_user_hive":
+        check_user_hive(args)
     else:
         print("无合法参数,验证失败。")
         exit(999)