Sfoglia il codice sorgente

Update makedata_ad_33_bucketDataFromOriginToHive_20250228: split data output

StrayWarrior 5 giorni fa
parent
commit
3747b71b22

+ 11 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20250228.scala

@@ -39,12 +39,15 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
     val project = param.getOrElse("project", "loghubods")
     val inputTable = param.getOrElse("inputTable", "alg_recsys_ad_sample_all")
     val outputTable = param.getOrElse("outputTable", "ad_easyrec_train_data_v1_sampled")
+    val outputTable2 = param.getOrElse("outputTable2", "")
     val filterHours = param.getOrElse("filterHours", "00,01,02,03,04,05,06,07").split(",").toSet
     val idDefaultValue = param.getOrElse("idDefaultValue", "1.0").toDouble
     val filterNames = param.getOrElse("filterNames", "").split(",").filter(_.nonEmpty).toSet
     val filterAdverIds = param.getOrElse("filterAdverIds", "").split(",").filter(_.nonEmpty).toSet
     val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
     val negSampleRate = param.getOrElse("negSampleRate", "1").toDouble
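+    // Sample split ratio: approximately a splitRate fraction of the records is written to outputTable and the remainder to outputTable2; ignored when outputTable2 is empty (everything then goes to outputTable).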
+    val splitRate = param.getOrElse("splitRate", "0.9").toDouble
 
     val loader = getClass.getClassLoader
     val resourceUrlBucket = loader.getResource("20250217_ad_bucket_688.txt")
@@ -587,7 +590,14 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
         }.coalesce(128)
 
       val partition = s"dt=$dt"
-      odpsOps.saveToTable(project, outputTable, partition, recordRdd, write, defaultCreate = true, overwrite = true)
+      if (outputTable2.isEmpty) {
+        odpsOps.saveToTable(project, outputTable, partition, recordRdd, write, defaultCreate = true, overwrite = true)
+      } else {
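+        // Seed is derived from dt so that re-running the same partition reproduces the same split (NOTE(review): assumes recordRdd is recomputed deterministically — TODO confirm, or persist it before splitting).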
+        val splitRdds = recordRdd.randomSplit(Array(splitRate, 1 - splitRate), seed = dt.toLong)
+        odpsOps.saveToTable(project, outputTable, partition, splitRdds(0), write, defaultCreate = true, overwrite = true)
+        odpsOps.saveToTable(project, outputTable2, partition, splitRdds(1), write, defaultCreate = true, overwrite = true)
+      }
     }
   }