Przeglądaj źródła

增加diff校验

xueyiming 1 miesiąc temu
rodzic
commit
6cbc2de00a

+ 22 - 21
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250319.scala

@@ -4,7 +4,9 @@ import com.alibaba.fastjson.{JSON, JSONObject}
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
 import com.aliyun.odps.spark.examples.myUtils.env
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
+
 import java.util.Base64
 
 
@@ -56,28 +58,27 @@ object diff_data_20250319 {
     println(rdd2.toString)
 
 
+    var result: List[String] = List.empty
 
-    //    var result: List[String] = List.empty
-    //
-    //    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
-    //
-    //    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
-    //
-    //    // 以 logkey 为键进行转换
-    //    val rdd1Pairs: RDD[(String, Map[String, String])] = odpsData1.map(map => (map("logkey"), map))
-    //    val rdd2Pairs: RDD[(String, Map[String, String])] = odpsData2.map(map => ((map("apptype"), map("mid"), map("cid"), map("ts"), map("headvideoid")).productIterator.mkString(","), map))
-    //
-    //
-    //    // 进行 join 操作
-    //    val joinedRDD: RDD[(String, (Map[String, String], Map[String, String]))] = rdd1Pairs.join(rdd2Pairs)
-    //
-    //    val firstElement = joinedRDD.first()
-    //    firstElement match {
-    //      case (logkey, (map1, map2)) =>
-    //        println(logkey)
-    //        println(map1)
-    //        println(map2)
-    //    }
+    result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
+
+    result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
+
+    // 以 logkey 为键进行转换
+    val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
+    val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
+
+
+    // 进行 join 操作
+    val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
+
+    val firstElement = joinedRDD.first()
+    firstElement match {
+      case (logkey, (map1, map2)) =>
+        println(logkey)
+        println(map1)
+        println(map2)
+    }
 
     // 比较相同 logkey 对应的 Map 中相同键的 value
     //    joinedRDD.foreach { case (logkey, (map1, map2)) =>