zhangbo 1 year ago
parent
commit
33ee29eb97

+ 4 - 4
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -12,7 +12,7 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --class com.aliyun.odps.spark.examples.makedata.makedata_12_rosData_v3 \
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-savePath:/dw/recommend/model/12_ros_data_v3/ beginStr:20240226 endStr:20240227 ifRepart:10 \
+savePath:/dw/recommend/model/12_ros_data_v3/ beginStr:20240227 endStr:20240227 ifRepart:10 \
 > p12_1.log 2>&1 &
 
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
@@ -27,16 +27,16 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --class com.aliyun.odps.spark.examples.makedata.makedata_11_strData_v3 \
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 64 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-savePath:/dw/recommend/model/11_str_data_v3/ beginStr:20240226 endStr:20240227 ifRepart:100 \
+savePath:/dw/recommend/model/11_str_data_v3/ beginStr:20240227 endStr:20240227 ifRepart:100 \
 > p11.log 2>&1 &
 
 
-
+[write user to redis]
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_09_user2redis_freq \
 --name makedata_09_user2redis_freq \
 --master yarn --driver-memory 1G --executor-memory 4G --executor-cores 1 --num-executors 32 \
 --conf spark.yarn.executor.memoryOverhead=1024 \
 /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-date:20240227 tablePart:64 expireDay:3 ifWriteRedisUser:True ifUser:False midDays:15 redisLimit:50000000 \
+date:20240228 tablePart:64 expireDay:3 ifWriteRedisUser:True ifUser:True midDays:14 redisLimit:80000000 \
 savePathUser:/dw/recommend/model/09_feature/user/ > p09.log 2>&1 &
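+# (parameter meanings inferred from the names: ifUser/ifWriteRedisUser toggle
+#  which feature sets get written, midDays is the mid lookback window in days,
+#  redisLimit caps the number of records written to Redis)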

+ 2 - 2
zhangbo/02_train_go.sh

@@ -22,5 +22,5 @@ while [[ "$current_date" != "$end_date" ]]; do
     current_date=$(date -d "$current_date + 1 day" +%Y%m%d)
 done
 
-# nohup sh 02_train_go.sh 20240223 20240226 model_tom112 /dw/recommend/model/11_str_data_v3/ 1,1,2 >p2_model_tom112.log 2>&1 &
-# nohup sh 02_train_go.sh 20240223 20240226 model_jerry_noweight /dw/recommend/model/12_ros_data_v3_noweight/ 0,1,0 >p2_model_jerry_noweight.log 2>&1 &
+# nohup sh 02_train_go.sh 20240226 20240228 model_tom /dw/recommend/model/11_str_data_v3/ 0,1,0 >p2_model_tom.log 2>&1 &
+# nohup sh 02_train_go.sh 20240226 20240228 model_jerry /dw/recommend/model/12_ros_data_v3/ 0,1,0 >p2_model_jerry.log 2>&1 &
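+# (positional args, inferred from the examples above: begin_date end_date model_name data_path fm_dim)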

+ 151 - 0
zhangbo/05_update_everyday_2model.sh

@@ -0,0 +1,151 @@
+#!/bin/sh
+set -ex
+# 0 Global variables / parameters
+samplePath=/dw/recommend/model/10_sample_data_v3/
+savePath=/dw/recommend/model/12_ros_data_v3/
+model_name=model_jerry
+today="$(date +%Y%m%d)"
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+yesterday="$(date -d '1 days ago' +%Y%m%d)"
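+# note: today_early_1 and yesterday resolve to the same date, so each training
+# loop below runs exactly one pass, over yesterday's partition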
+
+
+HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
+FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
+MODEL_PATH="/root/zhangbo/recommend-emr-dataprocess/zhangbo/model/"
+OSS_PATH="oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/"
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# 0 Wait for the upstream table to finish producing; wait at most until 11:00
+source /root/anaconda3/bin/activate py37
+max_hour=11
+max_minute=00
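+# utils.py's check_hive prints "0" once the ${today_early_1} partition of
+# loghubods.alg_recsys_view_sample_v3 has data; $(...) captures that stdout
+# for the comparison below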
+while true; do
+  python_return_code=$(python utils.py --excute_program check_hive --partition ${today_early_1} --project loghubods --table alg_recsys_view_sample_v3)
+  if [ $python_return_code -eq 0 ]; then
+    echo "Python check returned 0; exiting the wait loop."
+    break
+  fi
+  echo "Python check returned non-zero; retrying in five minutes."
+  sleep 300
+  current_hour=$(date +%H)
+  current_minute=$(date +%M)
+  # force base 10: date +%H can yield "08"/"09", which bash arithmetic rejects as octal
+  if (( 10#$current_hour > max_hour || (10#$current_hour == max_hour && 10#$current_minute >= max_minute) )); then
+    echo "Maximum wait time reached; giving up: ${current_hour}-${current_minute}"
+    exit 1
+  fi
+done
+#conda deactivate
+
+# 1 Produce the raw sample data
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_10_originData_v3 \
+--name every_day_origindata_${model_name}_${today} \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+tablePart:32 savePath:${samplePath} beginStr:${today_early_1} endStr:${today_early_1}
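+# (job arguments are key:value pairs parsed by the Scala job; tablePart:32 is
+#  presumably the read parallelism over the ODPS table)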
+if [ $? -ne 0 ]; then
+    echo "Spark raw-sample production job failed"
+    exit 1
+else
+    echo "Spark raw-sample production succeeded"
+fi
+
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_12_rosData_v3 \
+--name makedata_12_rosData_v3_${model_name}_${today} \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+readPath:${samplePath} savePath:${savePath} beginStr:${today_early_1} endStr:${today_early_1} ifRepart:10
+if [ $? -ne 0 ]; then
+    echo "Spark training-sample production job failed (ros)"
+    exit 1
+else
+    echo "Spark training-sample production succeeded (ros)"
+fi
+
+# 2 Load the previous model, train on this round's data, save this round's model
+end_date=${today}
+loop_date=${yesterday}
+while [[ "$loop_date" != "$end_date" ]]; do
+    echo -------train ${loop_date}----------
+    loop_date_model=$(date -d "$loop_date - 1 day" +%Y%m%d)
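+    # alphaFM's fm_train reads samples from stdin: -m writes this round's model,
+    # -im warm-starts from the previous day's model, -dim 0,1,0 trains
+    # first-order weights only (no bias, no second-order factors), -core 8 threads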
+    $HADOOP fs -text ${savePath}/dt=${loop_date}/* | ${FM_TRAIN} -m ${MODEL_PATH}/${model_name}_${loop_date}.txt \
+-dim 0,1,0 -core 8 -im ${MODEL_PATH}/${model_name}_${loop_date_model}.txt
+    if [ $? -ne 0 ]; then
+        echo "Training failed"
+        exit 1
+    fi
+    echo -------save ${MODEL_PATH}/${model_name}_${loop_date}.txt----------
+    loop_date=$(date -d "$loop_date + 1 day" +%Y%m%d)
+done
+
+# 3 Convert this round's model format
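+# (sed '1d' drops the first line of the model file; awk keeps only features
+#  with a nonzero weight, rewritten as "name<TAB>weight")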
+cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \
+| sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' \
+> ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
+
+# 4 Upload the converted model to OSS
+online_model_path=${OSS_PATH}/${model_name}.txt
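+# hadoop fs -test -e exits 0 when the path exists, non-zero otherwise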
+$HADOOP fs -test -e ${online_model_path}
+if [ $? -eq 0 ]; then
+    echo "Model file already exists on OSS; deleting it first."
+    $HADOOP fs -rm -r ${online_model_path}
+else
+    echo "Model file does not exist yet on OSS"
+fi
+$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}
+
+
+# 5 Produce str data
+savePath=/dw/recommend/model/11_str_data_v3/
+model_name=model_tom
+
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_11_strData_v3 \
+--name makedata_11_strData_v3_${model_name}_${today} \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 64 \
+/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+readPath:${samplePath} savePath:${savePath} beginStr:${today_early_1} endStr:${today_early_1} ifRepart:100
+if [ $? -ne 0 ]; then
+    echo "Spark training-sample production job failed (str)"
+    exit 1
+else
+    echo "Spark training-sample production succeeded (str)"
+fi
+# 6 Load the previous model, train on this round's data, save this round's model
+end_date=${today}
+loop_date=${yesterday}
+while [[ "$loop_date" != "$end_date" ]]; do
+    echo -------train ${loop_date}----------
+    loop_date_model=$(date -d "$loop_date - 1 day" +%Y%m%d)
+    $HADOOP fs -text ${savePath}/dt=${loop_date}/* | ${FM_TRAIN} -m ${MODEL_PATH}/${model_name}_${loop_date}.txt \
+-dim 0,1,0 -core 8 -im ${MODEL_PATH}/${model_name}_${loop_date_model}.txt
+    if [ $? -ne 0 ]; then
+        echo "Training failed"
+        exit 1
+    fi
+    echo -------save ${MODEL_PATH}/${model_name}_${loop_date}.txt----------
+    loop_date=$(date -d "$loop_date + 1 day" +%Y%m%d)
+done
+
+# 7 Convert this round's model format
+cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \
+| sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' \
+> ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
+
+# 8 Upload the converted model to OSS
+online_model_path=${OSS_PATH}/${model_name}.txt
+$HADOOP fs -test -e ${online_model_path}
+if [ $? -eq 0 ]; then
+    echo "Model file already exists on OSS; deleting it first."
+    $HADOOP fs -rm -r ${online_model_path}
+else
+    echo "Model file does not exist yet on OSS"
+fi
+$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}
+
+
+#nohup sh 05_update_everyday_2model.sh > p05.log 2>&1 &

+ 0 - 56
zhangbo/05_update_everyday_ros.sh

@@ -1,56 +0,0 @@
-#!/bin/sh
-set -e
-set -x
-# 0 Global variables / parameters
-savePath=/dw/recommend/model/ros_sample_v2/
-model_name=model_ros_v2
-today="$(date +%Y%m%d)"
-today_early_1="$(date -d '1 days ago' +%Y%m%d)"
-#yesterday="$(date -d '1 days ago' +%Y%m%d)"
-yesterday=20231227
-jar_main=$makedata_04_rosHdfsFromTablev2
-
-HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
-FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
-MODEL_PATH="/root/zhangbo/recommend-emr-dataprocess/zhangbo/model/"
-OSS_PATH="oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/video_str_model"
-
-# 1 Produce data
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata.${jar_main} \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-/root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:32 savePath:${savePath} beginStr:${today_early_1} endStr:${today_early_1}
-
-if [ $? -eq 1 ]; then
-    echo "Spark job failed"
-    exit 1
-fi
-
-# 2 Load the previous model, train on this round's data, save this round's model
-end_date=${today}
-loop_date=${yesterday}
-while [[ "$loop_date" != "$end_date" ]]; do
-    echo -------train ${loop_date}----------
-    loop_date_model=$(date -d "$loop_date - 1 day" +%Y%m%d)
-    $HADOOP fs -text ${savePath}/dt=${loop_date}/* | ${FM_TRAIN} -m ${MODEL_PATH}/${model_name}_${loop_date}.txt \
--dim 1,1,0 -core 8 -im ${MODEL_PATH}/${model_name}_${loop_date_model}.txt
-    if [ $? -eq 1 ]; then
-        echo "Training failed"
-        exit 1
-    fi
-    echo -------save ${MODEL_PATH}/${model_name}_${loop_date}.txt----------
-    loop_date=$(date -d "$loop_date + 1 day" +%Y%m%d)
-done
-
-# 3 Convert this round's model format
-cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \
-| sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' \
-> ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
-
-# 4 Upload the converted model to OSS
-$HADOOP fs -rm -r oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/video_str_model/${model_name}_change.txt
-$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${OSS_PATH}/${model_name}_change.txt
-
-
-#nohup sh 05_update_everyday_ros.sh > p.log 2>&1 &

+ 14 - 0
zhangbo/utils.py

@@ -66,12 +66,24 @@ def check_user_hive(args):
         exit(1)
     else:
         print("0")
+def check_hive(args):
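+    """Print "0" if the given partition of project.table has data;
+    otherwise print "1" and exit(1). Polled by 05_update_everyday_2model.sh."""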
+    project = args.project
+    table = args.table
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='utils script')
     parser.add_argument('--excute_program', type=str, help='program to execute')
     parser.add_argument('--partition', type=str, help='table partition')
+    parser.add_argument('--project', type=str, help='project (table namespace)')
+    parser.add_argument('--table', type=str, help='table name')
     args = parser.parse_args()
     if args.excute_program == "check_origin_hive":
         check_origin_hive(args)
@@ -79,6 +91,8 @@ if __name__ == '__main__':
         check_item_hive(args)
     elif args.excute_program == "check_user_hive":
         check_user_hive(args)
+    elif args.excute_program == "check_hive":
+        check_hive(args)
     else:
         print("无合法参数,验证失败。")
         exit(999)