feat: modify the str feature generation script

zhaohaipeng 1 month ago
parent
current commit
b69ae826af

+ 5 - 3
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/v20250218/makedata_recsys_41_str_train_data_sample_20250319.scala

@@ -8,6 +8,7 @@ import examples.extractor.ExtractorUtils
 import examples.extractor.v20250218.ExtractFeature20250218
 import examples.utils.{FestiveUtil, SimilarityUtils, StatisticsUtil}
 import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 
 import java.util
@@ -51,9 +52,10 @@ object makedata_recsys_41_str_train_data_sample_20250319 {
       s"dt=$dt,hh=$hh"
       s"dt=$dt,hh=$hh"
     }
     }
 
 
-    val odpsData = partitions.map { partition => {
+    var odpsData: RDD[String] = sc.emptyRDD[String] // 初始化空RDD
+    for (partition <- partitions) {
       println(s"开始读取分区: $partition")
       println(s"开始读取分区: $partition")
-      odpsOps.readTable(
+      val partitionData = odpsOps.readTable(
           project = project,
           project = project,
           table = table,
           table = table,
           partition = partition,
           partition = partition,
@@ -173,8 +175,8 @@ object makedata_recsys_41_str_train_data_sample_20250319 {
 
           })
         })
+      odpsData = odpsData.union(partitionData)
     }
-    }.reduce(_ union _)
     println(s"odps count: " + odpsData.count())
     // 4 save data to HDFS
     val hdfsPath = savePath
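
Note on the change: the original `partitions.map { ... }.reduce(_ union _)` throws an `UnsupportedOperationException` when `partitions` is empty, since `reduce` requires at least one element; the rewritten loop starts from `sc.emptyRDD` and unions each partition's RDD onto it, so an empty partition list simply yields an empty result. Below is a minimal runnable sketch of the pattern, where `readPartition` is a hypothetical stand-in for the script's `odpsOps.readTable` call:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object UnionPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UnionPartitionsSketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical stand-in for odpsOps.readTable: yields one RDD per partition spec.
    val partitions = Seq("dt=20250319,hh=00", "dt=20250319,hh=01")
    def readPartition(partition: String): RDD[String] =
      sc.parallelize(Seq(s"row-from-$partition"))

    // The pattern the commit adopts: accumulate onto an empty RDD.
    // Unlike partitions.map(...).reduce(_ union _), this is safe when
    // the partition list is empty (reduce on an empty collection throws).
    var odpsData: RDD[String] = sc.emptyRDD[String]
    for (partition <- partitions) {
      println(s"start reading partition: $partition")
      val partitionData = readPartition(partition)
      odpsData = odpsData.union(partitionData)
    }

    println("odpsData count: " + odpsData.count())
    spark.stop()
  }
}

With many partitions, chaining binary unions builds a deep RDD lineage; `sc.union(partitions.map(readPartition))` produces a single UnionRDD in one step and is a common alternative with the same semantics.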