
feat: add bucketing script

zhaohaipeng · 1 month ago · commit fedd1014cb

+ 21 - 0
src/main/java/examples/utils/StatisticsUtil.java

@@ -74,6 +74,27 @@ public class StatisticsUtil {
         }
     }
 
+    public static void featureCoverRateByLibSvm(String record, Set<String> featureNames, Map<String, Long> allMap) {
+        // Limit the split to 3 parts so the feature section, which is itself
+        // tab-separated, stays in one piece as rSplit[2].
+        String[] rSplit = record.split("\t", 3);
+        // Count malformed samples (a record must be logKey \t label \t features).
+        if (rSplit.length != 3) {
+            StatisticsUtil.mapKeyAddOne(allMap, "errorSampleNum");
+            return;
+        }
+
+        StatisticsUtil.mapKeyAddOne(allMap, "recommendSampleNum");
+        String[] features = rSplit[2].split("\t");
+        for (String feature : features) {
+            String[] split = feature.split(":");
+            // Skip malformed name:value pairs and features outside the known list.
+            if (split.length != 2 || !featureNames.contains(split[0])) {
+                continue;
+            }
+            if (Double.parseDouble(split[1]) > 0) {
+                StatisticsUtil.mapKeyAddOne(allMap, split[0]);
+            }
+        }
+    }
+
     public static boolean isRecommendScene(String page, String recommendPageType) {
         if (StringUtils.equals("详情后沉浸页", page)) {
             return true;

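A quick driver-side sketch of how the new helper is meant to be called; the record string, feature names, and harness object below are illustrative, not part of the commit:

import java.util
import examples.utils.StatisticsUtil
import scala.collection.JavaConverters._

object FeatureCoverExample {
  def main(args: Array[String]): Unit = {
    val featureNames = Set("f_ctr", "f_age").asJava          // assumed feature names
    val allMap = new util.HashMap[String, java.lang.Long]()
    // Record layout: logKey \t label \t tab-separated name:value pairs.
    val record = "vid123_mid456\t1\tf_ctr:0.3\tf_age:0"
    StatisticsUtil.featureCoverRateByLibSvm(record, featureNames, allMap)
    // Expected counters: recommendSampleNum=1 and f_ctr=1 (f_age is 0, so not covered).
    println(allMap)
  }
}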
+ 3 - 3
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/v20250218/makedata_recsys_43_bucketData_20250218.scala

@@ -74,13 +74,13 @@ object makedata_recsys_43_bucketData_20250218 {
         .map {
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
+            (logKey, label, features)
         }
         .mapPartitions(row => {
           val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
           row.foreach {
-            case (label, features) =>
+            case (logKey, label, features) =>
               val featuresBucket = features.map {
                 case (name, score) =>
                   var ifFilter = false
@@ -109,7 +109,7 @@ object makedata_recsys_43_bucketData_20250218 {
                     }
                   }
               }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
+              result.add(logKey + "\t" + label + "\t" + featuresBucket.mkString("\t"))
           }
           result.iterator
         })
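The effect of this hunk on the emitted line format, as a small sketch (the logKey value is made up):

// Before this change a line was "label \t features"; now logKey leads the line.
val logKey = "vid123_mid456"                      // illustrative key only
val label = "1"
val featuresBucket = Seq("f_ctr:7", "f_age:2")
// before: "1\tf_ctr:7\tf_age:2"
// after : "vid123_mid456\t1\tf_ctr:7\tf_age:2"
val line = logKey + "\t" + label + "\t" + featuresBucket.mkString("\t")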

+ 52 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/v20250218/makedata_recsys_45_feature_cover_degree_libsvm.scala

@@ -0,0 +1,52 @@
+package com.aliyun.odps.spark.examples.makedata_recsys.v20250218
+
+import com.aliyun.odps.spark.examples.myUtils.{FileUtils, ParamUtils}
+import examples.utils.StatisticsUtil
+import org.apache.spark.sql.SparkSession
+
+import java.util
+import scala.collection.JavaConverters._
+
+object makedata_recsys_45_feature_cover_degree_libsvm {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/43_recsys_train_data_20250218/*")
+    val featureNameFile = param.getOrElse("featureNameFile", "feature_name_20250218.txt")
+
+    val resource = getClass.getClassLoader.getResource(featureNameFile)
+    val fileContent = FileUtils.readFile(resource)
+
+    // Optimization 1: broadcast the feature-name set once (converted to a
+    // java.util.Set up front, since StatisticsUtil expects one).
+    val featureNameSet = fileContent.split("\n").map(_.trim).filter(_.nonEmpty).toSet
+    val featureNameSet_br = sc.broadcast(featureNameSet.asJava)
+
+    // Optimization 2: use flatMap (a transformation, not foreach) so the
+    // per-record counting runs distributed on the executors.
+    val featureStatsRDD = sc.textFile(readPath).flatMap { line =>
+      val allMap = new util.HashMap[String, java.lang.Long]()
+
+      // Tally feature coverage for this record.
+      StatisticsUtil.featureCoverRateByLibSvm(line, featureNameSet_br.value, allMap)
+
+      // Emit one ((mapName, counterKey), count) pair per counter.
+      allMap.asScala.toSeq.map { case (k, v) => (("allMap", k), v.longValue) }
+    }
+
+    // Optimization 3: reduceByKey aggregates counts on the executors before
+    // anything is sent back to the driver.
+    val aggregatedFeatureStatsRDD = featureStatsRDD
+      .reduceByKey(_ + _)
+
+    // Optimization 4: extract the "allMap" counters (the tag leaves room for
+    // further maps later) and collect them to the driver.
+    val allMap = aggregatedFeatureStatsRDD.filter(_._1._1 == "allMap").map { case ((_, key), value) => (key, value) }.collectAsMap()
+
+    // Optimization 5: build the report with one mkString and a single println.
+    println(s"AllMap results:\n${allMap.mkString("\n")}")
+  }
+}
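The collected counters can then be turned into per-feature coverage rates. A minimal follow-on sketch, assuming recommendSampleNum counts every well-formed record; this post-processing is not part of the commit:

// Hypothetical continuation at the end of main().
val total = allMap.getOrElse("recommendSampleNum", 0L).toDouble
val coverage = allMap
  .filterKeys(k => k != "recommendSampleNum" && k != "errorSampleNum")
  .mapValues(_ / total)                           // covered count / total samples
println(coverage.toSeq.sortBy(-_._2).map { case (k, r) => f"$k%s\t$r%.4f" }.mkString("\n"))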