Browse Source

修改任务

xueyiming 2 weeks ago
parent
commit
2b4300e068

+ 122 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_hiveToHive_20250708.scala

@@ -0,0 +1,122 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, ParamUtils, env}
+import com.aliyun.odps.{Column, TableSchema}
+import org.apache.spark.sql.SparkSession
+
+object makedata_ad_33_hiveToHive_20250708 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "20250708")
+    val endStr = param.getOrElse("endStr", "20250516")
+    val input = param.getOrElse("input", "loghubods.ad_easyrec_train_realtime_data_v3_sampled_temp")
+    val output = param.getOrElse("output", "ad_easyrec_train_realtime_data_v3_sampled")
+
+
+    val odpsOps = env.getODPS(sc)
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (dt <- dateRange) {
+      val partition = s"dt=$dt"
+      // 读取样本表
+      val sampleRdd = odpsOps.readTable(
+        project = project,
+        table = input,
+        partition = partition,
+        transfer = func,
+        numPartition = tablePart
+      )
+
+      // 写入表
+      odpsOps.saveToTable(
+        project,
+        output,
+        partition,
+        sampleRdd,
+        write,
+        defaultCreate = true,
+        overwrite = true
+      )
+    }
+
+
+  }
+
+  def func(record: Record, schema: TableSchema):  Map[String, String] = {
+    // 1. 获取所有列信息
+    val columns: Array[Column] = schema.getColumns.toArray(Array.empty[Column])
+    val recordMap = columns.zipWithIndex
+      .map { case (column, index) =>
+        val columnName = column.getName
+        val columnType = column.getTypeInfo.getTypeName // 获取字段类型
+
+        // 根据字段类型获取值并转换为字符串
+        val value: String = columnType match {
+          case "STRING" =>
+            val str = record.getString(index)
+            if (str == null) null else str // 字符串类型直接保留(null 保持 null)
+
+          case "BIGINT" =>
+            val num = record.getBigint(index)
+            if (num == null) null else num.toString // 长整型转字符串
+
+          case "DOUBLE" =>
+            val num = record.getDouble(index)
+            if (num == null) null else num.toString // 浮点型转字符串
+
+          case "BOOLEAN" =>
+            val bool = record.getBoolean(index)
+            if (bool == null) null else bool.toString // 布尔型转字符串("true"/"false")
+        }
+
+        columnName -> value
+      }
+      .toMap
+
+     recordMap
+  }
+
+  def write(map: Map[String, String], record: Record, schema: TableSchema): Unit = {
+    for ((columnName, value) <- map) {
+      try {
+        // 查找列名在表结构中的索引
+        val columnIndex = schema.getColumnIndex(columnName.toLowerCase)
+        // 获取列的类型
+        val columnType = schema.getColumn(columnIndex).getTypeInfo
+        try {
+          columnType.getTypeName match {
+            case "STRING" =>
+              record.setString(columnIndex, value)
+            case "BIGINT" =>
+              record.setBigint(columnIndex, value.toLong)
+            case "DOUBLE" =>
+              record.setDouble(columnIndex, value.toDouble)
+            case "BOOLEAN" =>
+              record.setBoolean(columnIndex, value.toBoolean)
+            case other =>
+              throw new IllegalArgumentException(s"Unsupported column type: $other")
+          }
+        } catch {
+          case e: NumberFormatException =>
+            println(s"Error converting value $value to type ${columnType.getTypeName} for column $columnName: ${e.getMessage}")
+          case e: Exception =>
+            println(s"Unexpected error writing value $value to column $columnName: ${e.getMessage}")
+        }
+      } catch {
+        case e: IllegalArgumentException => {
+          println(e.getMessage)
+        }
+      }
+    }
+  }
+
+
+}