xueyiming 1 tháng trước
mục cha
commit
a7d47e8593

+ 98 - 46
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -140,70 +140,122 @@ object diff_data_20250319 {
 
 
     // 用于存储每个特征的总差异和比较次数
-    val featureDiffSum = mutable.Map[String, Double]()
-    val featureCount = mutable.Map[String, Int]()
-
-    val countJsonObject1 = new JSONObject();
-    val countJsonObject2 = new JSONObject();
-
-    joinedRDD.foreach { case (logkey, (map1, map2)) =>
+//    val featureDiffSum = mutable.Map[String, Double]()
+//    val featureCount = mutable.Map[String, Int]()
+//
+//    val countJsonObject1 = new JSONObject();
+//    val countJsonObject2 = new JSONObject();
+//
+//    joinedRDD.foreach { case (logkey, (map1, map2)) =>
+//      val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
+//      println(logkey)
+//      keys.foreach { key =>
+//        if (map1.containsKey(key) && map2.containsKey(key)) {
+//          val value1 = map1.getString(key)
+//          val value2 = map2.getString(key)
+//          def tryToNumber(value: Any): Option[java.lang.Number] = {
+//            value match {
+//              case num: java.lang.Number => Some(num)
+//              case str: String =>
+//                try {
+//                  Some(str.toDouble)
+//                } catch {
+//                  case _: NumberFormatException => None
+//                }
+//              case _ => None
+//            }
+//          }
+//          (tryToNumber(value1), tryToNumber(value2)) match {
+//            case (Some(num1), Some(num2)) =>
+//              val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+//              if (diff > 0) {
+//                if (countJsonObject1.containsKey(key)) {
+//                  countJsonObject1.put(key, countJsonObject1.getDouble(key) + diff)
+//                } else {
+//                  countJsonObject1.put(key, diff)
+//                }
+//                if (countJsonObject2.containsKey(key)) {
+//                  countJsonObject2.put(key, countJsonObject2.getIntValue(key) + 1)
+//                } else {
+//                  countJsonObject2.put(key, 1)
+//
+//                }
+//                featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
+//                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+//              }
+//            case _ =>
+//              val str1 = if (value1 != null) value1 else ""
+//              val str2 = if (value2 != null) value2 else ""
+//              if (str1 != str2) {
+//                if (countJsonObject2.containsKey(key)) {
+//                  countJsonObject2.put(key, countJsonObject2.getIntValue(key) + 1)
+//                } else {
+//                  countJsonObject2.put(key, 1)
+//                }
+//                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+//              }
+//          }
+//        }
+//      }
+//    }
+
+    // 使用 flatMap 操作,为每个存在差异的特征生成 (key, (差异值, 1)) 键值对
+    val diffPairs = joinedRDD.flatMap { case (_, (map1, map2)) =>
       val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
-      println(logkey)
-      keys.foreach { key =>
-        if (map1.containsKey(key) && map2.containsKey(key)) {
+      keys.flatMap { key =>
+        if (map1.has(key) && map2.has(key)) {
           val value1 = map1.getString(key)
           val value2 = map2.getString(key)
-
-          def tryToNumber(value: Any): Option[java.lang.Number] = {
-            value match {
-              case num: java.lang.Number => Some(num)
-              case str: String =>
-                try {
-                  Some(str.toDouble)
-                } catch {
-                  case _: NumberFormatException => None
-                }
-              case _ => None
-            }
-          }
-
           (tryToNumber(value1), tryToNumber(value2)) match {
             case (Some(num1), Some(num2)) =>
               val diff = math.abs(num1.doubleValue() - num2.doubleValue())
               if (diff > 0) {
-                if (countJsonObject1.containsKey(key)) {
-                  countJsonObject1.put(key, countJsonObject1.getDouble(key) + diff)
-                } else {
-                  countJsonObject1.put(key, diff)
-
-                }
-                if (countJsonObject2.containsKey(key)) {
-                  countJsonObject2.put(key, countJsonObject2.getIntValue(key) + 1)
-                } else {
-                  countJsonObject2.put(key, 1)
-
-                }
-
-                featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
-                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+                Seq((key, (diff, 1)))
+              } else {
+                Seq.empty
               }
             case _ =>
               val str1 = if (value1 != null) value1 else ""
               val str2 = if (value2 != null) value2 else ""
               if (str1 != str2) {
-                if (countJsonObject2.containsKey(key)) {
-                  countJsonObject2.put(key, countJsonObject2.getIntValue(key) + 1)
-                } else {
-                  countJsonObject2.put(key, 1)
-
-                }
-                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+                Seq((key, (0.0, 1)))
+              } else {
+                Seq.empty
               }
           }
+        } else {
+          Seq.empty
         }
       }
     }
 
+    // 使用 reduceByKey 进行聚合
+    val aggregatedPairs = diffPairs.reduceByKey { case ((diffSum1, count1), (diffSum2, count2)) =>
+      (diffSum1 + diffSum2, count1 + count2)
+    }
+
+    // 收集结果
+    val result = aggregatedPairs.collect()
+
+    // 将聚合结果写入 featureDiffSum / featureCount 以及两个 JSONObject
+    val featureDiffSum = mutable.Map[String, Double]()
+    val featureCount = mutable.Map[String, Int]()
+    val countJsonObject1 = new JSONObject()
+    val countJsonObject2 = new JSONObject()
+
+    result.foreach { case (key, (diffSum, count)) =>
+      featureDiffSum(key) = diffSum
+      featureCount(key) = count
+      countJsonObject1.put(key, diffSum)
+      countJsonObject2.put(key, count)
+    }
+
+    // 打印结果
+    println("featureDiffSum: " + featureDiffSum)
+    println("featureCount: " + featureCount)
+    println("countJsonObject1: " + countJsonObject1)
+    println("countJsonObject2: " + countJsonObject2)
+
     println("json")
     println(countJsonObject1)
     println(countJsonObject2)