Test that reading table data works correctly

xueyiming · 2 months ago
commit b87ee14d34

+ 53 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/diff_data_20240718.scala

@@ -0,0 +1,53 @@
+package com.aliyun.odps.spark.examples.makedata_ad.v20240718
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{ParamUtils, env}
+import examples.utils.AdUtil
+import org.apache.spark.sql.SparkSession
+
+
+/*
+   diff data: sample a few rows from an ODPS table partition to verify reads work
+ */
+
+object diff_data_20240718 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 Parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "ad_easyrec_train_data_v1")
+    val partition = "dt=20250101"
+    // 2 Read the ODPS table
+    val odpsOps = env.getODPS(sc)
+    val odpsData = odpsOps.readTable(project = project,
+      table = table,
+      partition = partition,
+      transfer = func,
+      numPartition = 64)
+    val randomRow = odpsData.takeSample(withReplacement = false, num = 10)
+    for (cc <- randomRow) {
+      println(cc)
+    }
+  }
+
+  def func(record: Record, schema: TableSchema): Map[String, String] = {
+    var map: Map[String, String] = Map.empty
+    val columns = schema.getColumns
+    for (i <- 0 until columns.size()) {
+      val column = columns.get(i)
+      val name = column.getName
+      val value = String.valueOf(record.get(name)) // get returns Object; valueOf guards against null
+      map += (name -> value)
+    }
+    map
+  }
+
+
+}
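
As a side note, this smoke test reads a single partition and prints a sample. A natural extension, sketched below and not part of the commit, would reuse the same readTable call on a second, hypothetical reference partition and compare row counts to get an actual diff signal:

    // Hedged sketch: "dt=20241231" is an illustrative reference partition,
    // not something this commit defines.
    val baseline = odpsOps.readTable(project = project, table = table,
      partition = "dt=20241231", transfer = func, numPartition = 64)
    val candidate = odpsOps.readTable(project = project, table = table,
      partition = partition, transfer = func, numPartition = 64)
    println(s"baseline=${baseline.count()} candidate=${candidate.count()}")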

+ 2 - 2
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketData_logKey_20240718.scala

@@ -47,7 +47,7 @@ object makedata_ad_33_bucketData_logKey_20240718 {
     // 1 Parse arguments
     val param = ParamUtils.parseArgs(args)
     val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
+    val savePath = param.getOrElse("savePath", "/test/33_ad_train_data/")
     val beginStr = param.getOrElse("beginStr", "20240620")
     val endStr = param.getOrElse("endStr", "20240620")
     val repartition = param.getOrElse("repartition", "100").toInt
@@ -113,7 +113,7 @@ object makedata_ad_33_bucketData_logKey_20240718 {
 
       // 4 Save data to HDFS
       val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/test/")) {
         println("删除路径并开始数据写入:" + hdfsPath)
         MyHdfsUtils.delete_hdfs_path(hdfsPath)
         data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
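
The startsWith guard above is what keeps the delete-then-write step safe: MyHdfsUtils.delete_hdfs_path only runs when the output path sits under an expected root, so a mistyped savePath cannot wipe an arbitrary directory. A minimal sketch of how the whitelist could be kept in one place (allowedRoots is an illustrative name, not part of the commit):

    // Hedged sketch: centralize the allowed output roots so the prefix check
    // stays in sync when output moves between /test/ and /dw/recommend/model/.
    val allowedRoots = Seq("/test/", "/dw/recommend/model/")
    def isSafeOutputPath(path: String): Boolean =
      path.nonEmpty && allowedRoots.exists(root => path.startsWith(root))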