浏览代码

feat:修改分析脚本

zhaohaipeng 2 月之前
父节点
当前提交
e6a1019b9c

+ 27 - 41
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_45_feature_cover_degree.scala

@@ -22,53 +22,39 @@ object makedata_recsys_45_feature_cover_degree {
     val resource = getClass.getClassLoader.getResource(featureNameFile)
     val fileContent = FileUtils.readFile(resource)
 
-    // 解析特征名
-    val featureNameSet = fileContent.split("\n")
-      .map(_.trim)
-      .filter(_.nonEmpty)
-      .toSet
+    // **优化点 1**: 用 `RDD.broadcast` 传递特征名
+    val featureNameSet = fileContent.split("\n").map(_.trim).filter(_.nonEmpty).toSet
     val featureNameSet_br = sc.broadcast(featureNameSet)
 
-    // 读取数据
-    val data = sc.textFile(readPath)
-
-    // 使用 Accumulator 代替 HashMap
-    val allMapAcc = sc.collectionAccumulator[(String, Long)]("AllMap")
-    val isShareMapAcc = sc.collectionAccumulator[(String, Long)]("IsShareMap")
-    val isReturnNoSelfMapAcc = sc.collectionAccumulator[(String, Long)]("IsReturnNoSelfMap")
-
-    data.foreach { line =>
+    // **优化点 2**: `map` 而不是 `foreach`,让 Spark 计算
+    val featureStatsRDD = sc.textFile(readPath).map { line =>
       val allMap = new util.HashMap[String, java.lang.Long]()
       val isShareMap = new util.HashMap[String, java.lang.Long]()
       val isReturnNoSelfMap = new util.HashMap[String, java.lang.Long]()
 
-      // 调用 Java 方法处理
+      // 解析特征覆盖度
       StatisticsUtil.featureCoverRate(line, featureNameSet_br.value.asJava, allMap, isShareMap, isReturnNoSelfMap)
 
-      // 将结果累加到 Accumulator
-      allMap.asScala.foreach { case (key, value) => allMapAcc.add((key, value)) }
-      isShareMap.asScala.foreach { case (key, value) => isShareMapAcc.add((key, value)) }
-      isReturnNoSelfMap.asScala.foreach { case (key, value) => isReturnNoSelfMapAcc.add((key, value)) }
-    }
-
-    // **转换 Accumulator 数据**
-    val allMapSeq = allMapAcc.value.asScala.toSeq
-    val isShareMapSeq = isShareMapAcc.value.asScala.toSeq
-    val isReturnNoSelfMapSeq = isReturnNoSelfMapAcc.value.asScala.toSeq
-
-    // **使用 groupBy 聚合**
-    val finalAllMap = allMapSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
-    val finalIsShareMap = isShareMapSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
-    val finalIsReturnNoSelfMap = isReturnNoSelfMapSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
-
-    // **输出最终结果**
-    println("AllMap 结果:")
-    finalAllMap.foreach { case (key, value) => println(s"$key -> $value") }
-
-    println("IsShareMap 结果:")
-    finalIsShareMap.foreach { case (key, value) => println(s"$key -> $value") }
-
-    println("IsReturnNoSelfMap 结果:")
-    finalIsReturnNoSelfMap.foreach { case (key, value) => println(s"$key -> $value") }
+      // **优化点 3**: 用 `Iterator` 直接返回 `RDD`
+      (
+        allMap.asScala.toSeq.map { case (k, v) => (("allMap", k), v) } ++
+          isShareMap.asScala.toSeq.map { case (k, v) => (("isShareMap", k), v) } ++
+          isReturnNoSelfMap.asScala.toSeq.map { case (k, v) => (("isReturnNoSelfMap", k), v) }
+        ).iterator
+    }.flatMap(identity)  // **重要**: 展开 `Iterator`
+
+    // **优化点 4**: `reduceByKey` 直接在 Worker 端聚合数据
+    val aggregatedFeatureStatsRDD = featureStatsRDD
+      .reduceByKey(_ + _)
+
+    // **优化点 5**: 分别提取 `allMap`, `isShareMap`, `isReturnNoSelfMap`
+    val allMap = aggregatedFeatureStatsRDD.filter(_._1._1 == "allMap").map { case ((_, key), value) => (key, value) }.collectAsMap()
+    val isShareMap = aggregatedFeatureStatsRDD.filter(_._1._1 == "isShareMap").map { case ((_, key), value) => (key, value) }.collectAsMap()
+    val isReturnNoSelfMap = aggregatedFeatureStatsRDD.filter(_._1._1 == "isReturnNoSelfMap").map { case ((_, key), value) => (key, value) }.collectAsMap()
+
+    // **优化点 6**: 用 `.mkString` 方式减少 `println` 造成的性能影响
+    println(s"AllMap 结果:\n${allMap.mkString("\n")}")
+    println(s"IsShareMap 结果:\n${isShareMap.mkString("\n")}")
+    println(s"IsReturnNoSelfMap 结果:\n${isReturnNoSelfMap.mkString("\n")}")
   }
-}
+}