Browse Source

解决数据膨胀问题

xueyiming 2 weeks ago
parent
commit
e97e33de63

+ 30 - 18
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_addFeatureToHive_20250708.scala

@@ -31,37 +31,49 @@ object makedata_ad_33_addFeatureToHive_20250708 {
         table = "ad_easyrec_train_realtime_data_v3_sampled",
         partition = partition,
         transfer = func,
-        numPartition = tablePart)
+        numPartition = tablePart
+      )
 
-      // 读取特征表
+      // 读取特征表,并按 mid 去重(保留最后一条)
       val featureRdd = odpsOps.readTable(
-        project = project, // 替换为实际项目名
+        project = project,
         table = "user_conver_ad_class_temp",
-        partition = partition, // 替换为实际分区
+        partition = partition,
         transfer = func,
-        numPartition = tablePart // 特征表通常较小
+        numPartition = tablePart
       )
 
-      // 将 RDD 转换为键值对,保留完整元组
-      val samplePairRdd = sampleRdd.map(row => (row._1, row)) // (mid, 完整样本元组)
-      val featurePairRdd = featureRdd.map(row => (row._1, row)) // (mid, 完整特征元组)
+      // 特征表去重(确保每个 mid 唯一)
+      val featureRddDistinct = featureRdd
+        .map(row => (row._1, row))
+        .reduceByKey((a, b) => b) // 保留最后一条
+        .map(_._2)
+
+      // 转换为 PairRDD
+      val samplePairRdd = sampleRdd.map(row => (row._1, row))
+      val featurePairRdd = featureRddDistinct.map(row => (row._1, row))
 
-      // 3. 基于 mid 关联两个 RDD,并合并 Map
+      // 左外连接(确保样本表行数不变)
       val recordRdd = samplePairRdd.leftOuterJoin(featurePairRdd).map {
         case (mid, (sampleMap, featureOption)) =>
-          val sampleData = sampleMap._2 // 提取样本数据的 Map
+          val sampleData = sampleMap._2
+          val featureData = featureOption.map(_._2).getOrElse(Map.empty[String, String])
 
-          // 处理特征数据(可能不存在)
-          val featureData = featureOption match {
-            case Some(featureMap) => featureMap._2 // 有匹配的特征数据
-            case None => Map.empty[String, String] // 无匹配数据,使用空 Map
-          }
-
-          // 合并两个 Map,重复键以 featureData 为准
+          // 合并 Map,添加 mid 字段
           val mergedMap = sampleData ++ featureData
           mergedMap
       }
-      odpsOps.saveToTable(project, "ad_easyrec_train_realtime_data_v3_sampled_temp", partition, recordRdd, write, defaultCreate = true, overwrite = true)
+
+      // 写入表
+      odpsOps.saveToTable(
+        project,
+        "ad_easyrec_train_realtime_data_v3_sampled_temp",
+        partition,
+        recordRdd,
+        write,
+        defaultCreate = true,
+        overwrite = true
+      )
     }