xueyiming před 1 měsícem
rodič
revize
5e28128a4d

+ 132 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -0,0 +1,132 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+
+/*
+   diff data
+ */
+
+object diff_data_20250319 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 2 读取odps+表信息
+    val odpsOps = env.getODPS(sc)
+    val odpsData1 = odpsOps.readTable(project = "loghubods",
+      table = "ad_easyrec_eval_data_v3_sampled",
+      partition = "dt=20250319",
+      transfer = func,
+      numPartition = 64)
+
+    val odpsData2 = odpsOps.readTable(project = "loghubods",
+      table = "alg_recsys_ad_sample_all",
+      partition = "dt=20250319",
+      transfer = func,
+      numPartition = 64)
+
+    var result: List[String] = List.empty
+
+    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
+
+    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
+
+    // 以 logkey 为键进行转换
+    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
+    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
+
+
+
+    // 进行 join 操作
+    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
+
+    // 比较相同 logkey 对应的 Map 中相同键的 value
+    joinedRDD.foreach { case (logkey, (map1, map2)) =>
+      println(logkey)
+      println(map1)
+      println(map2)
+      return
+//      map1.foreach { case (key, value1) =>
+//        if (key != "logkey") {
+//          if(value1 == "\\N")
+//
+//          if (map2.contains(key) && value1 != map2(key)) {
+//            val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
+//            println(res)
+//            result = result :+ res
+//          } else if (!map2.contains(key)) {
+//            val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
+//            println(res)
+//            result = result :+ res
+//          }
+//        }
+//      }
+    }
+
+    // 检查 rdd1 中存在但 rdd2 中不存在的 logkey
+//    val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
+//    rdd1Only.foreach { case (logkey, map) =>
+//      val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
+//      println(res)
+//      result = result :+ res
+//    }
+//
+//    // 检查 rdd2 中存在但 rdd1 中不存在的 logkey
+//    val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
+//    rdd2Only.foreach { case (logkey, map) =>
+//      val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
+//      println(res)
+//      result = result :+ res
+//    }
+//
+//    result = result :+ "test"
+//    println(result)
+//    val rdd = sc.parallelize(result)
+
+
+  }
+
+  def func(record: Record, schema: TableSchema): Map[String, String] = {
+    var map: Map[String, String] = Map.empty
+    val columns = schema.getColumns
+    for (i <- 0 until columns.size()) {
+      val column = columns.get(i)
+      val name = column.getName
+      val value = Option(record.get(name)).map(_.toString).getOrElse("")
+      map += (name -> value)
+    }
+    map
+  }
+
+  private def processString(input: String): Map[String, String] = {
+    // 去除多余空格并按空格分割成键值对数组
+    val parts = input.trim.split("\t")
+    var resultMap = Map[String, String]()
+    // 处理第一个元素,假设为特殊标识
+    resultMap += ("has_conversion" -> parts(0))
+    // 处理后续的键值对
+    parts.drop(1).foreach { part =>
+      part.split(":", 2) match {
+        case Array(keyStr, value) =>
+          var key = keyStr.replace("*", "_x_").replace("(view)", "_view")
+          if (key == "ad_is_click") {
+            key = "has_click"
+          }
+          resultMap += (key -> value)
+        case _ => // 若无法解析成键值对则丢弃
+      }
+    }
+    resultMap
+  }
+
+
+}