浏览代码

推荐模型自动化更新-特征分桶数据生产

Joe 10 月之前
父节点
当前提交
771aed204a
共有 2 个文件被更改,包括 34 次插入和 16 次删除
  1. +23 −12
      qiaojialiang/checkHiveDataUtil.py
  2. +11 −4
      qiaojialiang/handle_rov.sh

+ 23 - 12
qiaojialiang/checkHiveDataUtil.py

@@ -19,23 +19,34 @@ def check_origin_hive(args):
     # 检查从begin到end的每一个小时级分区数据是否存在,有一个存在即算存在可以处理
     # 如果全都为空报警
     time_sequence = generate_time_sequence(beginStr, endStr)
-    exist_partition = []
+    # exist_partition = []
     for time_str in time_sequence:
         result = split_date_time(time_str)
         partitionDt = result[0]
         partitionHh = result[1]
         count = check_data(project, table, partitionDt, partitionHh)
-        if count != 0:
-            exist_partition.append(f'分区:dt={partitionDt}/hh={partitionHh},数据:{count}')
-    if len(exist_partition) == 0:
-        print('1')
-        exit(1)
-    else:
-        bot = FeishuBot()
-        msg = (
-            f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:success\n【信息】:table:{table},beginStr:{beginStr},endStr:{endStr}\n【详细日志】:{exist_partition}')
-        # bot.send_message(msg)
-        print('0')
+        if count == 0:
+            bot = FeishuBot()
+            # msg = (
+            #     f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:success\n【信息】:table:{table},beginStr:{beginStr},endStr:{endStr}\n【详细日志】:{exist_partition}')
+            msg = (
+                f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:error\n【信息】:table:{table},beginStr:{beginStr},endStr:{endStr}分区数据不存在,继续检查')
+            bot.send_message(msg)
+            print('1')
+            exit(1)
+        else:
+            continue
+    print('0')
+        # exist_partition.append(f'分区:dt={partitionDt}/hh={partitionHh},数据:{count}')
+# if len(exist_partition) == 0:
+#     print('1')
+#     exit(1)
+# else:
+#     bot = FeishuBot()
+#     msg = (
+#         f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:success\n【信息】:table:{table},beginStr:{beginStr},endStr:{endStr}\n【详细日志】:{exist_partition}')
+#     bot.send_message(msg)
+# print('0')
 
 
 def check_data(project, table, partitionDt, partitionDtHh) -> int:

+ 11 - 4
qiaojialiang/handle_rov.sh

@@ -11,6 +11,10 @@ beginStr="$(date -d '1 days ago' +%Y%m%d)"
 endStr="$(date -d '1 days ago' +%Y%m%d)"
 beginHhStr=08
 endHhStr=08
+# 各节点产出hdfs文件绝对路径
+originDataPath=/dw/recommend/model/13_sample_data/
+valueDataPath=/dw/recommend/model/14_feature_data/
+bucketDataPath=/dw/recommend/model/16_train_data/
 
 # 0 判断上游表是否生产完成,最长等待到12点
 # shellcheck disable=SC2039
@@ -42,7 +46,7 @@ echo "----------step2------------开始根据${table}生产原始数据"
 ../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
 tablePart:64 repartition:32 \
 beginStr:${beginStr}${beginHhStr} endStr:${endStr}${endHhStr} \
-savePath:/dw/recommend/model/13_sample_data/ \
+savePath:${originDataPath} \
 table:${table}
 if [ $? -ne 0 ]; then
    echo "Spark原始样本生产任务执行失败"
@@ -53,12 +57,13 @@ fi
 
 
 # 2 特征值拼接
+echo "----------step3------------开始特征值拼接"
 /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata_qiao.makedata_14_valueData_20240705 \
 --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 32 \
 ../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-readPath:/dw/recommend/model/13_sample_data/ \
-savePath:/dw/recommend/model/14_feature_data/ \
+readPath:${originDataPath} \
+savePath:${valueDataPath} \
 beginStr:${beginStr} endStr:${endStr} repartition:1000
 if [ $? -ne 0 ]; then
    echo "Spark特征值拼接处理任务执行失败"
@@ -68,11 +73,13 @@ else
 fi
 
 # 3 特征分桶
-echo "----------step3------------根据特征分桶生产重打分特征数据"
+echo "----------step4------------根据特征分桶生产重打分特征数据"
 /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata_qiao.makedata_16_bucketData_20240705 \
 --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
 ../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+readPath:${valueDataPath} \
+savePath:${bucketDataPath} \
 beginStr:${beginStr} endStr:${endStr} repartition:1000
 if [ $? -ne 0 ]; then
    echo "Spark特征分桶处理任务执行失败"