|
@@ -34,6 +34,23 @@ object diff_data_20250319 {
|
|
transfer = func,
|
|
transfer = func,
|
|
numPartition = 64)
|
|
numPartition = 64)
|
|
|
|
|
|
|
|
+
|
|
|
|
+ val rdd1 = odpsData1.first()
|
|
|
|
+ val rdd2 = odpsData2.first()
|
|
|
|
+
|
|
|
|
+ println("rdd1")
|
|
|
|
+ rdd1.foreach {
|
|
|
|
+ case (key, value) =>
|
|
|
|
+ println(key + ":" + value)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ println("rdd2")
|
|
|
|
+ rdd2.foreach {
|
|
|
|
+ case (key, value) =>
|
|
|
|
+ println(key + ":" + value)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
var result: List[String] = List.empty
|
|
var result: List[String] = List.empty
|
|
|
|
|
|
result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
|
|
result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
|
|
@@ -45,7 +62,6 @@ object diff_data_20250319 {
|
|
val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
|
|
val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
// 进行 join 操作
|
|
// 进行 join 操作
|
|
val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
|
|
val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
|
|
|
|
|
|
@@ -58,47 +74,47 @@ object diff_data_20250319 {
|
|
}
|
|
}
|
|
|
|
|
|
// 比较相同 logkey 对应的 Map 中相同键的 value
|
|
// 比较相同 logkey 对应的 Map 中相同键的 value
|
|
-// joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
|
|
-// println(logkey)
|
|
|
|
-// println(map1)
|
|
|
|
-// println(map2)
|
|
|
|
-// return
|
|
|
|
-// map1.foreach { case (key, value1) =>
|
|
|
|
-// if (key != "logkey") {
|
|
|
|
-// if(value1 == "\\N")
|
|
|
|
-//
|
|
|
|
-// if (map2.contains(key) && value1 != map2(key)) {
|
|
|
|
-// val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
|
|
|
|
-// println(res)
|
|
|
|
-// result = result :+ res
|
|
|
|
-// } else if (!map2.contains(key)) {
|
|
|
|
-// val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
|
|
|
|
-// println(res)
|
|
|
|
-// result = result :+ res
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
|
|
+ // joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
|
|
+ // println(logkey)
|
|
|
|
+ // println(map1)
|
|
|
|
+ // println(map2)
|
|
|
|
+ // return
|
|
|
|
+ // map1.foreach { case (key, value1) =>
|
|
|
|
+ // if (key != "logkey") {
|
|
|
|
+ // if(value1 == "\\N")
|
|
|
|
+ //
|
|
|
|
+ // if (map2.contains(key) && value1 != map2(key)) {
|
|
|
|
+ // val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
|
|
|
|
+ // println(res)
|
|
|
|
+ // result = result :+ res
|
|
|
|
+ // } else if (!map2.contains(key)) {
|
|
|
|
+ // val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
|
|
|
|
+ // println(res)
|
|
|
|
+ // result = result :+ res
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
|
|
// 检查 rdd1 中存在但 rdd2 中不存在的 logkey
|
|
// 检查 rdd1 中存在但 rdd2 中不存在的 logkey
|
|
-// val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
|
|
|
|
-// rdd1Only.foreach { case (logkey, map) =>
|
|
|
|
-// val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
|
|
|
|
-// println(res)
|
|
|
|
-// result = result :+ res
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// // 检查 rdd2 中存在但 rdd1 中不存在的 logkey
|
|
|
|
-// val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
|
|
|
|
-// rdd2Only.foreach { case (logkey, map) =>
|
|
|
|
-// val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
|
|
|
|
-// println(res)
|
|
|
|
-// result = result :+ res
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// result = result :+ "test"
|
|
|
|
-// println(result)
|
|
|
|
-// val rdd = sc.parallelize(result)
|
|
|
|
|
|
+ // val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
|
|
|
|
+ // rdd1Only.foreach { case (logkey, map) =>
|
|
|
|
+ // val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
|
|
|
|
+ // println(res)
|
|
|
|
+ // result = result :+ res
|
|
|
|
+ // }
|
|
|
|
+ //
|
|
|
|
+ // // 检查 rdd2 中存在但 rdd1 中不存在的 logkey
|
|
|
|
+ // val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
|
|
|
|
+ // rdd2Only.foreach { case (logkey, map) =>
|
|
|
|
+ // val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
|
|
|
|
+ // println(res)
|
|
|
|
+ // result = result :+ res
|
|
|
|
+ // }
|
|
|
|
+ //
|
|
|
|
+ // result = result :+ "test"
|
|
|
|
+ // println(result)
|
|
|
|
+ // val rdd = sc.parallelize(result)
|
|
|
|
|
|
|
|
|
|
}
|
|
}
|