|
@@ -8,6 +8,8 @@ import org.apache.spark.rdd.RDD
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
|
|
|
import java.util.Base64
|
|
|
+import scala.collection.JavaConverters.asScalaSetConverter
|
|
|
+import scala.collection.mutable
|
|
|
|
|
|
|
|
|
/*
|
|
@@ -73,60 +75,199 @@ object diff_data_20250319 {
|
|
|
// 进行 join 操作
|
|
|
val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
|
|
|
|
|
|
- val firstElement = joinedRDD.first()
|
|
|
- firstElement match {
|
|
|
- case (logkey, (map1, map2)) =>
|
|
|
- println(logkey)
|
|
|
- println(map1)
|
|
|
- println(map2)
|
|
|
- }
|
|
|
-
|
|
|
- // 比较相同 logkey 对应的 Map 中相同键的 value
|
|
|
// joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
|
- // println(logkey)
|
|
|
- // println(map1)
|
|
|
- // println(map2)
|
|
|
- // return
|
|
|
- // map1.foreach { case (key, value1) =>
|
|
|
- // if (key != "logkey") {
|
|
|
- // if(value1 == "\\N")
|
|
|
- //
|
|
|
- // if (map2.contains(key) && value1 != map2(key)) {
|
|
|
- // val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
|
|
|
- // println(res)
|
|
|
- // result = result :+ res
|
|
|
- // } else if (!map2.contains(key)) {
|
|
|
- // val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
|
|
|
- // println(res)
|
|
|
- // result = result :+ res
|
|
|
- // }
|
|
|
- // }
|
|
|
+ // val (diffRates, averageDiff) = calculateFeatureDiff(map1, map2)
|
|
|
+ // println(s"Logkey: $logkey")
|
|
|
+ // println("各个特征的差异率:")
|
|
|
+ // diffRates.foreach { case (feature, rate) =>
|
|
|
+ // println(s" $feature: ${rate * 100}%")
|
|
|
// }
|
|
|
+ // println(s"平均差异: $averageDiff")
|
|
|
// }
|
|
|
|
|
|
- // 检查 rdd1 中存在但 rdd2 中不存在的 logkey
|
|
|
- // val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
|
|
|
- // rdd1Only.foreach { case (logkey, map) =>
|
|
|
- // val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
|
|
|
- // println(res)
|
|
|
- // result = result :+ res
|
|
|
- // }
|
|
|
- //
|
|
|
- // // 检查 rdd2 中存在但 rdd1 中不存在的 logkey
|
|
|
- // val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
|
|
|
- // rdd2Only.foreach { case (logkey, map) =>
|
|
|
- // val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
|
|
|
- // println(res)
|
|
|
- // result = result :+ res
|
|
|
+
|
|
|
+ // Per-feature mismatch statistics over all joined records.
+ //
+ // The aggregation is done with RDD transformations and the (small, one entry
+ // per feature) result is collected to the driver. Mutating driver-side maps
+ // from inside `joinedRDD.foreach` does not work on a cluster: the closure is
+ // serialized to the executors, which update their own copies, so the maps on
+ // the driver would stay empty.
+ //
+ // Value emitted per mismatching (record, feature):
+ //   (absolute numeric difference, 1, true)   for two differing numbers
+ //   (0.0, 1, false)                          for two differing strings
+ // Equal values, and pairs whose types are neither both Number nor both
+ // String, contribute nothing.
+ val perFeatureStats = joinedRDD
+   .flatMap { case (_, (map1, map2)) =>
+     // Only features present on both sides are compared.
+     map1.keySet().asScala.toSeq
+       .filter(key => map2.containsKey(key))
+       .flatMap { key =>
+         (map1.get(key), map2.get(key)) match {
+           case (num1: java.lang.Number, num2: java.lang.Number) =>
+             val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+             if (diff != 0) Some(key -> (diff, 1, true)) else None
+           case (str1: java.lang.String, str2: java.lang.String) =>
+             if (str1 != str2) Some(key -> (0.0, 1, false)) else None
+           case _ =>
+             None
+         }
+       }
+   }
+   .reduceByKey { case ((diffA, countA, numericA), (diffB, countB, numericB)) =>
+     (diffA + diffB, countA + countB, numericA || numericB)
+   }
+   .collect()
+
+ // Sum of absolute numeric differences per feature (only features that had at
+ // least one numeric mismatch get an entry).
+ val featureDiffSum = mutable.Map[String, Double]()
+ // Number of mismatching records per feature (numeric and string mismatches).
+ val featureCount = mutable.Map[String, Int]()
+ // Reserved for per-logkey, per-feature diff rates; not populated at present.
+ val featureDiffRates = mutable.Map[String, mutable.Map[String, Double]]()
+
+ perFeatureStats.foreach { case (feature, (diffSum, mismatches, sawNumeric)) =>
+   featureCount(feature) = mismatches
+   if (sawNumeric) {
+     featureDiffSum(feature) = diffSum
+   }
+ }
|
|
|
+
|
|
|
+ // 输出每个 logkey 下每个特征的差异率
|
|
|
+ // featureDiffRates.foreach { case (logkey, rates) =>
|
|
|
+ // println(s"Logkey: $logkey")
|
|
|
+ // rates.foreach { case (feature, rate) =>
|
|
|
+ // println(s" Feature: $feature, Diff Rate: ${rate * 100}%")
|
|
|
+ // }
|
|
|
// }
|
|
|
- //
|
|
|
- // result = result :+ "test"
|
|
|
- // println(result)
|
|
|
- // val rdd = sc.parallelize(result)
|
|
|
|
|
|
+ // Print, for every feature with an accumulated numeric difference, that sum
+ // divided by the feature's mismatch count. Every key of featureDiffSum also
+ // has a featureCount entry; note the count includes string mismatches too.
+ println("每个特征的平均差异:")
+ for ((feature, sum) <- featureDiffSum) {
+   val averageDiff = sum / featureCount(feature)
+   println(s" Feature: $feature, Average Diff: $averageDiff")
+ }
|
|
|
|
|
|
+ // Total number of joined records, i.e. logkeys present in both inputs.
+ val count = joinedRDD.count()
+ println(s"对比总数: $count")
+ // Share of joined records in which each feature differed.
+ println("每个特征的差异率:")
+ featureCount.foreach { case (feature, mismatches) =>
+   // Convert before dividing: Int / Long is integer division, which would
+   // truncate every rate below 1 to 0.
+   val rateDiff = mismatches.toDouble / count
+   println(s" Feature: $feature, Rate Diff: $rateDiff")
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // val firstElement = joinedRDD.first()
|
|
|
+ // firstElement match {
|
|
|
+ // case (logkey, (map1, map2)) =>
|
|
|
+ // println(compareJSONObjects(map1, map2, logkey))
|
|
|
+ // }
|
|
|
+
|
|
|
+ // 比较相同 logkey 对应的 Map 中相同键的 value
|
|
|
+ // joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
|
+ // println(logkey)
|
|
|
+ // println(map1)
|
|
|
+ // println(map2)
|
|
|
+ // return
|
|
|
+ // map1.foreach { case (key, value1) =>
|
|
|
+ // if (key != "logkey") {
|
|
|
+ // if(value1 == "\\N")
|
|
|
+ //
|
|
|
+ // if (map2.contains(key) && value1 != map2(key)) {
|
|
|
+ // val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
|
|
|
+ // println(res)
|
|
|
+ // result = result :+ res
|
|
|
+ // } else if (!map2.contains(key)) {
|
|
|
+ // val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
|
|
|
+ // println(res)
|
|
|
+ // result = result :+ res
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+
|
|
|
+ // 检查 rdd1 中存在但 rdd2 中不存在的 logkey
|
|
|
+ // val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
|
|
|
+ // rdd1Only.foreach { case (logkey, map) =>
|
|
|
+ // val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
|
|
|
+ // println(res)
|
|
|
+ // result = result :+ res
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // // 检查 rdd2 中存在但 rdd1 中不存在的 logkey
|
|
|
+ // val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
|
|
|
+ // rdd2Only.foreach { case (logkey, map) =>
|
|
|
+ // val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
|
|
|
+ // println(res)
|
|
|
+ // result = result :+ res
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // result = result :+ "test"
|
|
|
+ // println(result)
|
|
|
+ // val rdd = sc.parallelize(result)
|
|
|
+
|
|
|
+
|
|
|
+// def calculateFeatureDiff(map1: JSONObject, map2: JSONObject): (Map[String, Double], Double) = {
|
|
|
+// val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
|
|
|
+// var totalDiff = 0.0
|
|
|
+// var validCount = 0
|
|
|
+// val diffRates = collection.mutable.Map[String, Double]()
|
|
|
+//
|
|
|
+// keys.foreach { key =>
|
|
|
+// if (map1.containsKey(key) && map2.containsKey(key)) {
|
|
|
+// val value1 = map1.get(key)
|
|
|
+// val value2 = map2.get(key)
|
|
|
+// (value1, value2) match {
|
|
|
+// case (num1: java.lang.Number, num2: java.lang.Number) =>
|
|
|
+// val diff = math.abs(num1.doubleValue() - num2.doubleValue())
|
|
|
+// val diffRate = if (num1.doubleValue() != 0) diff / num1.doubleValue() else diff
|
|
|
+// diffRates(key) = diffRate
|
|
|
+// totalDiff += diff
|
|
|
+// validCount += 1
|
|
|
+// case _ =>
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// val averageDiff = if (validCount > 0) totalDiff / validCount else 0.0
|
|
|
+// (diffRates.toMap, averageDiff)
|
|
|
+// }
|
|
|
+
|
|
|
+// def compareJSONObjects(json1: JSONObject, json2: JSONObject, logkey: String): String = {
|
|
|
+// // 存储不同值的键和对应不同的值
|
|
|
+// val differentValues = collection.mutable.Map[String, (Any, Any)]()
|
|
|
+// // 存储 json1 中缺少的键
|
|
|
+// val missingInJson1 = collection.mutable.Set[String]()
|
|
|
+// // 存储 json2 中缺少的键
|
|
|
+// val missingInJson2 = collection.mutable.Set[String]()
|
|
|
+//
|
|
|
+// // 获取 json1 和 json2 的所有键
|
|
|
+// val keys1 = json1.keySet().asScala
|
|
|
+// val keys2 = json2.keySet().asScala
|
|
|
+//
|
|
|
+// // 找出不同值的键和对应不同的值
|
|
|
+// keys1.foreach { key =>
|
|
|
+// if (keys2.contains(key)) {
|
|
|
+// val value1 = json1.get(key)
|
|
|
+// val value2 = json2.get(key)
|
|
|
+// if (value1 != value2) {
|
|
|
+// differentValues(key) = (value1, value2)
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// missingInJson2.add(key)
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 找出 json1 中缺少的键
|
|
|
+// keys2.foreach { key =>
|
|
|
+// if (!keys1.contains(key)) {
|
|
|
+// missingInJson1.add(key)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// val differentValuesStr = differentValues.map { case (key, (value1, value2)) =>
|
|
|
+// s"$key: $value1 vs $value2"
|
|
|
+// }.mkString(", ")
|
|
|
+//
|
|
|
+// val missingInJson1Str = missingInJson1.mkString(", ")
|
|
|
+// val missingInJson2Str = missingInJson2.mkString(", ")
|
|
|
+//
|
|
|
+// val res = s"logkey: $logkey\t不同值: $differentValuesStr\tjson1 缺少的键: $missingInJson1Str\tjson2 缺少的键: $missingInJson2Str"
|
|
|
+// res
|
|
|
+// }
|
|
|
+
|
|
|
+
|
|
|
// def func(record: Record, schema: TableSchema): Map[String, String] = {
|
|
|
// var map: Map[String, String] = Map.empty
|
|
|
// val columns = schema.getColumns
|