xueyiming 2 ヶ月 前
コミット
9e6369ebc8

+ 11 - 23
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_32_bucket_hive_test.scala

@@ -1,14 +1,11 @@
 package com.aliyun.odps.spark.examples.makedata_ad.v20240718
 
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.makedata_ad.v20240718.makedata_ad_31_originData_20240718.func
 import com.aliyun.odps.spark.examples.myUtils.{ParamUtils, env}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
 
 /*
 
@@ -20,13 +17,6 @@ object makedata_ad_32_bucket_hive_test {
     val spark = SparkSession
       .builder()
       .appName(this.getClass.getName)
-      .config("spark.sql.broadcastTimeout", 20 * 60)
-      .config("spark.sql.crossJoin.enabled", true)
-      .config("spark.sql.defaultCatalog","odps")
-      .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
-      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
-      .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
-      .config("spark.sql.catalogImplementation","hive")
       .getOrCreate()
     val sc = spark.sparkContext
 
@@ -1439,20 +1429,18 @@ object makedata_ad_32_bucket_hive_test {
 
 
     // 创建 DataFrame
-//    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
-//    df.write.format("org.apache.spark.aliyun.maxcompute.datasource")
-//      .option("odpsUrl", "http://service.odps.aliyun.com/api")
-//      .option("tunnelUrl", "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com")
-//      .option("table", table)
-//      .option("project", project)
-//      .option("accessKeyId", "LTAIWYUujJAm7CbH")
-//      .option("accessKeySecret", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P")
-//      .mode("append") //覆盖写
-//      .save()
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+
+    df.write.format("com.aliyun.odps.spark.sql")
+      .option("odpsUrl", "http://service.odps.aliyun.com/api")
+      .option("tunnelUrl", "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com")
+      .option("table", table)
+      .option("project", project)
+      .option("accessKeyId", "LTAIWYUujJAm7CbH")
+      .option("accessKeySecret", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P")
+      .mode("append") //覆盖写
+      .save()
 
-        // 创建 DataFrame
-        val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
-        df.write.mode("append").saveAsTable(table)
   }