|
@@ -51,9 +51,6 @@ object diff_data_20250319 {
|
|
|
})
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
// 以 logkey 为键进行转换
|
|
|
val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
|
|
|
val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
|
|
@@ -62,24 +59,18 @@ object diff_data_20250319 {
|
|
|
// 进行 join 操作
|
|
|
val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
|
|
|
|
|
|
- // 用于存储每个特征的总差异和比较次数
|
|
|
- val featureDiffSum = mutable.Map[String, Double]()
|
|
|
- val featureCount = mutable.Map[String, Int]()
|
|
|
- // 用于存储每个 logkey 下每个特征的差异率
|
|
|
- val featureDiffRates = mutable.Map[String, mutable.Map[String, Double]]()
|
|
|
|
|
|
val tuple = joinedRDD.first()
|
|
|
- val value1 = tuple._1
|
|
|
val value2 = tuple._2
|
|
|
val rdd1 = value2._1
|
|
|
val rdd2 = value2._2
|
|
|
|
|
|
-// val rdd1: JSONObject = odpsData1.first()
|
|
|
+ // val rdd1: JSONObject = odpsData1.first()
|
|
|
println("rdd1")
|
|
|
println(rdd1.get("logkey").toString)
|
|
|
println(rdd1.toString)
|
|
|
|
|
|
-// val rdd2: JSONObject = odpsData2.first()
|
|
|
+ // val rdd2: JSONObject = odpsData2.first()
|
|
|
println("rdd2")
|
|
|
println(rdd2.getString("logkey"))
|
|
|
println(rdd2.toString)
|
|
@@ -148,7 +139,12 @@ object diff_data_20250319 {
|
|
|
println("=====================================================================")
|
|
|
|
|
|
|
|
|
+ // 用于存储每个特征的总差异和比较次数
|
|
|
+ val featureDiffSum = mutable.Map[String, Double]()
|
|
|
+ val featureCount = mutable.Map[String, Int]()
|
|
|
|
|
|
+ val countJsonObject1 = new JSONObject();
|
|
|
+ val countJsonObject2 = new JSONObject();
|
|
|
|
|
|
joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
|
val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
|
|
@@ -175,6 +171,17 @@ object diff_data_20250319 {
|
|
|
case (Some(num1), Some(num2)) =>
|
|
|
val diff = math.abs(num1.doubleValue() - num2.doubleValue())
|
|
|
if (diff > 0) {
|
|
|
+ if (countJsonObject1.containsKey(key)) {
|
|
|
+ countJsonObject1.put(key, diff)
|
|
|
+ } else {
|
|
|
+ countJsonObject1.put(key, countJsonObject1.getDouble(key) + diff)
|
|
|
+ }
|
|
|
+ if (countJsonObject2.containsKey(key)) {
|
|
|
+ countJsonObject2.put(key, 1)
|
|
|
+ } else {
|
|
|
+ countJsonObject2.put(key, countJsonObject2.getIntValue(key) + 1)
|
|
|
+ }
|
|
|
+
|
|
|
featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
|
|
|
featureCount(key) = featureCount.getOrElse(key, 0) + 1
|
|
|
}
|
|
@@ -182,6 +189,11 @@ object diff_data_20250319 {
|
|
|
val str1 = if (value1 != null) value1 else ""
|
|
|
val str2 = if (value2 != null) value2 else ""
|
|
|
if (str1 != str2) {
|
|
|
+ if (countJsonObject2.containsKey(key)) {
|
|
|
+ countJsonObject2.put(key, 1)
|
|
|
+ } else {
|
|
|
+ countJsonObject2.put(key, countJsonObject2.getIntValue(key) + 1)
|
|
|
+ }
|
|
|
featureCount(key) = featureCount.getOrElse(key, 0) + 1
|
|
|
}
|
|
|
}
|
|
@@ -189,6 +201,10 @@ object diff_data_20250319 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ println("json")
|
|
|
+ println(countJsonObject1)
|
|
|
+ println(countJsonObject2)
|
|
|
+
|
|
|
|
|
|
// 输出每个特征的平均差异
|
|
|
println("每个特征的平均差异:")
|
|
@@ -210,9 +226,6 @@ object diff_data_20250319 {
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
def func(record: Record, schema: TableSchema): JSONObject = {
|
|
|
val featureMap = new JSONObject()
|
|
|
val columns = schema.getColumns
|