Bläddra i källkod

增加diff校验

xueyiming 1 månad sedan
förälder
incheckning
730b3c2b10

+ 31 - 33
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -1,13 +1,12 @@
 package com.aliyun.odps.spark.examples.makedata_ad.v20240718
 
+import com.alibaba.fastjson.JSON
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.rdd.RDD
+import com.aliyun.odps.spark.examples.myUtils.env
 import org.apache.spark.sql.SparkSession
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods._
+
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 
 /*
@@ -53,27 +52,27 @@ object diff_data_20250319 {
     }
 
 
-//    var result: List[String] = List.empty
-//
-//    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
-//
-//    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
-//
-//    // 以 logkey 为键进行转换
-//    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
-//    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
-//
-//
-//    // 进行 join 操作
-//    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
-//
-//    val firstElement = joinedRDD.first()
-//    firstElement match {
-//      case (logkey, (map1, map2)) =>
-//        println(logkey)
-//        println(map1)
-//        println(map2)
-//    }
+    //    var result: List[String] = List.empty
+    //
+    //    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
+    //
+    //    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
+    //
+    //    // 以 logkey 为键进行转换
+    //    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
+    //    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
+    //
+    //
+    //    // 进行 join 操作
+    //    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
+    //
+    //    val firstElement = joinedRDD.first()
+    //    firstElement match {
+    //      case (logkey, (map1, map2)) =>
+    //        println(logkey)
+    //        println(map1)
+    //        println(map2)
+    //    }
 
     // 比较相同 logkey 对应的 Map 中相同键的 value
     //    joinedRDD.foreach { case (logkey, (map1, map2)) =>
@@ -134,7 +133,6 @@ object diff_data_20250319 {
   }
 
   def func1(record: Record, schema: TableSchema): Map[String, String] = {
-    implicit val formats: DefaultFormats.type = DefaultFormats
 
     // 定义需要拼接成 logkey 的列名
     val logKeyColumns = List("apptype", "mid", "cid", "ts", "headvideoid")
@@ -142,16 +140,16 @@ object diff_data_20250319 {
     val logKey = logKeyColumns.map { columnName =>
       Option(record.get(columnName)).map(_.toString).getOrElse("")
     }.mkString(",")
-
     // 获取 allfeaturemap 字段的值
     val allFeatureMapJson = Option(record.get("allfeaturemap")).map(_.toString).getOrElse("{}")
-    // 将 JSON 字符串转换为 Map
-    val allFeatureMap = parse(allFeatureMapJson).extract[Map[String, String]]
-
+    val json = JSON.parseObject(allFeatureMapJson)
+    val allFeatureMap: Map[String, AnyRef] = json.asScala.toMap
     // 插入 logkey 到 allFeatureMap 中
     val updatedMap = allFeatureMap + ("logkey" -> logKey)
-
-    updatedMap
+    val stringMap: Map[String, String] = allFeatureMap.map { case (key, value) =>
+      key -> value.toString
+    }
+    stringMap
   }
 
   private def processString(input: String): Map[String, String] = {