
Test fetching elements

xueyiming · 1 week ago
commit d9cc326c78

+ 44 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20250422.scala

@@ -0,0 +1,44 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.env
+import org.apache.spark.sql.SparkSession
+
+/*
+   diff data: count records in an ODPS partition range to compare datasets
+ */
+
+object diff_data_20250422 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 2. Read the ODPS table info
+    val odpsOps = env.getODPS(sc)
+    //    val odpsData1 = odpsOps.readTable(project = "loghubods",
+    //      table = "ad_easyrec_eval_data_v3_sampled",
+    //      partition = "dt=20250319",
+    //      transfer = func,
+    //      numPartition = 64)
+
+    val odpsData2 = odpsOps.readTable(project = "loghubods",
+      table = "loghubods.ad_engine_statistics_log_per5min_new",
+      partition = "dt>=20250421220000 and dt<=20250421235500",
+      transfer = func1,
+      numPartition = 64)
+
+    val count = odpsData2.count()
+    // Print the result
+    println("RDD的元素数量为: " + count)
+
+  }
+
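+  // Identity transfer: return each record unchanged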
+  def func1(record: Record, schema: TableSchema): Record = {
+    record
+  }
+}
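
The identity transfer above only supports counting. To actually fetch element values (as the commit title suggests), the transfer function can project each Record into plain Scala data. Below is a minimal sketch, assuming the standard ODPS SDK Record.get(i) and TableSchema.getColumns accessors; funcExtract is a hypothetical name, not part of this commit:

  // Hypothetical transfer: flatten each Record into "name=value" pairs
  // using the table schema (assumes the standard ODPS SDK accessors)
  def funcExtract(record: Record, schema: TableSchema): String = {
    val columns = schema.getColumns // java.util.List[Column]
    (0 until columns.size())
      .map { i =>
        val name  = columns.get(i).getName
        val value = Option(record.get(i)).map(_.toString).getOrElse("NULL")
        s"$name=$value"
      }
      .mkString(",")
  }

Passing transfer = funcExtract to readTable would yield an RDD[String]; a few rows could then be spot-checked with odpsData2.take(10).foreach(println) before counting.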