Browse Source

diff数据

xueyiming 1 week ago
parent
commit
a1c7d8f7a8

+ 16 - 14
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250422.scala

@@ -62,7 +62,7 @@ object diff_data_20250422 {
             case (Some(num1), Some(num2)) =>
               val diff = math.abs(num1.doubleValue() - num2.doubleValue())
               if (diff > 0) {
-                Seq((key, (diff, 1)))
+                Seq((key, (diff, num1.doubleValue(), num2.doubleValue(), 1)))
               } else {
                 Seq.empty
               }
@@ -70,7 +70,7 @@ object diff_data_20250422 {
               val str1 = if (value1 != null) value1 else ""
               val str2 = if (value2 != null) value2 else ""
               if (str1 != str2) {
-                Seq((key, (0.0, 1)))
+                Seq((key, (0.0, 0.0, 0.0, 1)))
               } else {
                 Seq.empty
               }
@@ -82,8 +82,8 @@ object diff_data_20250422 {
     }
 
     // 使用 reduceByKey 进行聚合
-    val aggregatedPairs = diffPairs.reduceByKey { case ((diffSum1, count1), (diffSum2, count2)) =>
-      (diffSum1 + diffSum2, count1 + count2)
+    val aggregatedPairs = diffPairs.reduceByKey { case ((diffSum1, onlineSum1, offlineSum1, count1), (diffSum2, onlineSum2, offlineSum2, count2)) =>
+      (diffSum1 + diffSum2, onlineSum1 + onlineSum2, offlineSum1 + offlineSum2, count1 + count2)
     }
 
     // 收集结果
@@ -91,16 +91,20 @@ object diff_data_20250422 {
 
     // 处理结果
     val featureDiffSum = mutable.Map[String, Double]()
+    val featureOnlineSum = mutable.Map[String, Double]()
+    val featureOfflineSum = mutable.Map[String, Double]()
     val featureCount = mutable.Map[String, Int]()
 
-    result.foreach { case (key, (diffSum, count)) =>
+    result.foreach { case (key, (diffSum, onlineSum, offlineSum, count)) =>
       featureDiffSum(key) = diffSum
+      featureOnlineSum(key) = onlineSum
+      featureOfflineSum(key) = offlineSum
       featureCount(key) = count
     }
 
     // 打印结果
-    println("featureDiffSum: " + featureDiffSum)
-    println("featureCount: " + featureCount)
+//    println("featureDiffSum: " + featureDiffSum)
+//    println("featureCount: " + featureCount)
 
 
     val count2 = joinedRDD.count()
@@ -110,14 +114,12 @@ object diff_data_20250422 {
     println(featureDiffSum.size)
     featureDiffSum.foreach { case (feature, sum) =>
       val count = featureCount(feature)
+      val onlineSum = featureOnlineSum(feature)
+      val offlineSum = featureOfflineSum(feature)
       val averageDiff = sum / count
-      println(s"  Feature: $feature, Average Diff: $averageDiff  count Diff: $count")
-    }
-
-    featureDiffSum.foreach { case (feature, sum) =>
-      val count = featureCount(feature)
-      val diffRate = count / count2
-      println(s"  Feature: $feature, Diff Rate: $diffRate")
+      val averageOnline = onlineSum / count
+      val averageOffline = offlineSum / count
+      println(s"$feature,$averageDiff,$averageOnline,$averageOffline,$count")
     }
   }