xueyiming 1 月之前
父節點
當前提交
bb66399cee
共有 1 個文件被更改,包括 129 次插入和 79 次刪除
  1. 129 79
      src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

+ 129 - 79
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -60,79 +60,129 @@ object diff_data_20250319 {
     println(rdd2.getString("logkey"))
     println(rdd2.toString)
 
-
-    //    var result: List[String] = List.empty
-    //
-    //    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
-    //
-    //    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
-
-    // 以 logkey 为键进行转换
-    val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
-    val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
-
-
-    // 进行 join 操作
-    val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
-
-    //    joinedRDD.foreach { case (logkey, (map1, map2)) =>
-    //      val (diffRates, averageDiff) = calculateFeatureDiff(map1, map2)
-    //      println(s"Logkey: $logkey")
-    //      println("各个特征的差异率:")
-    //      diffRates.foreach { case (feature, rate) =>
-    //        println(s"  $feature: ${rate * 100}%")
-    //      }
-    //      println(s"平均差异: $averageDiff")
-    //    }
-
+    val keys = rdd1.keySet().asScala.toSet ++ rdd2.keySet().asScala.toSet
+    println("keys")
+    println(keys)
 
     // 用于存储每个特征的总差异和比较次数
     val featureDiffSum = mutable.Map[String, Double]()
     val featureCount = mutable.Map[String, Int]()
-    // 用于存储每个 logkey 下每个特征的差异率
-    val featureDiffRates = mutable.Map[String, mutable.Map[String, Double]]()
-
-    joinedRDD.foreach { case (logkey, (map1, map2)) =>
-      val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
-      //      val currentDiffRates = mutable.Map[String, Double]()
-
-      keys.foreach { key =>
-        if (map1.containsKey(key) && map2.containsKey(key)) {
-          val value1 = map1.getString(key)
-          val value2 = map2.getString(key)
-
-          def tryToNumber(value: Any): Option[java.lang.Number] = {
-            value match {
-              case num: java.lang.Number => Some(num)
-              case str: String =>
-                try {
-                  Some(str.toDouble)
-                } catch {
-                  case _: NumberFormatException => None
-                }
-              case _ => None
-            }
-          }
 
-          (tryToNumber(value1), tryToNumber(value2)) match {
-            case (Some(num1), Some(num2)) =>
-              val diff = math.abs(num1.doubleValue() - num2.doubleValue())
-              if (diff > 0) {
-                featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
-                featureCount(key) = featureCount.getOrElse(key, 0) + 1
-              }
-            case _ =>
-              val str1 = if (value1 != null) value1 else ""
-              val str2 = if (value2 != null) value2 else ""
-              if (str1 != str2) {
-                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+    keys.foreach { key =>
+      if (rdd1.containsKey(key) && rdd2.containsKey(key)) {
+        val value1 = rdd1.getString(key)
+        val value2 = rdd2.getString(key)
+
+        def tryToNumber(value: Any): Option[java.lang.Number] = {
+          value match {
+            case num: java.lang.Number => Some(num)
+            case str: String =>
+              try {
+                Some(str.toDouble)
+              } catch {
+                case _: NumberFormatException => None
               }
+            case _ => None
           }
         }
+
+        (tryToNumber(value1), tryToNumber(value2)) match {
+          case (Some(num1), Some(num2)) =>
+            val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+            if (diff > 0) {
+              featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
+              featureCount(key) = featureCount.getOrElse(key, 0) + 1
+            }
+          case _ =>
+            val str1 = if (value1 != null) value1 else ""
+            val str2 = if (value2 != null) value2 else ""
+            if (str1 != str2) {
+              featureCount(key) = featureCount.getOrElse(key, 0) + 1
+            }
+        }
       }
-      //      featureDiffRates(logkey) = currentDiffRates
     }
 
+    // 输出每个特征的平均差异
+        println("每个特征的平均差异:")
+        println(featureDiffSum.size)
+        featureDiffSum.foreach { case (feature, sum) =>
+          val count = featureCount(feature)
+          val averageDiff = sum / count
+          println(s"  Feature: $feature, Average Diff: $averageDiff")
+        }
+
+        val count = 1
+        println(s"对比总数: $count")
+        println("每个特征的差异率:")
+        println(featureCount.size)
+        featureCount.foreach { case (feature, sum) =>
+          val rateDiff = sum / count
+          println(s"  Feature: $feature, Rate Diff: $rateDiff")
+        }
+
+
+    //    var result: List[String] = List.empty
+    //
+    //    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
+    //
+    //    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
+
+//    // 以 logkey 为键进行转换
+//    val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
+//    val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
+//
+//
+//    // 进行 join 操作
+//    val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
+//
+//    // 用于存储每个特征的总差异和比较次数
+//    val featureDiffSum = mutable.Map[String, Double]()
+//    val featureCount = mutable.Map[String, Int]()
+//    // 用于存储每个 logkey 下每个特征的差异率
+//    val featureDiffRates = mutable.Map[String, mutable.Map[String, Double]]()
+//
+//    joinedRDD.foreach { case (logkey, (map1, map2)) =>
+//      val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
+//      //      val currentDiffRates = mutable.Map[String, Double]()
+//
+//      keys.foreach { key =>
+//        if (map1.containsKey(key) && map2.containsKey(key)) {
+//          val value1 = map1.getString(key)
+//          val value2 = map2.getString(key)
+//
+//          def tryToNumber(value: Any): Option[java.lang.Number] = {
+//            value match {
+//              case num: java.lang.Number => Some(num)
+//              case str: String =>
+//                try {
+//                  Some(str.toDouble)
+//                } catch {
+//                  case _: NumberFormatException => None
+//                }
+//              case _ => None
+//            }
+//          }
+//
+//          (tryToNumber(value1), tryToNumber(value2)) match {
+//            case (Some(num1), Some(num2)) =>
+//              val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+//              if (diff > 0) {
+//                featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
+//                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+//              }
+//            case _ =>
+//              val str1 = if (value1 != null) value1 else ""
+//              val str2 = if (value2 != null) value2 else ""
+//              if (str1 != str2) {
+//                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+//              }
+//          }
+//        }
+//      }
+//      //      featureDiffRates(logkey) = currentDiffRates
+//    }
+
     // 输出每个 logkey 下每个特征的差异率
     //    featureDiffRates.foreach { case (logkey, rates) =>
     //      println(s"Logkey: $logkey")
@@ -141,23 +191,23 @@ object diff_data_20250319 {
     //      }
     //    }
 
-    // 输出每个特征的平均差异
-    println("每个特征的平均差异:")
-    println(featureDiffSum.size)
-    featureDiffSum.foreach { case (feature, sum) =>
-      val count = featureCount(feature)
-      val averageDiff = sum / count
-      println(s"  Feature: $feature, Average Diff: $averageDiff")
-    }
-
-    val count = joinedRDD.count()
-    println(s"对比总数: $count")
-    println("每个特征的差异率:")
-    println(featureCount.size)
-    featureCount.foreach { case (feature, sum) =>
-      val rateDiff = sum / count
-      println(s"  Feature: $feature, Rate Diff: $rateDiff")
-    }
+//    // 输出每个特征的平均差异
+//    println("每个特征的平均差异:")
+//    println(featureDiffSum.size)
+//    featureDiffSum.foreach { case (feature, sum) =>
+//      val count = featureCount(feature)
+//      val averageDiff = sum / count
+//      println(s"  Feature: $feature, Average Diff: $averageDiff")
+//    }
+//
+//    val count = joinedRDD.count()
+//    println(s"对比总数: $count")
+//    println("每个特征的差异率:")
+//    println(featureCount.size)
+//    featureCount.foreach { case (feature, sum) =>
+//      val rateDiff = sum / count
+//      println(s"  Feature: $feature, Rate Diff: $rateDiff")
+//    }
   }