xueyiming 2 months ago
parent
commit
23c09a9529

+ 35 - 15
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20240718.scala

@@ -2,11 +2,14 @@ package com.aliyun.odps.spark.examples.makedata_ad.v20240718
 
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{ParamUtils, env}
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
 import examples.utils.AdUtil
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable.ArrayBuffer
+
 
 /*
    diff data
@@ -27,6 +30,8 @@ object diff_data_20240718 {
     val partition = "dt=20250101"
 
     val readPath = param.getOrElse("readPath", "/test/33_ad_train_data/20250213*")
+    val savePath = param.getOrElse("savePath", "/test/diff")
+
     val data = sc.textFile(readPath)
     val hdfsData = data.map(r => {
       val map = processString(r)
@@ -42,36 +47,51 @@ object diff_data_20240718 {
       transfer = func,
       numPartition = 64)
 
+    val result = ArrayBuffer[String]()
 
-    // Assume the key used for matching is "id"
-    val matchKey = "logkey"
-
-    // 将 RDD 转换为以 matchKey 为键的键值对 RDD
-    val rdd1Pairs: RDD[(String, Map[String, String])] = hdfsData.map(map => (map(matchKey), map))
-    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData.map(map => (map(matchKey), map))
+    // Key both RDDs by logkey
+    val rdd1Pairs: RDD[(String, Map[String, String])] = hdfsData.map(map => (map("logkey"), map))
+    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData.map(map => (map("logkey"), map))
 
-    // Join so that Maps sharing the same id are grouped together
+    // Join the two RDDs
     val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
 
-    // Compare the values of identical keys in Maps with the same id
+    // Compare the values of identical keys in the Maps for the same logkey
     joinedRDD.foreach { case (logkey, (map1, map2)) =>
       map1.foreach { case (key, value1) =>
-        if (map2.contains(key) && value1 != map2(key)) {
-          println(s"logkey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}")
+        if (key != "logkey" && map2.contains(key) && value1 != map2(key)) {
+          val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
+          println(res)
+          result += res
         }
       }
     }
 
-    // Find Maps that exist in rdd1 but not in rdd2
+    // Check for logkeys present in rdd1 but missing from rdd2
     val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
     rdd1Only.foreach { case (logkey, map) =>
-      println(s"logkey: $logkey, Map only exists in RDD1: $map")
+      val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
+      println(res)
+      result += res
     }
 
-    // Find Maps that exist in rdd2 but not in rdd1
+    // Check for logkeys present in rdd2 but missing from rdd1
     val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
     rdd2Only.foreach { case (logkey, map) =>
-      println(s"logkey: $logkey, Map only exists in RDD2: $map")
+      val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
+      println(res)
+      result += res
+    }
+
+    val rdd = sc.parallelize(result)
+
+    val hdfsPath = savePath
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/test/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      rdd.repartition(100).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
     }
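
A note on the accumulation logic added above: result is filled inside RDD.foreach, which runs on the executors, so on a cluster each executor mutates its own deserialized copy of the buffer and the driver-side result that gets parallelized and saved stays empty; it only appears to work in local mode. Below is a minimal sketch of a transformation-based alternative that produces the same report lines, reusing the rdd1Pairs, rdd2Pairs and savePath names already defined in this file; the existing "/test/" path check and MyHdfsUtils.delete_hdfs_path call can wrap the save unchanged.

    // Build the report as RDD transformations instead of mutating a driver-side buffer.
    // Sketch only: assumes the rdd1Pairs / rdd2Pairs pair RDDs defined above.
    val valueDiffs = rdd1Pairs.join(rdd2Pairs).flatMap { case (logkey, (map1, map2)) =>
      map1.flatMap { case (key, value1) =>
        map2.get(key) match {
          case Some(value2) if key != "logkey" && value1 != value2 =>
            Some(s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: $value2")
          case _ => None
        }
      }
    }

    // Records present on one side only.
    val rdd1OnlyMsgs = rdd1Pairs.subtractByKey(rdd2Pairs).map { case (logkey, map) =>
      s"LogKey: $logkey, Map only exists in RDD1: $map"
    }
    val rdd2OnlyMsgs = rdd2Pairs.subtractByKey(rdd1Pairs).map { case (logkey, map) =>
      s"LogKey: $logkey, Map only exists in RDD2: $map"
    }

    // Union the three diff categories and write them out compressed.
    val report = valueDiffs.union(rdd1OnlyMsgs).union(rdd2OnlyMsgs)
    report.repartition(100).saveAsTextFile(savePath, classOf[GzipCodec])

The per-record println calls are dropped in this sketch: output printed inside executor tasks only shows up in the executor logs, and the saved report already carries the same information.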