xueyiming 1 Minggu lalu
induk
melakukan
d3bea5596f

+ 201 - 10
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250422.scala

@@ -25,22 +25,213 @@ object diff_data_20250422 {
 
     // 2 读取odps+表信息
     val odpsOps = env.getODPS(sc)
-    //    val odpsData1 = odpsOps.readTable(project = "loghubods",
-    //      table = "ad_easyrec_eval_data_v3_sampled",
-    //      partition = "dt=20250319",
-    //      transfer = func,
-    //      numPartition = 64)
+    val odpsData1 = odpsOps.readTable(project = "loghubods",
+      table = "ad_easyrec_eval_data_v3_sampled",
+      partition = "dt=20250421",
+      transfer = func,
+      numPartition = 64)
 
     val odpsData2 = odpsOps.readTable(project = "loghubods",
-      table = "loghubods.ad_engine_statistics_log_per5min_new",
-      partition = "(dt BETWEEN '20250421220000' and '20250421235500')",
+      table = "loghubods.loghubods.ad_engine_statistics_log_per_linshi_dt",
+      partition = "dt=20250421",
       transfer = func1,
-      numPartition = 64)
+      numPartition = 64).map(record => {
+      val pqtid = record.getString("pqtid")
+      val allfeature: JSONObject = if (record.isNull("allfeature")) new JSONObject() else
+        JSON.parseObject(record.getString("allfeature"))
+      allfeature.put("pqtid", pqtid)
+      allfeature
+    })
+
+    val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("pqtid"), map))
+    val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("pqtid"), map))
+
+
+    // 进行 join 操作
+    val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd2Pairs.join(rdd1Pairs)
+    val tuple = joinedRDD.first()
+    val value2 = tuple._2
+    val rdd1 = value2._1
+    val rdd2 = value2._2
+
+    //    val rdd1: JSONObject = odpsData1.first()
+    println("rdd1")
+    println(rdd1.get("logkey").toString)
+    println(rdd1.toString)
+
+    //    val rdd2: JSONObject = odpsData2.first()
+    println("rdd2")
+    println(rdd2.getString("logkey"))
+    println(rdd2.toString)
+
+    val keys = rdd1.keySet().asScala.toSet ++ rdd2.keySet().asScala.toSet
+    println("keys")
+    println(keys)
+
+    // 用于存储每个特征的总差异和比较次数
+    val featureDiffSum1 = mutable.Map[String, Double]()
+    val featureCount1 = mutable.Map[String, Int]()
+
+    keys.foreach { key =>
+      if (rdd1.containsKey(key) && rdd2.containsKey(key)) {
+        val value1 = rdd1.getString(key)
+        val value2 = rdd2.getString(key)
+
+        // Local helper: numbers pass through, strings are parsed as doubles,
+        // and anything else (including null) yields None.
+        def tryToNumber(value: Any): Option[java.lang.Number] = value match {
+          case n: java.lang.Number => Some(n)
+          case s: String =>
+            try Some(s.toDouble)
+            catch {
+              case _: NumberFormatException => None
+            }
+          case _ => None
+        }
+
+        (tryToNumber(value1), tryToNumber(value2)) match {
+          case (Some(num1), Some(num2)) =>
+            val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+            if (diff > 0) {
+              featureDiffSum1(key) = featureDiffSum1.getOrElse(key, 0.0) + diff
+              featureCount1(key) = featureCount1.getOrElse(key, 0) + 1
+            }
+          case _ =>
+            val str1 = if (value1 != null) value1 else ""
+            val str2 = if (value2 != null) value2 else ""
+            if (str1 != str2) {
+              featureCount1(key) = featureCount1.getOrElse(key, 0) + 1
+            }
+        }
+      }
+    }
+
+    // 输出每个特征的平均差异
+    println("每个特征的平均差异:")
+    println(featureDiffSum1.size)
+    featureDiffSum1.foreach { case (feature, sum) =>
+      val count = featureCount1(feature)
+      val averageDiff = sum / count
+      println(s"  Feature: $feature, Average Diff: $averageDiff")
+    }
+
+    val count1 = 1
+    println(s"对比总数: $count1")
+    println("每个特征的差异率:")
+    println(featureCount1.size)
+    featureCount1.foreach { case (feature, sum) =>
+      val rateDiff = sum / count1
+      println(s"  Feature: $feature, Rate Diff: $rateDiff")
+    }
+
+    println("=====================================================================")
+
+    // 使用 map 操作生成键值对
+    val diffPairs = joinedRDD.flatMap { case (_, (map1, map2)) =>
+      val keys = map1.keySet().asScala.toSet ++ map2.keySet().asScala.toSet
+      keys.flatMap { key =>
+        if (map1.containsKey(key) && map2.containsKey(key)) {
+          val value1 = map1.getString(key)
+          val value2 = map2.getString(key)
+          (tryToNumber(value1), tryToNumber(value2)) match {
+            case (Some(num1), Some(num2)) =>
+              val diff = math.abs(num1.doubleValue() - num2.doubleValue())
+              if (diff > 0) {
+                Seq((key, (diff, 1)))
+              } else {
+                Seq.empty
+              }
+            case _ =>
+              val str1 = if (value1 != null) value1 else ""
+              val str2 = if (value2 != null) value2 else ""
+              if (str1 != str2) {
+                Seq((key, (0.0, 1)))
+              } else {
+                Seq.empty
+              }
+          }
+        } else {
+          Seq.empty
+        }
+      }
+    }
+
+    // 使用 reduceByKey 进行聚合
+    val aggregatedPairs = diffPairs.reduceByKey { case ((diffSum1, count1), (diffSum2, count2)) =>
+      (diffSum1 + diffSum2, count1 + count2)
+    }
+
+    // 收集结果
+    val result = aggregatedPairs.collect()
+
+    // 处理结果
+    val featureDiffSum = mutable.Map[String, Double]()
+    val featureCount = mutable.Map[String, Int]()
+
+    result.foreach { case (key, (diffSum, count)) =>
+      featureDiffSum(key) = diffSum
+      featureCount(key) = count
+    }
 
