xueyiming 1 month ago
parent
revision
8b0cd6924f

+ 38 - 37
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -1,6 +1,6 @@
 package com.aliyun.odps.spark.examples.makedata_ad.v20240718
 
-import com.alibaba.fastjson.JSON
+import com.alibaba.fastjson.{JSON, JSONObject}
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
 import com.aliyun.odps.spark.examples.myUtils.env
@@ -33,23 +33,30 @@ object diff_data_20250319 {
       table = "alg_recsys_ad_sample_all",
       partition = "dt=20250319,hh=12",
       transfer = func1,
-      numPartition = 64)
-
-
-    val rdd1 = odpsData1.first()
-    val rdd2 = odpsData2.first()
+      numPartition = 64).map(record => {
+
+      val ts = record.getString("ts").toInt
+      val cid = record.getString("cid")
+      val apptype = record.getString("apptype")
+      val mid = record.getString("mid")
+      val headvideoid = record.getString("headvideoid")
+      val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
+      val allfeaturemap: JSONObject = if (record.isNull("allfeaturemap")) new JSONObject() else
+        JSON.parseObject(record.getString("allfeaturemap"))
+      allfeaturemap.put("logkey", logKey)
+      allfeaturemap
+    })
+
+    val rdd1: JSONObject = odpsData1.first()
 
     println("rdd1")
-    rdd1.foreach {
-      case (key, value) =>
-        println(key + ":" + value)
-    }
+    println(rdd1.toString)
+
+    val rdd2: JSONObject = odpsData2.first()
 
     println("rdd2")
-    rdd2.foreach {
-      case (key, value) =>
-        println(key + ":" + value)
-    }
+    println(rdd2.toString)
+
 
 
     //    var result: List[String] = List.empty
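For reference, the per-record transform added above can be exercised on its own. A minimal sketch, assuming only the fastjson classes this file already imports; the object name and all field values below are invented for illustration:

```scala
import com.alibaba.fastjson.{JSON, JSONObject}

// Standalone sketch of the logkey construction and allfeaturemap merge performed
// in the new map() above; the concrete values here are made up for illustration.
object LogKeyMergeSketch {
  def main(args: Array[String]): Unit = {
    val apptype = "3"
    val mid = "mid_001"
    val cid = "4567"
    val ts = 1742356800
    val headvideoid = "8901"

    // Same trick as the new code: tuple -> productIterator -> comma-joined string.
    val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")

    // Stand-in for record.getString("allfeaturemap"); null falls back to an empty object.
    val raw = """{"ctr":"0.03","cpa":"1.2"}"""
    val allfeaturemap: JSONObject =
      if (raw == null) new JSONObject() else JSON.parseObject(raw)

    allfeaturemap.put("logkey", logKey)
    // Prints the merged feature map plus the new "logkey" entry (key order may vary).
    println(allfeaturemap.toString)
  }
}
```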
@@ -120,36 +127,30 @@ object diff_data_20250319 {
 
   }
 
-  def func(record: Record, schema: TableSchema): Map[String, String] = {
-    var map: Map[String, String] = Map.empty
+  //  def func(record: Record, schema: TableSchema): Map[String, String] = {
+  //    var map: Map[String, String] = Map.empty
+  //    val columns = schema.getColumns
+  //    for (i <- 0 until columns.size()) {
+  //      val column = columns.get(i)
+  //      val name = column.getName
+  //      val value = Option(record.get(name)).map(_.toString).getOrElse("")
+  //      map += (name -> value)
+  //    }
+  //    map
+  //  }
+  def func(record: Record, schema: TableSchema): JSONObject = {
+    val featureMap = new JSONObject()
     val columns = schema.getColumns
     for (i <- 0 until columns.size()) {
       val column = columns.get(i)
       val name = column.getName
-      val value = Option(record.get(name)).map(_.toString).getOrElse("")
-      map += (name -> value)
+      featureMap.put(name, record.get(name))
     }
-    map
+    featureMap
   }
 
-  def func1(record: Record, schema: TableSchema): Map[String, String] = {
-
-    // Column names to be concatenated into the logkey
-    val logKeyColumns = List("apptype", "mid", "cid", "ts", "headvideoid")
-    // Concatenate the logkey
-    val logKey = logKeyColumns.map { columnName =>
-      Option(record.get(columnName)).map(_.toString).getOrElse("")
-    }.mkString(",")
-    // Get the value of the allfeaturemap field
-    val allFeatureMapJson = Option(record.get("allfeaturemap")).map(_.toString).getOrElse("{}")
-    val json = JSON.parseObject(allFeatureMapJson)
-    val allFeatureMap: Map[String, AnyRef] = json.asScala.toMap
-    // Insert the logkey into allFeatureMap
-    val updatedMap = allFeatureMap + ("logkey" -> logKey)
-    val stringMap: Map[String, String] = allFeatureMap.map { case (key, value) =>
-      key -> value.toString
-    }
-    stringMap
+  def func1(record: Record, schema: TableSchema): Record = {
+    record
   }
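The file name and the commented-out `result` list above suggest the two first records are eventually compared field by field. A minimal sketch of such a comparison, assuming fastjson plus JavaConverters; the `diffFields` helper and the sample values are hypothetical and not part of this commit:

```scala
import com.alibaba.fastjson.JSONObject
import scala.collection.JavaConverters._

object DiffFieldsSketch {
  // Hypothetical helper: keys whose string values differ between the two objects.
  def diffFields(left: JSONObject, right: JSONObject): List[String] = {
    val keys = left.keySet().asScala.toSet ++ right.keySet().asScala.toSet
    keys.filter(k => left.getString(k) != right.getString(k)).toList.sorted
  }

  def main(args: Array[String]): Unit = {
    val a = new JSONObject()
    a.put("ctr", "0.03"); a.put("logkey", "3,mid_001,4567,1742356800,8901")
    val b = new JSONObject()
    b.put("ctr", "0.05"); b.put("logkey", "3,mid_001,4567,1742356800,8901")
    println(diffFields(a, b)) // List(ctr)
  }
}
```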
 
   private def processString(input: String): Map[String, String] = {