xueyiming 2 months ago
parent
commit
1978a45387

+ 21 - 11
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_32_bucket_hive_test.scala

@@ -20,7 +20,13 @@ object makedata_ad_32_bucket_hive_test {
     val spark = SparkSession
       .builder()
       .appName(this.getClass.getName)
-      .enableHiveSupport() // enable Hive support
+      .config("spark.sql.broadcastTimeout", 20 * 60)
+      .config("spark.sql.crossJoin.enabled", true)
+      .config("spark.sql.defaultCatalog","odps")
+      .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
+      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
+      .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
+      .config("spark.sql.catalogImplementation","hive")
       .getOrCreate()
     val sc = spark.sparkContext
 
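The hunk above swaps enableHiveSupport() for the MaxCompute (ODPS) Spark catalog. Below is a minimal, self-contained sketch of the same session setup with comments on what each setting does; the catalog and extension class names are taken from this diff, while the app name is a placeholder and the ODPS Spark connector is assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

object OdpsCatalogSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("OdpsCatalogSessionSketch")
      .config("spark.sql.broadcastTimeout", 20 * 60)   // broadcast-join timeout in seconds (default 300)
      .config("spark.sql.crossJoin.enabled", true)     // allow cartesian joins in SQL
      .config("spark.sql.defaultCatalog", "odps")      // unqualified table names resolve in the ODPS catalog
      .config("spark.sql.catalog.odps",
        "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // only rewrite partitions that receive data
      .config("spark.sql.extensions",
        "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
      .config("spark.sql.catalogImplementation", "hive")
      .getOrCreate()

    // With the default catalog pointed at odps, plain SQL identifiers hit MaxCompute tables.
    spark.sql("SHOW TABLES").show()

    spark.stop()
  }
}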
@@ -1433,16 +1439,20 @@ object makedata_ad_32_bucket_hive_test {
 
 
     // Create the DataFrame
-    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
-    df.write.format("org.apache.spark.aliyun.maxcompute.datasource")
-      .option("odpsUrl", "http://service.odps.aliyun.com/api")
-      .option("tunnelUrl", "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com")
-      .option("table", table)
-      .option("project", project)
-      .option("accessKeyId", "LTAIWYUujJAm7CbH")
-      .option("accessKeySecret", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P")
-      .mode("append") // overwrite write
-      .save()
+//    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+//    df.write.format("org.apache.spark.aliyun.maxcompute.datasource")
+//      .option("odpsUrl", "http://service.odps.aliyun.com/api")
+//      .option("tunnelUrl", "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com")
+//      .option("table", table)
+//      .option("project", project)
+//      .option("accessKeyId", "LTAIWYUujJAm7CbH")
+//      .option("accessKeySecret", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P")
+//      .mode("append") // overwrite write
+//      .save()
+
+        // Create the DataFrame
+        val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+        df.write.mode("append").saveAsTable(table)
   }
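The second hunk retires the direct org.apache.spark.aliyun.maxcompute.datasource write, with its hard-coded endpoint URLs and access keys, in favour of writing through the session catalog. A minimal sketch of the new write path, assuming the ODPS catalog session from the first hunk; the table name "bucket_table", the schema, and the sample rows are hypothetical stand-ins for the real bucketed feature output.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object OdpsSaveAsTableSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the session carries the ODPS catalog settings shown in the first hunk.
    val spark = SparkSession.builder().appName("OdpsSaveAsTableSketch").getOrCreate()

    // Hypothetical schema and rows standing in for the real bucket data.
    val schema = StructType(Seq(
      StructField("feature_name", StringType, nullable = false),
      StructField("bucket_bounds", StringType, nullable = true)
    ))
    val rows = Seq(Row("cpa", "0.1,0.5,1.0"), Row("ctr", "0.01,0.05,0.1"))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

    // Append through the session catalog; the table identifier is resolved by the
    // configured ODPS catalog, so no endpoint or access-key options are passed here.
    df.write.mode("append").saveAsTable("bucket_table")

    spark.stop()
  }
}

With spark.sql.defaultCatalog set to odps, saveAsTable resolves the identifier in MaxCompute and credentials come from the cluster or session configuration rather than being embedded in the job, which is what removing the accessKeyId/accessKeySecret options in this commit achieves.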