-    val count = odpsData2.count()
     // 打印结果
-    println("RDD的元素数量为: " + count)
+    println("featureDiffSum: " + featureDiffSum)
+    println("featureCount: " + featureCount)
+
+
+    val count2 = joinedRDD.count()
+    // 输出每个特征的平均差异
+    println("每个特征的平均差异:")
+    println(featureDiffSum.size)
+    featureDiffSum.foreach { case (feature, sum) =>
+      val count = featureCount(feature)
+      val averageDiff = sum / count
+      val rateDiff = count / count2
+      println(s"  Feature: $feature, Average Diff: $averageDiff  Rate Diff: $rateDiff")
+    }
+  }
+
+  /**
+   * Best-effort conversion of an arbitrary value to a number.
+   *
+   * @param value any value; `java.lang.Number` instances are returned as-is and
+   *              strings are parsed with `toDouble`
+   * @return `Some(number)` when `value` is a number or a string that parses as a
+   *         double; `None` for `null`, unparsable strings and every other type
+   */
+  def tryToNumber(value: Any): Option[java.lang.Number] = value match {
+    case numeric: java.lang.Number => Some(numeric)
+    case text: String =>
+      try {
+        val parsed: java.lang.Double = text.toDouble
+        Some(parsed)
+      } catch {
+        case _: NumberFormatException => None
+      }
+    case _ => None
+  }
 
+  /**
+   * Copies the columns of one record into a JSON object keyed by feature name.
+   *
+   * Column names are rewritten before being used as keys: every `_x_` becomes `*`
+   * and every `_view` becomes `(view)`. Byte-array values are decoded as UTF-8
+   * text, numeric values are stored as their `toString` form, and any other value
+   * (including `null`) is stored unchanged.
+   *
+   * @param record the row whose column values are read by column name
+   * @param schema the table schema whose columns are iterated
+   * @return a new JSONObject holding an entry for each column of `schema`
+   */
+  def func(record: Record, schema: TableSchema): JSONObject = {
+    val featureMap = new JSONObject()
+    val columns = schema.getColumns
+    for (i <- 0 until columns.size()) {
+      val name = columns.get(i).getName
+      val key = name.replace("_x_", "*").replace("_view", "(view)")
+      record.get(name) match {
+        case byteArray: Array[Byte] =>
+          // Decoding with a Charset constant cannot throw, so no try/catch is
+          // needed and a column can never be silently dropped.
+          featureMap.put(key, new String(byteArray, java.nio.charset.StandardCharsets.UTF_8))
+        case num: java.lang.Number =>
+          // Numbers are stored in their string form.
+          featureMap.put(key, num.toString)
+        case other =>
+          // Reuse the value already fetched instead of reading the column again.
+          featureMap.put(key, other)
+      }
+    }
+    featureMap
   }
 
   def func1(record: Record, schema: TableSchema): Record = {