xueyiming il y a 1 mois
Parent
commit
487697814a

+ 107 - 237
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -50,12 +50,36 @@ object diff_data_20250319 {
       allfeaturemap
     })
 
-    val rdd1: JSONObject = odpsData1.first()
+
+
+
+
+    // 以 logkey 为键进行转换
+    val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
+    val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
+
+
+    // 进行 join 操作
+    val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
+
+    // 用于存储每个特征的总差异和比较次数
+    val featureDiffSum = mutable.Map[String, Double]()
+    val featureCount = mutable.Map[String, Int]()
+    // 用于存储每个 logkey 下每个特征的差异率
+    val featureDiffRates = mutable.Map[String, mutable.Map[String, Double]]()
+
+    val tuple = joinedRDD.first()
+    val value1 = tuple._1
+    val value2 = tuple._2
+    val rdd1 = value2._1
+    val rdd2 = value2._2
+
+//    val rdd1: JSONObject = odpsData1.first()
     println("rdd1")
     println(rdd1.get("logkey").toString)
     println(rdd1.toString)
 
-    val rdd2: JSONObject = odpsData2.first()
+//    val rdd2: JSONObject = odpsData2.first()
     println("rdd2")
     println(rdd2.getString("logkey"))
     println(rdd2.toString)
@@ -65,8 +89,8 @@ object diff_data_20250319 {
     println(keys)
 
     // 用于存储每个特征的总差异和比较次数
-    val featureDiffSum = mutable.Map[String, Double]()
-    val featureCount = mutable.Map[String, Int]()
+    val featureDiffSum1 = mutable.Map[String, Double]()
+    val featureCount1 = mutable.Map[String, Int]()
 
     keys.foreach { key =>
       if (rdd1.containsKey(key) && rdd2.containsKey(key)) {
@@ -90,259 +114,105 @@ object diff_data_20250319 {
           case (Some(num1), Some(num2)) =>
             val diff = math.abs(num1.doubleValue() - num2.doubleValue())
             if (diff > 0) {
-              featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
-              featureCount(key) = featureCount.getOrElse(key, 0) + 1
+              featureDiffSum1(key) = featureDiffSum1.getOrElse(key, 0.0) + diff
+              featureCount1(key) = featureCount1.getOrElse(key, 0) + 1
             }
           case _ =>
             val str1 = if (value1 != null) value1 else ""
             val str2 = if (value2 != null) value2 else ""
             if (str1 != str2) {
-              featureCount(key) = featureCount.getOrElse(key, 0) + 1
+              featureCount1(key) = featureCount1.getOrElse(key, 0) + 1
             }
         }
       }
     }
 
     // 输出每个特征的平均差异
-        println("每个特征的平均差异:")
-        println(featureDiffSum.size)
-        featureDiffSum.foreach { case (feature, sum) =>
-          val count = featureCount(feature)
-          val averageDiff = sum / count
-          println(s"  Feature: $feature, Average Diff: $averageDiff")
-        }
+    println("每个特征的平均差异:")
+    println(featureDiffSum1.size)
+    featureDiffSum1.foreach { case (feature, sum) =>
+      val count = featureCount1(feature)
+      val averageDiff = sum / count
+      println(s"  Feature: $feature, Average Diff: $averageDiff")
+    }
+
+    val count1 = 1
+    println(s"对比总数: $count1")
+    println("每个特征的差异率:")
+    println(featureCount1.size)
+    featureCount1.foreach { case (feature, sum) =>
+      val rateDiff = sum / count1
+      println(s"  Feature: $feature, Rate Diff: $rateDiff")
+    }
 
-        val count = 1
-        println(s"对比总数: $count")
-        println("每个特征的差异率:")
-        println(featureCount.size)
-        featureCount.foreach { case (feature, sum) =>
-          val rateDiff = sum / count
-          println(s"  Feature: $feature, Rate Diff: $rateDiff")
+    println("=====================================================================")
+
+
+
+
+    joinedRDD.foreach { case (logkey, (map1, map2)) =>
+      val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
+      println(logkey)
+      keys.foreach { key =>
+        if (map1.containsKey(key) && map2.containsKey(key)) {
+          val value1 = map1.getString(key)
+          val value2 = map2.getString(key)
+
+          def tryToNumber(value: Any): Option[java.lang.Number] = {
+            value match {
+              case num: java.lang.Number => Some(num)
+              case str: String =>
+                try {
+                  Some(str.toDouble)
+                } catch {
+                  case _: NumberFormatException => None
+                }
+              case _ => None
+            }
+          }
+
+          (tryToNumber(value1), tryToNumber(value2)) match {
+            case (Some(num1), Some(num2)) =>
+              val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+              if (diff > 0) {
+                featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
+                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+              }
+            case _ =>
+              val str1 = if (value1 != null) value1 else ""
+              val str2 = if (value2 != null) value2 else ""
+              if (str1 != str2) {
+                featureCount(key) = featureCount.getOrElse(key, 0) + 1
+              }
+          }
         }
+      }
+    }
 
 
-    //    var result: List[String] = List.empty
-    //
-    //    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
-    //
-    //    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
-
-//    // 以 logkey 为键进行转换
-//    val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
-//    val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
-//
-//
-//    // 进行 join 操作
-//    val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
-//
-//    // 用于存储每个特征的总差异和比较次数
-//    val featureDiffSum = mutable.Map[String, Double]()
-//    val featureCount = mutable.Map[String, Int]()
-//    // 用于存储每个 logkey 下每个特征的差异率
-//    val featureDiffRates = mutable.Map[String, mutable.Map[String, Double]]()
-//
-//    joinedRDD.foreach { case (logkey, (map1, map2)) =>
-//      val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
-//      //      val currentDiffRates = mutable.Map[String, Double]()
-//
-//      keys.foreach { key =>
-//        if (map1.containsKey(key) && map2.containsKey(key)) {
-//          val value1 = map1.getString(key)
-//          val value2 = map2.getString(key)
-//
-//          def tryToNumber(value: Any): Option[java.lang.Number] = {
-//            value match {
-//              case num: java.lang.Number => Some(num)
-//              case str: String =>
-//                try {
-//                  Some(str.toDouble)
-//                } catch {
-//                  case _: NumberFormatException => None
-//                }
-//              case _ => None
-//            }
-//          }
-//
-//          (tryToNumber(value1), tryToNumber(value2)) match {
-//            case (Some(num1), Some(num2)) =>
-//              val diff = math.abs(num1.doubleValue() - num2.doubleValue())
-//              if (diff > 0) {
-//                featureDiffSum(key) = featureDiffSum.getOrElse(key, 0.0) + diff
-//                featureCount(key) = featureCount.getOrElse(key, 0) + 1
-//              }
-//            case _ =>
-//              val str1 = if (value1 != null) value1 else ""
-//              val str2 = if (value2 != null) value2 else ""
-//              if (str1 != str2) {
-//                featureCount(key) = featureCount.getOrElse(key, 0) + 1
-//              }
-//          }
-//        }
-//      }
-//      //      featureDiffRates(logkey) = currentDiffRates
-//    }
-
-    // 输出每个 logkey 下每个特征的差异率
-    //    featureDiffRates.foreach { case (logkey, rates) =>
-    //      println(s"Logkey: $logkey")
-    //      rates.foreach { case (feature, rate) =>
-    //        println(s"  Feature: $feature, Diff Rate: ${rate * 100}%")
-    //      }
-    //    }
-
-//    // 输出每个特征的平均差异
-//    println("每个特征的平均差异:")
-//    println(featureDiffSum.size)
-//    featureDiffSum.foreach { case (feature, sum) =>
-//      val count = featureCount(feature)
-//      val averageDiff = sum / count
-//      println(s"  Feature: $feature, Average Diff: $averageDiff")
-//    }
-//
-//    val count = joinedRDD.count()
-//    println(s"对比总数: $count")
-//    println("每个特征的差异率:")
-//    println(featureCount.size)
-//    featureCount.foreach { case (feature, sum) =>
-//      val rateDiff = sum / count
-//      println(s"  Feature: $feature, Rate Diff: $rateDiff")
-//    }
+    // 输出每个特征的平均差异
+    println("每个特征的平均差异:")
+    println(featureDiffSum.size)
+    featureDiffSum.foreach { case (feature, sum) =>
+      val count = featureCount(feature)
+      val averageDiff = sum / count
+      println(s"  Feature: $feature, Average Diff: $averageDiff")
+    }
+
+    val count = joinedRDD.count()
+    println(s"对比总数: $count")
+    println("每个特征的差异率:")
+    println(featureCount.size)
+    featureCount.foreach { case (feature, sum) =>
+      val rateDiff = sum / count
+      println(s"  Feature: $feature, Rate Diff: $rateDiff")
+    }
   }
 
 
 
 
-  //    val firstElement = joinedRDD.first()
-  //    firstElement match {
-  //      case (logkey, (map1, map2)) =>
-  //        println(compareJSONObjects(map1, map2, logkey))
-  //    }
-
-  // 比较相同 logkey 对应的 Map 中相同键的 value
-  //    joinedRDD.foreach { case (logkey, (map1, map2)) =>
-  //      println(logkey)
-  //      println(map1)
-  //      println(map2)
-  //      return
-  //      map1.foreach { case (key, value1) =>
-  //        if (key != "logkey") {
-  //          if(value1 == "\\N")
-  //
-  //          if (map2.contains(key) && value1 != map2(key)) {
-  //            val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 Value: ${map2(key)}"
-  //            println(res)
-  //            result = result :+ res
-  //          } else if (!map2.contains(key)) {
-  //            val res = s"LogKey: $logkey, Key: $key, RDD1 Value: $value1, RDD2 does not have this key"
-  //            println(res)
-  //            result = result :+ res
-  //          }
-  //        }
-  //      }
-  //    }
-
-  // 检查 rdd1 中存在但 rdd2 中不存在的 logkey
-  //    val rdd1Only = rdd1Pairs.subtractByKey(rdd2Pairs)
-  //    rdd1Only.foreach { case (logkey, map) =>
-  //      val res = s"LogKey: $logkey, Map only exists in RDD1: $map"
-  //      println(res)
-  //      result = result :+ res
-  //    }
-  //
-  //    // 检查 rdd2 中存在但 rdd1 中不存在的 logkey
-  //    val rdd2Only = rdd2Pairs.subtractByKey(rdd1Pairs)
-  //    rdd2Only.foreach { case (logkey, map) =>
-  //      val res = s"LogKey: $logkey, Map only exists in RDD2: $map"
-  //      println(res)
-  //      result = result :+ res
-  //    }
-  //
-  //    result = result :+ "test"
-  //    println(result)
-  //    val rdd = sc.parallelize(result)
-
-
-  //  def calculateFeatureDiff(map1: JSONObject, map2: JSONObject): (Map[String, Double], Double) = {
-  //    val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
-  //    var totalDiff = 0.0
-  //    var validCount = 0
-  //    val diffRates = collection.mutable.Map[String, Double]()
-  //
-  //    keys.foreach { key =>
-  //      if (map1.containsKey(key) && map2.containsKey(key)) {
-  //        val value1 = map1.get(key)
-  //        val value2 = map2.get(key)
-  //        (value1, value2) match {
-  //          case (num1: java.lang.Number, num2: java.lang.Number) =>
-  //            val diff = math.abs(num1.doubleValue() - num2.doubleValue())
-  //            val diffRate = if (num1.doubleValue() != 0) diff / num1.doubleValue() else diff
-  //            diffRates(key) = diffRate
-  //            totalDiff += diff
-  //            validCount += 1
-  //          case _ =>
-  //        }
-  //      }
-  //    }
-  //
-  //    val averageDiff = if (validCount > 0) totalDiff / validCount else 0.0
-  //    (diffRates.toMap, averageDiff)
-  //  }
-
-  //  def compareJSONObjects(json1: JSONObject, json2: JSONObject, logkey: String): String = {
-  //    // 存储不同值的键和对应不同的值
-  //    val differentValues = collection.mutable.Map[String, (Any, Any)]()
-  //    // 存储 json1 中缺少的键
-  //    val missingInJson1 = collection.mutable.Set[String]()
-  //    // 存储 json2 中缺少的键
-  //    val missingInJson2 = collection.mutable.Set[String]()
-  //
-  //    // 获取 json1 和 json2 的所有键
-  //    val keys1 = json1.keySet().asScala
-  //    val keys2 = json2.keySet().asScala
-  //
-  //    // 找出不同值的键和对应不同的值
-  //    keys1.foreach { key =>
-  //      if (keys2.contains(key)) {
-  //        val value1 = json1.get(key)
-  //        val value2 = json2.get(key)
-  //        if (value1 != value2) {
-  //          differentValues(key) = (value1, value2)
-  //        }
-  //      } else {
-  //        missingInJson2.add(key)
-  //      }
-  //    }
-  //
-  //    // 找出 json1 中缺少的键
-  //    keys2.foreach { key =>
-  //      if (!keys1.contains(key)) {
-  //        missingInJson1.add(key)
-  //      }
-  //    }
-  //    val differentValuesStr = differentValues.map { case (key, (value1, value2)) =>
-  //      s"$key: $value1 vs $value2"
-  //    }.mkString(", ")
-  //
-  //    val missingInJson1Str = missingInJson1.mkString(", ")
-  //    val missingInJson2Str = missingInJson2.mkString(", ")
-  //
-  //    val res = s"logkey: $logkey\t不同值: $differentValuesStr\tjson1 缺少的键: $missingInJson1Str\tjson2 缺少的键: $missingInJson2Str"
-  //    res
-  //  }
-
-
-  //  def func(record: Record, schema: TableSchema): Map[String, String] = {
-  //    var map: Map[String, String] = Map.empty
-  //    val columns = schema.getColumns
-  //    for (i <- 0 until columns.size()) {
-  //      val column = columns.get(i)
-  //      val name = column.getName
-  //      val value = Option(record.get(name)).map(_.toString).getOrElse("")
-  //      map += (name -> value)
-  //    }
-  //    map
-  //  }
+
   def func(record: Record, schema: TableSchema): JSONObject = {
     val featureMap = new JSONObject()
     val columns = schema.getColumns