
Separate RandomUniformCoalescer

StrayWarrior, 3 months ago
parent commit f1a6085f07

+ 1 - 21
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketData_20250110.scala

@@ -2,6 +2,7 @@ package com.aliyun.odps.spark.examples.makedata_ad.v20240718
 
 import com.alibaba.fastjson.JSON
 import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
+import com.tzld.piaoquan.ad.spark.common.RandomUniformCoalescer
 import examples.extractor.ExtractorUtils
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}
@@ -15,27 +16,6 @@ import scala.io.Source
  */
 
 object makedata_ad_33_bucketData_20250110 {
-  class RandomUniformCoalescer extends PartitionCoalescer {
-    override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
-      val parentPartitions = parent.getNumPartitions
-      val shuffledPartitions = parent.partitions.toList
-      scala.util.Random.shuffle(shuffledPartitions)
-      val shufflePartitionsArray = shuffledPartitions.toArray
-      val expectNumPartitions = math.min(maxPartitions, parentPartitions)
-      // To avoid a long tail where the last group ends up oversized, groupSize is rounded up to a larger value
-      val groupSize = (parentPartitions + expectNumPartitions - 1) / expectNumPartitions
-      val numPartitions = (parentPartitions + groupSize - 1) / groupSize
-      val groups = new Array[PartitionGroup](numPartitions)
-      for (i <- 0 until numPartitions) {
-        val start = i * groupSize
-        val end = math.min(start + groupSize, parentPartitions)
-        groups(i) = new PartitionGroup()
-        groups(i).partitions.appendAll(shufflePartitionsArray.slice(start, end))
-      }
-      groups
-    }
-  }
-
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession
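
The call site is not shown in this hunk. As a rough usage sketch (not part of the commit; the names data, targetPartitions and savePath are assumed for illustration), the extracted class would typically be handed to RDD.coalesce:

    // Sketch only: a custom PartitionCoalescer is passed as the third argument of
    // RDD.coalesce; with shuffle = false the groups returned by
    // RandomUniformCoalescer.coalesce become the partitions of the coalesced RDD.
    val repartitioned = data.coalesce(targetPartitions, shuffle = false,
      Option(new RandomUniformCoalescer))
    repartitioned.saveAsTextFile(savePath, classOf[GzipCodec])

Spark requires a coalescer passed this way to be serializable, which is presumably why the extracted class below adds "with Serializable" (the inline class removed above did not declare it).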

+ 22 - 0
src/main/scala/com/tzld/piaoquan/ad/spark/common/RandomUniformCoalescer.scala

@@ -0,0 +1,22 @@
+package com.tzld.piaoquan.ad.spark.common
+import scala.util.Random
+import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}
+
+class RandomUniformCoalescer extends PartitionCoalescer with Serializable {
+  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+    val parentPartitions = parent.getNumPartitions
+    val shufflePartitions = Random.shuffle(parent.partitions.toList).toArray
+    val expectNumPartitions = math.min(maxPartitions, parentPartitions)
+    // To avoid a long tail where the last group ends up oversized, groupSize is rounded up to a larger value
+    val groupSize = (parentPartitions + expectNumPartitions - 1) / expectNumPartitions
+    val numPartitions = (parentPartitions + groupSize - 1) / groupSize
+    val groups = new Array[PartitionGroup](numPartitions)
+    for (i <- 0 until numPartitions) {
+      val start = i * groupSize
+      val end = math.min(start + groupSize, parentPartitions)
+      groups(i) = new PartitionGroup()
+      groups(i).partitions.appendAll(shufflePartitions.slice(start, end))
+    }
+    groups
+  }
+}
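
As a quick sanity check of the rounding logic (a standalone sketch with assumed example values, not part of the commit): groupSize is the ceiling of parentPartitions / expectNumPartitions, so every group except possibly the last holds exactly groupSize partitions, and the last one can only be smaller, never larger.

    // Assumed example: 10 parent partitions coalesced down to at most 4 groups.
    val parentPartitions = 10
    val maxPartitions = 4
    val expectNumPartitions = math.min(maxPartitions, parentPartitions)               // 4
    val groupSize = (parentPartitions + expectNumPartitions - 1) / expectNumPartitions // ceil(10/4) = 3
    val numPartitions = (parentPartitions + groupSize - 1) / groupSize                 // ceil(10/3) = 4
    val sizes = (0 until numPartitions).map { i =>
      math.min((i + 1) * groupSize, parentPartitions) - i * groupSize
    }
    println(sizes) // Vector(3, 3, 3, 1): the trailing group shrinks instead of growing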