|
@@ -59,27 +59,27 @@ object diff_data_20250319 {
|
|
|
println(rdd2.toString)
|
|
|
|
|
|
|
|
|
-// var result: List[String] = List.empty
|
|
|
-//
|
|
|
-// result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
|
|
|
-//
|
|
|
-// result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
|
|
|
+ // var result: List[String] = List.empty
|
|
|
+ //
|
|
|
+ // result = result :+ "ad_easyrec_eval_data_v3_sampled size =" + odpsData1.count();
|
|
|
+ //
|
|
|
+ // result = result :+ "alg_recsys_ad_sample_all size =" + odpsData2.count();
|
|
|
|
|
|
// 以 logkey 为键进行转换
|
|
|
-// val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
|
|
|
-// val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
|
|
|
-//
|
|
|
-//
|
|
|
-// // 进行 join 操作
|
|
|
-// val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
|
|
|
-//
|
|
|
-// val firstElement = joinedRDD.first()
|
|
|
-// firstElement match {
|
|
|
-// case (logkey, (map1, map2)) =>
|
|
|
-// println(logkey)
|
|
|
-// println(map1)
|
|
|
-// println(map2)
|
|
|
-// }
|
|
|
+ // val rdd1Pairs: RDD[(String, JSONObject)] = odpsData1.map(map => (map.getString("logkey"), map))
|
|
|
+ // val rdd2Pairs: RDD[(String, JSONObject)] = odpsData2.map(map => (map.getString("logkey"), map))
|
|
|
+ //
|
|
|
+ //
|
|
|
+ // // 进行 join 操作
|
|
|
+ // val joinedRDD: RDD[(String, (JSONObject, JSONObject))] = rdd1Pairs.join(rdd2Pairs)
|
|
|
+ //
|
|
|
+ // val firstElement = joinedRDD.first()
|
|
|
+ // firstElement match {
|
|
|
+ // case (logkey, (map1, map2)) =>
|
|
|
+ // println(logkey)
|
|
|
+ // println(map1)
|
|
|
+ // println(map2)
|
|
|
+ // }
|
|
|
|
|
|
// 比较相同 logkey 对应的 Map 中相同键的 value
|
|
|
// joinedRDD.foreach { case (logkey, (map1, map2)) =>
|
|
@@ -144,7 +144,20 @@ object diff_data_20250319 {
|
|
|
for (i <- 0 until columns.size()) {
|
|
|
val column = columns.get(i)
|
|
|
val name = column.getName
|
|
|
- featureMap.put(name, record.get(name))
|
|
|
+ val value = record.get(name)
|
|
|
+ value match {
|
|
|
+ case byteArray: Array[Byte] =>
|
|
|
+ try {
|
|
|
+ // 将字节数组转换为字符串,使用 UTF-8 编码
|
|
|
+ val str = new String(byteArray, "UTF-8")
|
|
|
+ featureMap.put(name, str)
|
|
|
+ } catch {
|
|
|
+ case e: Exception =>
|
|
|
+ println(s"转换过程中出现错误: ${e.getMessage}")
|
|
|
+ }
|
|
|
+ case _ =>
|
|
|
+ featureMap.put(name, record.get(name).toString)
|
|
|
+ }
|
|
|
}
|
|
|
featureMap
|
|
|
}
|