|
@@ -59,10 +59,16 @@ object diff_data_20240718 {
|
|
// 比较相同 logkey 对应的 Map 中相同键的 value
|
|
// 比较相同 logkey 对应的 Map 中相同键的 value
|
|
joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
map1.foreach { case (key, value1) =>
|
|
map1.foreach { case (key, value1) =>
|
|
- if (key != "logkey" && map2.contains(key) && value1 != map2(key)) {
|
|
|
|
- val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
|
|
|
|
- println(res)
|
|
|
|
- result = result :+ res
|
|
|
|
|
|
+ if (key != "logkey") {
|
|
|
|
+ if (map2.contains(key) && value1 != map2(key)) {
|
|
|
|
+ val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
|
|
|
|
+ println(res)
|
|
|
|
+ result += res
|
|
|
|
+ } else if (!map2.contains(key)) {
|
|
|
|
+ val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
|
|
|
|
+ println(res)
|
|
|
|
+ result += res
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -89,7 +95,7 @@ object diff_data_20240718 {
|
|
if (hdfsPath.nonEmpty && hdfsPath.startsWith("/test/")) {
|
|
if (hdfsPath.nonEmpty && hdfsPath.startsWith("/test/")) {
|
|
println("删除路径并开始数据写入:" + hdfsPath)
|
|
println("删除路径并开始数据写入:" + hdfsPath)
|
|
MyHdfsUtils.delete_hdfs_path(hdfsPath)
|
|
MyHdfsUtils.delete_hdfs_path(hdfsPath)
|
|
- rdd.repartition(100).saveAsTextFile(hdfsPath, classOf[GzipCodec])
|
|
|
|
|
|
+ rdd.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
|
|
} else {
|
|
} else {
|
|
println("路径不合法,无法写入:" + hdfsPath)
|
|
println("路径不合法,无法写入:" + hdfsPath)
|
|
}
|
|
}
|