repart update

zhangbo, 10 months ago
Parent
Current commit
857327c88f

+ 19 - 22
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_15_bucket_20240608.scala

@@ -41,6 +41,7 @@ object makedata_15_bucket_20240608 {
     val readPath = param.getOrElse("readPath", "/dw/recommend/model/14_feature_data/20240607")
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/15_bucket_data/")
     val fileName = param.getOrElse("fileName", "20240607_200")
+    val sampleRate = param.getOrElse("sampleRate", "0.1").toDouble
     val bucketNum = param.getOrElse("bucketNum", "200").toInt
 
     val data = sc.textFile(readPath)
@@ -48,37 +49,33 @@ object makedata_15_bucket_20240608 {
       val rList = r.split("\t")
       val doubles = rList(2).split(",").map(_.toDouble)
       doubles
-    })
+    }).sample(false, sampleRate).collect()
 
     val result = new ArrayBuffer[String]()
 
     for (i <- contentList.indices){
       println("特征:" + contentList(i))
-      val data2 = data1.map(r => r(i)).filter(_ > 1E-8).collect().sorted
-//      if (data2.map(_.toString).toSet.size < bucketNum*10){
-//        println("无法分桶:" + data2.map(_.toString).toSet.size.toString)
-//      }else{
-        val len = data2.length
-        val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // ensure each bucket has at least one element
-        val buffers = new ArrayBuffer[Double]()
+      val data2 = data1.map(r => r(i)).filter(_ > 1E-8).sorted
+      val len = data2.length
+      val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // ensure each bucket has at least one element
+      val buffers = new ArrayBuffer[Double]()
 
-        var lastBucketValue = data2(0) // record the previous bucket's split point
-        for (j <- 0 until len by oneBucketNum) {
-          val d = data2(j)
-          if (j > 0 && d != lastBucketValue) {
-            // if the current split point differs from the previous one, save it
-            buffers += d
-          }
-          lastBucketValue = d // update the previous bucket's split point
+      var lastBucketValue = data2(0) // record the previous bucket's split point
+      for (j <- 0 until len by oneBucketNum) {
+        val d = data2(j)
+        if (j > 0 && d != lastBucketValue) {
+          // if the current split point differs from the previous one, save it
+          buffers += d
         }
+        lastBucketValue = d // update the previous bucket's split point
+      }
 
-        // the last bucket should end at the array's last element
-        if (!buffers.contains(data2.last)) {
-          buffers += data2.last
-        }
-        result += contentList(i) + "\t" + bucketNum.toString + "\t" + buffers.mkString(",")
+      // the last bucket should end at the array's last element
+      if (!buffers.contains(data2.last)) {
+        buffers += data2.last
       }
-//    }
+      result += contentList(i) + "\t" + bucketNum.toString + "\t" + buffers.mkString(",")
+    }
     val data3 = sc.parallelize(result)
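
Note: the commit moves the per-feature sort onto the driver by sampling sampleRate of the rows and collecting them, and the loop then picks equal-frequency split points from each sorted column. A minimal standalone sketch of that split-point logic (object and method names are hypothetical, not from the repo; an explicit non-empty guard is added here, since the committed code's data2(0) would throw if every value were filtered out, a case the removed commented-out guard used to warn about):

import scala.collection.mutable.ArrayBuffer

// Sketch of the equal-frequency bucketing in makedata_15_bucket_20240608 (hypothetical names).
object BucketSketch {
  // Returns the distinct split points for `bucketNum` equal-frequency buckets.
  def splitPoints(values: Array[Double], bucketNum: Int): Array[Double] = {
    val data2 = values.filter(_ > 1E-8).sorted
    require(data2.nonEmpty, "all values filtered out; nothing to bucket") // guard added in this sketch
    val len = data2.length
    val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // ensure each bucket has at least one element
    val buffers = new ArrayBuffer[Double]()
    var lastBucketValue = data2(0) // previous split point
    for (j <- 0 until len by oneBucketNum) {
      val d = data2(j)
      if (j > 0 && d != lastBucketValue) buffers += d // keep only distinct split points
      lastBucketValue = d
    }
    if (!buffers.contains(data2.last)) buffers += data2.last // last bucket ends at the array's last element
    buffers.toArray
  }

  def main(args: Array[String]): Unit = {
    // 10 values, bucketNum = 5 -> stride (10-1)/(5-1)+1 = 3, so indices 0, 3, 6, 9 are inspected
    val xs = Array(0.1, 0.2, 0.2, 0.3, 0.4, 0.5, 0.5, 0.6, 0.8, 0.9)
    println(splitPoints(xs, 5).mkString(",")) // prints 0.3,0.5,0.9
  }
}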
 
 

+ 8 - 8
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -69,19 +69,19 @@ savePath:/dw/recommend/model/04_str_data/ beginStr:20240311 endStr:20240312 feat
 
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_13_originData_20240529 \
---master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+--master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 \
-beginStr:2024060700 endStr:2024060723 \
+tablePart:32 \
+beginStr:2024060706 endStr:2024060715 \
 table:alg_recsys_sample_all \
-> p13_data0607.log 2>&1 &
+> p13_data060706.log 2>&1 &
 
 
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_14_valueData_20240608 \
---master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+--master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-beginStr:20240606 endStr:20240607 repartition:1000 \
+beginStr:20240606 endStr:20240606 repartition:1000 \
 > p14_data.log 2>&1 &
 
 
@@ -96,9 +96,9 @@ bucketNum:200 \
 
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_16_bucketData_20240609 \
---master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+--master yarn --driver-memory 1G --executor-memory 4G --executor-cores 1 --num-executors 16 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-beginStr:20240606 endStr:20240607 repartition:1000 \
+beginStr:20240606 endStr:20240606 repartition:1000 \
 > p16_data.log 2>&1 &
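
For completeness, the bucketing job itself (makedata_15_bucket_20240608) can now be launched with the new sampleRate parameter. A sketch of a submit line using the job's own defaults for the paths and bucket count; the resource sizes and the log name p15_data.log are illustrative, not from this commit:

nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
--class com.aliyun.odps.spark.examples.makedata.makedata_15_bucket_20240608 \
--master yarn --driver-memory 2G --executor-memory 2G --executor-cores 1 --num-executors 16 \
./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
readPath:/dw/recommend/model/14_feature_data/20240607 \
savePath:/dw/recommend/model/15_bucket_data/ \
fileName:20240607_200 sampleRate:0.1 bucketNum:200 \
> p15_data.log 2>&1 &

Since sample(false, sampleRate) followed by collect() now pulls roughly sampleRate of the feature rows onto the driver, --driver-memory should be sized with that fraction in mind rather than the 1G used by the other jobs.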