소스 검색

增加对比任务

xueyiming 2 달 전
부모
커밋
83c070834d
1개의 변경된 파일33개의 추가작업 그리고 1개의 파일을 삭제
  1. 33 1
      src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20240718.scala

+ 33 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20240718.scala

@@ -4,6 +4,7 @@ import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
 import com.aliyun.odps.spark.examples.myUtils.{ParamUtils, env}
 import examples.utils.AdUtil
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 
 
@@ -25,7 +26,7 @@ object diff_data_20240718 {
     val table = param.getOrElse("table", "ad_easyrec_train_data_v1")
     val partition = "dt=20250101"
 
-    val readPath = param.getOrElse("readPath", "/test/33_ad_train_data/20250101")
+    val readPath = param.getOrElse("readPath", "/test/33_ad_train_data/20250213*")
     val data = sc.textFile(readPath)
     val hdfsData = data.map(r => {
       val map = processString(r)
@@ -42,6 +43,37 @@ object diff_data_20240718 {
       numPartition = 64)
 
 
+    // 假设用于匹配的 key 是 "id"
+    val matchKey = "logkey"
+
+    // 将 RDD 转换为以 matchKey 为键的键值对 RDD
+    val rdd1Pairs: RDD[(String, Map[String, String])] = hdfsData.map(map => (map(matchKey), map))
+    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData.map(map => (map(matchKey), map))
+
+    // 进行 join 操作,将相同 id 的 Map 组合在一起
+    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
+
+    // 对比相同 id 的 Map 中相同 key 的 value
+    joinedRDD.foreach { case (logkey, (map1, map2)) =>
+      map1.foreach { case (key, value1) =>
+        if (map2.contains(key) && value1 != map2(key)) {
+          println(s"logkey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}")
+        }
+      }
+    }
+
+    // 找出 rdd1 中存在但 rdd2 中不存在的 Map
+    val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
+    rdd1Only.foreach { case (logkey, map) =>
+      println(s"logkey: $logkey, Map only exists in RDD1: $map")
+    }
+
+    // 找出 rdd2 中存在但 rdd1 中不存在的 Map
+    val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
+    rdd2Only.foreach { case (logkey, map) =>
+      println(s"logkey: $logkey, Map only exists in RDD2: $map")
+    }
+
 
   }