
Add a test that checks whether two tables are identical

xueyiming 2 months ago
parent
commit
87a57de35b
Changed 1 file with 140 additions and 0 deletions

+ 140 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_1_20240718.scala

@@ -0,0 +1,140 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+
+/*
+   Diff two dt partitions of one ODPS table: join rows by logkey and report
+   any fields whose values differ, plus rows that exist in only one partition.
+ */
+
+object diff_data_1_20240718 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1. Read job parameters
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "ad_easyrec_train_data_v1")
+
+    val savePath = param.getOrElse("savePath", "/test/diff")
+
+    // 2. Read the two partitions from ODPS
+    val odpsOps = env.getODPS(sc)
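+    // Each readTable call scans one dt partition and converts every Record
+    // into a Map[String, String] via the func transfer function defined below.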
+    val odpsData1 = odpsOps.readTable(project = project,
+      table = table,
+      partition = "dt=20250101",
+      transfer = func,
+      numPartition = 64)
+
+    val odpsData2 = odpsOps.readTable(project = project,
+      table = table,
+      partition = "dt=20250216",
+      transfer = func,
+      numPartition = 64)
+
+    // Row counts of the two partitions, labelled by partition
+    val header = sc.parallelize(Seq(
+      "dt=20250101 size = " + odpsData1.count(),
+      "dt=20250216 size = " + odpsData2.count()))
+
+    // Key both datasets by logkey
+    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
+    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => (map("logkey"), map))
+
+    // Join on logkey to pair rows present in both partitions
+    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
+
+    // Compare the values of matching keys for each shared logkey. This has to
+    // be a transformation (flatMap) rather than a foreach that appends to a
+    // driver-side var: the closure runs on executors, so such appends are lost.
+    val valueDiffs: RDD[String] = joinedRDD.flatMap { case (logkey, (map1, map2)) =>
+      map1.toSeq.filter(_._1 != "logkey").flatMap { case (key, value1) =>
+        map2.get(key) match {
+          case Some(value2) if value1 != value2 =>
+            Some(s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: $value2")
+          case None =>
+            Some(s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key")
+          case _ => None
+        }
+      }
+    }
+
+    // logkeys that exist in rdd1 but not in rdd2
+    val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
+      .map { case (logkey, map) => s"LogKey: $logkey, Map only exists in RDD1: $map" }
+
+    // logkeys that exist in rdd2 but not in rdd1
+    val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
+      .map { case (logkey, map) => s"LogKey: $logkey, Map only exists in RDD2: $map" }
+
+    // Assemble the report as a single RDD of lines
+    val rdd = header.union(valueDiffs).union(rdd1Only).union(rdd2Only)
+
+    val hdfsPath = savePath
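+    // Only paths under /test/ may be deleted and rewritten, guarding against
+    // accidentally clobbering a production directory.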
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/test/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      rdd.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+
+
+  }
+
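+  // Transfer function for odpsOps.readTable: flattens one ODPS Record into a
+  // Map from column name to string value (null columns become "").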
+  def func(record: Record, schema: TableSchema): Map[String, String] = {
+    var map: Map[String, String] = Map.empty
+    val columns = schema.getColumns
+    for (i <- 0 until columns.size()) {
+      val column = columns.get(i)
+      val name = column.getName
+      val value = Option(record.get(name)).map(_.toString).getOrElse("")
+      map += (name -> value)
+    }
+    map
+  }
+
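+  // Parse one tab-separated sample line (a label followed by key:value fields)
+  // into a Map. Note: currently not referenced anywhere in this object.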
+  private def processString(input: String): Map[String, String] = {
+    // Trim the input and split it into tab-separated fields
+    val parts = input.trim.split("\t")
+    var resultMap = Map[String, String]()
+    // The first field is taken as the has_conversion label
+    resultMap += ("has_conversion" -> parts(0))
+    // Parse the remaining fields as key:value pairs
+    parts.drop(1).foreach { part =>
+      part.split(":", 2) match {
+        case Array(keyStr, value) =>
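+          // Normalize feature names: '*' -> '_x_', '(view)' -> '_view'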
+          var key = keyStr.replace("*", "_x_").replace("(view)", "_view")
+          if (key == "ad_is_click") {
+            key = "has_click"
+          }
+          resultMap += (key -> value)
+        case _ => // drop fields that cannot be parsed as key:value
+      }
+    }
+    resultMap
+  }
+
+
+}