
Update makedata_ad_33_bucketData_20250110: use custom coalescer to avoid unnecessary shuffle

StrayWarrior, 3 months ago
commit d5ea7d0fd0

+ 25 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketData_20250110.scala

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON
 import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
 import examples.extractor.ExtractorUtils
 import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}
 import org.apache.spark.sql.SparkSession
 
 import scala.collection.JavaConversions._
@@ -14,6 +15,27 @@ import scala.io.Source
  */
 
 object makedata_ad_33_bucketData_20250110 {
+  class RandomUniformCoalescer extends PartitionCoalescer {
+    override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+      val parentPartitions = parent.getNumPartitions
+      // Randomly shuffle the parent partitions so each output group gets a uniform mix of them
+      val shuffledPartitionsArray = scala.util.Random.shuffle(parent.partitions.toList).toArray
+      val expectNumPartitions = math.min(maxPartitions, parentPartitions)
+      // Round groupSize up so the last group is not oversized and does not become a long-tail task
+      val groupSize = (parentPartitions + expectNumPartitions - 1) / expectNumPartitions
+      val numPartitions = (parentPartitions + groupSize - 1) / groupSize
+      val groups = new Array[PartitionGroup](numPartitions)
+      for (i <- 0 until numPartitions) {
+        val start = i * groupSize
+        val end = math.min(start + groupSize, parentPartitions)
+        groups(i) = new PartitionGroup()
+        groups(i).partitions ++= shuffledPartitionsArray.slice(start, end)
+      }
+      groups
+    }
+  }
+
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession
@@ -118,7 +140,9 @@ object makedata_ad_33_bucketData_20250110 {
       if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
         println("删除路径并开始数据写入:" + hdfsPath)
         MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        data.coalesce(repartition, shuffle = false, partitionCoalescer = Option(new RandomUniformCoalescer()))
+          .saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        // data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
       } else {
         println("路径不合法,无法写入:" + hdfsPath)
       }
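
For context, below is a minimal, self-contained sketch of the same idea (the SparkSession setup, partition counts, and the CoalescerDemo object name are hypothetical, not part of this commit). A PartitionCoalescer only decides which parent partitions go into each output partition, so coalesce(n, shuffle = false, partitionCoalescer = ...) keeps a narrow dependency and skips the shuffle stage that repartition(n) always adds. With the ceil-division sizing used above, for example, 103 parent partitions and maxPartitions = 10 give groupSize = 11 and 10 groups (nine of 11 partitions and one of 4).

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}
import org.apache.spark.sql.SparkSession

object CoalescerDemo {
  // Same grouping scheme as the commit: shuffle the parent partitions, then pack them
  // into ceil-sized groups so no single group is left much larger than the rest.
  class RandomUniformCoalescer extends PartitionCoalescer {
    override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
      val parentPartitions = parent.getNumPartitions
      val shuffled = scala.util.Random.shuffle(parent.partitions.toList).toArray
      val expected = math.min(maxPartitions, parentPartitions)
      val groupSize = (parentPartitions + expected - 1) / expected   // ceil division
      val numGroups = (parentPartitions + groupSize - 1) / groupSize
      Array.tabulate(numGroups) { i =>
        val group = new PartitionGroup()
        group.partitions ++= shuffled.slice(i * groupSize, math.min((i + 1) * groupSize, parentPartitions))
        group
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("CoalescerDemo").getOrCreate()
    val data = spark.sparkContext.parallelize(1 to 1000, numSlices = 16)
    // Narrow dependency: each of the 4 output partitions reads ~4 parent partitions, with no shuffle stage.
    val out = data.coalesce(4, shuffle = false, partitionCoalescer = Option(new RandomUniformCoalescer()))
    println(s"output partitions = ${out.getNumPartitions}")
    spark.stop()
  }
}

The random shuffle of parent partitions is what keeps output file sizes roughly uniform when upstream partition sizes are skewed; unlike repartition, the write still runs as a single narrow stage.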