xueyiming 1 month ago
parent
revision
4ce8df3ec4

+ 45 - 22
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -6,6 +6,8 @@ import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
 
 
 /*
@@ -31,7 +33,7 @@ object diff_data_20250319 {
     val odpsData2 = odpsOps.readTable(project = "loghubods",
       table = "alg_recsys_ad_sample_all",
       partition = "dt=20250319,hh=12",
-      transfer = func,
+      transfer = func1,
       numPartition = 64)
 
 
@@ -51,27 +53,27 @@ object diff_data_20250319 {
     }
 
 
-    var result: List[String] = List.empty
-
-    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
-
-    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
-
-    // Key the records by logkey
-    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
-    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
-
-
-    // Perform the join
-    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
-
-    val firstElement = joinedRDD.first()
-    firstElement match {
-      case (logkey, (map1, map2)) =>
-        println(logkey)
-        println(map1)
-        println(map2)
-    }
+//    var result: List[String] = List.empty
+//
+//    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
+//
+//    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
+//
+//    // Key the records by logkey
+//    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
+//    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
+//
+//
+//    // Perform the join
+//    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
+//
+//    val firstElement = joinedRDD.first()
+//    firstElement match {
+//      case (logkey, (map1, map2)) =>
+//        println(logkey)
+//        println(map1)
+//        println(map2)
+//    }
 
    // Compare the values of matching keys in the two Maps that share the same logkey
     //    joinedRDD.foreach { case (logkey, (map1, map2)) =>
@@ -131,6 +133,27 @@ object diff_data_20250319 {
     map
   }
 
+  def func1(record: Record, schema: TableSchema): Map[String, String] = {
+    implicit val formats: DefaultFormats.type = DefaultFormats
+
+    // Column names to concatenate into the logkey
+    val logKeyColumns = List("apptype", "mid", "cid", "ts", "headvideoid")
+    // Build the logkey
+    val logKey = logKeyColumns.map { columnName =>
+      Option(record.get(columnName)).map(_.toString).getOrElse("")
+    }.mkString(",")
+
+    // Read the allfeaturemap column
+    val allFeatureMapJson = Option(record.get("allfeaturemap")).map(_.toString).getOrElse("{}")
+    // Parse the JSON string into a Map
+    val allFeatureMap = parse(allFeatureMapJson).extract[Map[String, String]]
+
+    // Add the logkey entry to allFeatureMap
+    val updatedMap = allFeatureMap + ("logkey" -> logKey)
+
+    updatedMap
+  }
+
   private def processString(input: String): Map[String, String] = {
    // Trim the input and split it on tabs into key-value pairs
     val parts = input.trim.split("\t")
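Not part of the commit: a minimal sketch of how the logkey that func1 now injects could drive the comparison the commented-out block above was heading toward. The object and method names (DiffByLogkeySketch, diffByLogkey) are hypothetical; the real job keys odpsData1 on its own logkey column and odpsData2 via func1.

    import org.apache.spark.rdd.RDD

    object DiffByLogkeySketch {
      // Key both RDDs on their "logkey" entry, join them, and report,
      // per logkey, the feature values that differ between the two tables.
      def diffByLogkey(rdd1: RDD[Map[String, String]],
                       rdd2: RDD[Map[String, String]]): RDD[(String, Map[String, (String, String)])] = {
        val left  = rdd1.map(m => (m.getOrElse("logkey", ""), m))
        val right = rdd2.map(m => (m.getOrElse("logkey", ""), m))
        left.join(right).mapValues { case (m1, m2) =>
          // Keep only the keys present on both sides whose values disagree
          (m1.keySet intersect m2.keySet)
            .collect { case k if m1(k) != m2(k) => k -> (m1(k), m2(k)) }
            .toMap
        }
      }
    }

Under that assumption, diffByLogkey(odpsData1, odpsData2) would return only the rows and features on which the two tables disagree.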