
Compute diff statistics

xueyiming 2 weeks ago
commit c8b9ec1774

+ 159 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_diffFeature_20250708.scala

@@ -0,0 +1,159 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, ParamUtils, env}
+import com.aliyun.odps.{Column, TableSchema}
+import org.apache.spark.sql.SparkSession
+
+object makedata_ad_33_diffFeature_20250708 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "20250708")
+    val endStr = param.getOrElse("endStr", "20250708")
+
+    val odpsOps = env.getODPS(sc)
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (dt <- dateRange) {
+      val partition = s"dt=$dt"
+      // Read the sample table
+      val sampleRdd = odpsOps.readTable(
+        project = project,
+        table = "ad_easyrec_train_realtime_data_v3_sampled",
+        partition = partition,
+        transfer = func,
+        numPartition = tablePart
+      )
+
+      // Read the temp variant of the sample table for comparison
+      val sampleRdd1 = odpsOps.readTable(
+        project = project,
+        table = "ad_easyrec_train_realtime_data_v3_sampled_temp",
+        partition = partition,
+        transfer = func,
+        numPartition = tablePart
+      )
+
+
+      val recordRdd = sampleRdd.join(sampleRdd1).map {
+        case (pqtId, (sampleMap, sample1Map)) =>
+          // Union of the keys from both maps
+          val allKeys = sampleMap.keySet ++ sample1Map.keySet
+
+          // For each key, flag whether the two maps disagree on its value
+          val diffCounts = allKeys.map { key =>
+            val valueInSample = sampleMap.get(key)
+            val valueInSample1 = sample1Map.get(key)
+
+            // A key counts as different if it is missing on one side or the values mismatch
+            val isDifferent = (valueInSample, valueInSample1) match {
+              case (None, Some(_)) => true  // key only in sample1Map
+              case (Some(_), None) => true  // key only in sampleMap
+              case (Some(v1), Some(v2)) => v1 != v2  // key in both, values differ
+              case _ => false  // equal values; (None, None) cannot occur for keys in allKeys
+            }
+
+            (key, if (isDifferent) 1 else 0)  // 1 if different, 0 if identical
+          }.toMap
+
+          // Emit the pqtId with its per-key diff flags
+          (pqtId, diffCounts)
+      }
+
+      // Aggregate the diff counts per key across all joined rows
+      val keyDiffCounts = recordRdd
+        .flatMap { case (_, diffCounts) => diffCounts }  // flatten the per-row flags
+        .reduceByKey(_ + _)  // sum the diff flags per key
+
+      // Print the per-key totals on the driver
+      keyDiffCounts.collect().foreach { case (key, count) =>
+        println(s"Key: $key, diff count: $count")
+      }
+
+
+    }
+
+
+  }
+
+  def func(record: Record, schema: TableSchema): (String, Map[String, String]) = {
+    // 1. Get all column metadata
+    val columns: Array[Column] = schema.getColumns.toArray(Array.empty[Column])
+
+    // 2. Find the index of the 'pqtid' column
+    var pqtIdIndex = -1
+    for (i <- columns.indices) {
+      if (columns(i).getName == "pqtid") {
+        pqtIdIndex = i
+      }
+    }
+
+    // 3. Fail fast if the 'pqtid' column is absent
+    if (pqtIdIndex == -1) {
+      throw new IllegalArgumentException("Column 'pqtid' not found in table; check the column name")
+    }
+
+    val pqtId = Option(record.get(pqtIdIndex))
+      .map(_.toString) // stringify non-null values
+      .getOrElse("") // null falls back to the empty string
+
+    // 4. Convert the whole Record into a Map[String, String] (the pqtid column is included)
+    val recordMap = columns.zipWithIndex
+      .map { case (column, index) =>
+        // Fetch the value, preserving null rather than coercing it to a string
+        val value: String = record.get(index) match {
+          case null => null // keep the null value
+          case value => value.toString // stringify non-null values
+        }
+
+        column.getName -> value
+      }
+      .toMap
+
+    // 5. Return (pqtId, Map[String, String])
+    (pqtId, recordMap)
+  }
+
+  def write(map: Map[String, String], record: Record, schema: TableSchema): Unit = {
+    for ((columnName, value) <- map) {
+      try {
+        // Look up the column's index in the table schema
+        val columnIndex = schema.getColumnIndex(columnName.toLowerCase)
+        // Resolve the column's type
+        val columnType = schema.getColumn(columnIndex).getTypeInfo
+        try {
+          columnType.getTypeName match {
+            case "STRING" =>
+              record.setString(columnIndex, value)
+            case "BIGINT" =>
+              record.setBigint(columnIndex, value.toLong)
+            case "DOUBLE" =>
+              record.setDouble(columnIndex, value.toDouble)
+            case "BOOLEAN" =>
+              record.setBoolean(columnIndex, value.toBoolean)
+            case other =>
+              throw new IllegalArgumentException(s"Unsupported column type: $other")
+          }
+        } catch {
+          case e: NumberFormatException =>
+            println(s"Error converting value $value to type ${columnType.getTypeName} for column $columnName: ${e.getMessage}")
+          case e: Exception =>
+            println(s"Unexpected error writing value $value to column $columnName: ${e.getMessage}")
+        }
+      } catch {
+        case e: IllegalArgumentException => {
+          println(e.getMessage)
+        }
+      }
+    }
+  }
+
+
+}
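
For reference, a minimal standalone sketch of the per-key diff counting this job performs. The object name and the map values are hypothetical, and plain Scala collections stand in for the RDD operations:

object DiffCountSketch {
  // Per-key diff flags, mirroring the match inside recordRdd:
  // comparing Options covers both the missing-on-one-side and the value-mismatch cases.
  def diffFlags(a: Map[String, String], b: Map[String, String]): Map[String, Int] =
    (a.keySet ++ b.keySet)
      .map(k => k -> (if (a.get(k) != b.get(k)) 1 else 0))
      .toMap

  def main(args: Array[String]): Unit = {
    // Two hypothetical rows joined on the same pqtid, one from each table.
    val base  = Map("ctr" -> "0.10", "cvr" -> "0.20")
    val other = Map("ctr" -> "0.10", "cvr" -> "0.30", "pv" -> "5")

    val flags = diffFlags(base, other)
    println(flags) // e.g. Map(ctr -> 0, cvr -> 1, pv -> 1)

    // Local analogue of flatMap + reduceByKey(_ + _): sum the flags per key.
    val totals = Seq(flags, diffFlags(base, base))
      .flatten
      .foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0) + v)
      }
    println(totals) // e.g. Map(ctr -> 0, cvr -> 1, pv -> 1)
  }
}

Since sampleRdd.join(sampleRdd1) is an inner join, a pqtid present in only one table contributes nothing to these totals.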