|
@@ -49,83 +49,7 @@ object diff_data_20250422 {
|
|
|
|
|
|
// 进行 join 操作
|
|
|
val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd2Pairs.join(rdd1Pairs)
|
|
|
- val tuple = joinedRDD.first()
|
|
|
- val value2 = tuple._2
|
|
|
- val rdd1 = value2._1
|
|
|
- val rdd2 = value2._2
|
|
|
-
|
|
|
- // val rdd1: JSONObject = odpsData1.first()
|
|
|
- println("rdd1")
|
|
|
- println(rdd1.get("pqtid").toString)
|
|
|
- println(rdd1.toString)
|
|
|
-
|
|
|
- // val rdd2: JSONObject = odpsData2.first()
|
|
|
- println("rdd2")
|
|
|
- println(rdd2.getString("pqtid"))
|
|
|
- println(rdd2.toString)
|
|
|
-
|
|
|
- val keys = rdd1.keySet().asScala.toSet ++ rdd2.keySet().asScala.toSet
|
|
|
- println("keys")
|
|
|
- println(keys)
|
|
|
-
|
|
|
- // 用于存储每个特征的总差异和比较次数
|
|
|
- val featureDiffSum1 = mutable.Map[String, Double]()
|
|
|
- val featureCount1 = mutable.Map[String, Int]()
|
|
|
-
|
|
|
- keys.foreach { key =>
|
|
|
- if (rdd1.containsKey(key) && rdd2.containsKey(key)) {
|
|
|
- val value1 = rdd1.getString(key)
|
|
|
- val value2 = rdd2.getString(key)
|
|
|
-
|
|
|
- def tryToNumber(value: Any): Option[java.lang.Number] = {
|
|
|
- value match {
|
|
|
- case num: java.lang.Number => Some(num)
|
|
|
- case str: String =>
|
|
|
- try {
|
|
|
- Some(str.toDouble)
|
|
|
- } catch {
|
|
|
- case _: NumberFormatException => None
|
|
|
- }
|
|
|
- case _ => None
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- (tryToNumber(value1), tryToNumber(value2)) match {
|
|
|
- case (Some(num1), Some(num2)) =>
|
|
|
- val diff = math.abs(num1.doubleValue() - num2.doubleValue())
|
|
|
- if (diff > 0) {
|
|
|
- featureDiffSum1(key) = featureDiffSum1.getOrElse(key, 0.0) + diff
|
|
|
- featureCount1(key) = featureCount1.getOrElse(key, 0) + 1
|
|
|
- }
|
|
|
- case _ =>
|
|
|
- val str1 = if (value1 != null) value1 else ""
|
|
|
- val str2 = if (value2 != null) value2 else ""
|
|
|
- if (str1 != str2) {
|
|
|
- featureCount1(key) = featureCount1.getOrElse(key, 0) + 1
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // 输出每个特征的平均差异
|
|
|
- println("每个特征的平均差异:")
|
|
|
- println(featureDiffSum1.size)
|
|
|
- featureDiffSum1.foreach { case (feature, sum) =>
|
|
|
- val count = featureCount1(feature)
|
|
|
- val averageDiff = sum / count
|
|
|
- println(s" Feature: $feature, Average Diff: $averageDiff")
|
|
|
- }
|
|
|
-
|
|
|
- val count1 = 1
|
|
|
- println(s"对比总数: $count1")
|
|
|
- println("每个特征的差异率:")
|
|
|
- println(featureCount1.size)
|
|
|
- featureCount1.foreach { case (feature, sum) =>
|
|
|
- val rateDiff = sum / count1
|
|
|
- println(s" Feature: $feature, Rate Diff: $rateDiff")
|
|
|
- }
|
|
|
-
|
|
|
- println("=====================================================================")
|
|
|
+ println(s"joinedRDD count: ${joinedRDD.count()}") // single interpolated string: two println args get auto-tupled and print "(joinedRDD count,N)"; NOTE(review): joinedRDD is counted again below as count2 - consider reusing one count
|
|
|
|
|
|
// 使用 map 操作生成键值对
|
|
|
val diffPairs = joinedRDD.flatMap { case (_, (map1, map2)) =>
|
|
@@ -180,14 +104,20 @@ object diff_data_20250422 {
|
|
|
|
|
|
|
|
|
val count2 = joinedRDD.count()
|
|
|
+ println("count2:" + count2)
|
|
|
// 输出每个特征的平均差异
|
|
|
println("每个特征的平均差异:")
|
|
|
println(featureDiffSum.size)
|
|
|
featureDiffSum.foreach { case (feature, sum) =>
|
|
|
val count = featureCount(feature)
|
|
|
val averageDiff = sum / count
|
|
|
- val rateDiff = count / count2
|
|
|
- println(s" Feature: $feature, Average Diff: $averageDiff Rate Diff: $rateDiff")
|
|
|
+ println(s" Feature: $feature, Average Diff: $averageDiff count Diff: $count")
|
|
|
+ }
|
|
|
+
|
|
|
+ featureDiffSum.foreach { case (feature, _) => // per-feature diff rate = differing-record count / joined-record count; the summed diff is not needed here
|
|
|
+ val count = featureCount(feature)
|
|
|
+ val diffRate = count.toDouble / count2 // force floating-point division: count2 is a Long, so an integral count would truncate the rate to 0 whenever count < count2
|
|
|
+ println(s" Feature: $feature, Diff Rate: $diffRate")
|
|
|
}
|
|
|
}
|
|
|
|