xueyiming 4 weeks ago
Parent
Current commit
c311b78734

+ 71 - 56
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20250813/makedata_ad_33_bucketData_add_Feature_20250813.scala

@@ -3,10 +3,7 @@ package com.aliyun.odps.spark.examples.makedata_ad.v20250813
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
 import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, ParamUtils, env}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConverters._
 object makedata_ad_33_bucketData_add_Feature_20250813 {
 
   def main(args: Array[String]): Unit = {
@@ -43,71 +40,89 @@ object makedata_ad_33_bucketData_add_Feature_20250813 {
         transfer = func,
         numPartition = tablePart)
 
-      val odpsDataPair: RDD[(Long, Record)] = odpsData.map(record => (record.getBigint("adverid"), record))
-      val odpsData1Pair: RDD[(Long, Record)] = odpsData1.map(record => (record.getBigint("id"), record))
-      val joinedRDD = odpsDataPair.leftOuterJoin(odpsData1Pair)
-      odpsOps.saveToTable(project, outputTable, partition, joinedRDD, write, defaultCreate = true, overwrite = true)
+      // Key each RDD by its join column
+      val serializableOdpsData = odpsData.map { map =>
+        (map.get("adverid"), map)
+      }
 
+      val serializableOdpsData1 = odpsData1.map { map =>
+        (map.get("id"), map)
+      }
 
-    }
+      // Perform the left outer join
+      val joined = serializableOdpsData.leftOuterJoin(serializableOdpsData1)
+
+      // Merge each joined pair into a single output map
+      val result = joined.map { case (_, (leftMap, rightOpt)) =>
+        rightOpt match {
+          case Some(rightMap) => leftMap + ("category_name" -> rightMap.getOrElse("category_name", null))
+          case None           => leftMap + ("category_name" -> null)
+        }
+      }
 
+      odpsOps.saveToTable(project, outputTable, partition, result, write, defaultCreate = true, overwrite = true)
 
+    }
   }
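The new join path is easiest to see in isolation. Below is a minimal, self-contained sketch (plain Scala collections instead of Spark RDDs; field names other than `adverid`, `id`, and `category_name` are illustrative) of the same left-outer-join merge: every left row survives, and `category_name` is null when the right side has no match.

```scala
// Stand-ins for func()'s Map[String, String] rows, keyed for the join.
val left  = Seq("1" -> Map("adverid" -> "1", "score" -> "0.5"),
                "2" -> Map("adverid" -> "2", "score" -> "0.8"))
val right = Map("1" -> Map("id" -> "1", "category_name" -> "food"))

val merged = left.map { case (adverid, leftMap) =>
  right.get(adverid) match {
    case Some(rightMap) => leftMap + ("category_name" -> rightMap.getOrElse("category_name", null))
    case None           => leftMap + ("category_name" -> null)
  }
}
merged.foreach(println)
// Map(adverid -> 1, score -> 0.5, category_name -> food)
// Map(adverid -> 2, score -> 0.8, category_name -> null)
```

Note that in the commit the key expression is `map.get("adverid")`, so both RDDs are keyed by `Option[String]` rather than `String`; the join still lines up because both sides are keyed the same way, but keying with `map("adverid")` would make the key type explicit.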
 
-  def write(data: (Long, (Record, Option[Record])),
-            record: Record,
-            schema: TableSchema): Unit = {
-
-    // Destructure the input data
-    val (_, (leftRecord, rightRecordOpt)) = data
-
-    // Fields to take from the right table (adjust to actual needs)
-    val rightTableFields = Set("category_name")
-
-    // Collect all field names from the schema
-    val allFieldNames = schema.getColumns.asScala.map(_.getName)
-
-    // Iterate over every field
-    allFieldNames.foreach { fieldName =>
-      if (rightTableFields.contains(fieldName)) {
-        // Designated field: try to read it from the right table
-        rightRecordOpt match {
-          case Some(rightRecord) =>
-            // Get the column index and type
-            val colIndex = schema.getColumnIndex(fieldName)
-            val colType = schema.getColumn(fieldName).getTypeInfo
-
-            // Copy the value according to its type
-            colType.getTypeName match {
-              case "BIGINT"   => record.setBigint(colIndex, rightRecord.getBigint(fieldName))
-              case "STRING"   => record.setString(colIndex, rightRecord.getString(fieldName))
-              case "DOUBLE"   => record.setDouble(colIndex, rightRecord.getDouble(fieldName))
-              case "BOOLEAN"  => record.setBoolean(colIndex, rightRecord.getBoolean(fieldName))
-              case _ => throw new UnsupportedOperationException(s"Unsupported type: ${colType.getTypeName}")
-            }
-
-          case None =>
-            // No right record: set the field to null
-            record.set(fieldName, null)
+
+
+  def write(map: Map[String, String], record: Record, schema: TableSchema): Unit = {
+    for ((columnName, value) <- map) {
+      try {
+        // Look up the column's index in the table schema
+        val columnIndex = schema.getColumnIndex(columnName.toLowerCase)
+        // Get the column's type
+        val columnType = schema.getColumn(columnIndex).getTypeInfo
+        try {
+          columnType.getTypeName match {
+            case _ if value == null =>
+              // Propagate nulls (e.g. an unmatched category_name) instead of NPE-ing in toLong/toDouble
+              record.set(columnIndex, null)
+            case "STRING" =>
+              record.setString(columnIndex, value)
+            case "BIGINT" =>
+              record.setBigint(columnIndex, value.toLong)
+            case "DOUBLE" =>
+              record.setDouble(columnIndex, value.toDouble)
+            case "BOOLEAN" =>
+              record.setBoolean(columnIndex, value.toBoolean)
+            case other =>
+              throw new IllegalArgumentException(s"Unsupported column type: $other")
+          }
+        } catch {
+          case e: NumberFormatException =>
+            println(s"Error converting value $value to type ${columnType.getTypeName} for column $columnName: ${e.getMessage}")
+          case e: Exception =>
+            println(s"Unexpected error writing value $value to column $columnName: ${e.getMessage}")
         }
-      } else {
-        // Non-designated fields: read from the left table
-        val colIndex = schema.getColumnIndex(fieldName)
-        val colType = schema.getColumn(fieldName).getTypeInfo
-
-        colType.getTypeName match {
-          case "BIGINT"   => record.setBigint(colIndex, leftRecord.getBigint(fieldName))
-          case "STRING"   => record.setString(colIndex, leftRecord.getString(fieldName))
-          case "DOUBLE"   => record.setDouble(colIndex, leftRecord.getDouble(fieldName))
-          case "BOOLEAN"  => record.setBoolean(colIndex, leftRecord.getBoolean(fieldName))
-          case _ => throw new UnsupportedOperationException(s"Unsupported type: ${colType.getTypeName}")
+      } catch {
+        case e: IllegalArgumentException => {
+          println(e.getMessage)
         }
       }
     }
   }
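The inner try/catch around the type dispatch in `write` can also be folded into the conversion itself with `scala.util.Try`. A hedged sketch (hypothetical `convert` helper, not part of the commit) covering the same four supported types:

```scala
import scala.util.Try

// Convert one string cell to the target ODPS column type.
// None signals a failed or unsupported conversion for the caller to log and skip.
def convert(typeName: String, value: String): Option[Any] = typeName match {
  case _ if value == null => Some(null)                 // nulls pass through
  case "STRING"           => Some(value)
  case "BIGINT"           => Try(value.toLong).toOption
  case "DOUBLE"           => Try(value.toDouble).toOption
  case "BOOLEAN"          => Try(value.toBoolean).toOption
  case _                  => None
}
```

This makes the failure cases values rather than control flow, leaving the write loop with just a schema lookup and a single set call.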
 
-  def func(record: Record, schema: TableSchema): Record = {
-    record
+  def func(record: Record, schema: TableSchema): Map[String, String] = {
+    val fieldMap = scala.collection.mutable.Map[String, String]()
+
+    for (i <- 0 until schema.getColumns.size()) {
+      val column = schema.getColumn(i)
+      val fieldName = column.getName
+      val fieldType = column.getTypeInfo.getTypeName
+
+      val value = fieldType match {
+        // Typed getters return boxed Java values that are null for empty cells,
+        // so guard with Option before calling toString
+        case "BIGINT"   => Option(record.getBigint(i)).map(_.toString).orNull
+        case "STRING"   => record.getString(i)
+        case "DOUBLE"   => Option(record.getDouble(i)).map(_.toString).orNull
+        case "BOOLEAN"  => Option(record.getBoolean(i)).map(_.toString).orNull
+        // Extend here with other column types as needed
+        case _ => throw new UnsupportedOperationException(s"Unsupported type: $fieldType")
+      }
+
+      fieldMap(fieldName) = value
+    }
+
+    fieldMap.toMap
   }
 
 }
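On the null-safety point in `func`: the typed getters on `Record` (`getBigint`, `getDouble`, `getBoolean`) return boxed Java values, so an empty cell comes back as null and calling `.toString` on it throws. A minimal plain-Scala demonstration of the `Option(...)` guard used above:

```scala
// java.lang.Long stands in for what Record.getBigint returns for an empty cell.
val full:  java.lang.Long = 42L
val empty: java.lang.Long = null

def asString(v: java.lang.Long): String = Option(v).map(_.toString).orNull

println(asString(full))   // 42
println(asString(empty))  // null, instead of a NullPointerException
```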