|
@@ -47,10 +47,15 @@ object makedata_ad_33_addFeatureToHive_20250708 {
|
|
|
val featurePairRdd = featureRdd.map(row => (row._1, row)) // (mid, 完整特征元组)
|
|
|
|
|
|
// 3. 基于 mid 关联两个 RDD,并合并 Map
|
|
|
- val recordRdd = samplePairRdd.join(featurePairRdd).map {
|
|
|
- case (mid, (sampleMap, featureMap)) =>
|
|
|
- val sampleData = sampleMap._2 // 提取 sampleMap 的第二个元素
|
|
|
- val featureData = featureMap._2 // 提取 featureMap 的第二个元素
|
|
|
+ val recordRdd = samplePairRdd.leftOuterJoin(featurePairRdd).map {
|
|
|
+ case (mid, (sampleMap, featureOption)) =>
|
|
|
+ val sampleData = sampleMap._2 // 提取样本数据的 Map
|
|
|
+
|
|
|
+ // 处理特征数据(可能不存在)
|
|
|
+ val featureData = featureOption match {
|
|
|
+ case Some(featureMap) => featureMap._2 // 有匹配的特征数据
|
|
|
+ case None => Map.empty[String, String] // 无匹配数据,使用空 Map
|
|
|
+ }
|
|
|
|
|
|
// 合并两个 Map,重复键以 featureData 为准
|
|
|
val mergedMap = sampleData ++ featureData
|
|
@@ -62,7 +67,7 @@ object makedata_ad_33_addFeatureToHive_20250708 {
|
|
|
|
|
|
}
|
|
|
|
|
|
- def func(record: Record, schema: TableSchema): (Any, Map[String, String]) = {
|
|
|
+ def func(record: Record, schema: TableSchema): (String, Map[String, String]) = {
|
|
|
// 1. 获取所有列信息
|
|
|
val columns: Array[Column] = schema.getColumns.toArray(Array.empty[Column])
|
|
|
|
|
@@ -79,8 +84,9 @@ object makedata_ad_33_addFeatureToHive_20250708 {
|
|
|
throw new IllegalArgumentException("表中不存在 'mid' 字段,请检查字段名")
|
|
|
}
|
|
|
|
|
|
- // 4. 提取 mid 的值,保留 null(不转换为空字符串)
|
|
|
- val mid = record.get(midIndex)
|
|
|
+ val mid = Option(record.get(midIndex))
|
|
|
+ .map(_.toString) // 非 null 值转为字符串
|
|
|
+ .getOrElse("") // null 值返回空字符串(或其他默认值)
|
|
|
|
|
|
// 5. 将 Record 转换为 Map[String, String](跳过 mid 字段)
|
|
|
val recordMap = columns.zipWithIndex
|