zhangbo, 1 year ago
parent commit 86a75fe4c8

+ 36 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_04_sampleStatic.scala

@@ -0,0 +1,36 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, ParamUtils}
+import org.apache.spark.sql.SparkSession
+
+object makedata_04_sampleStatic {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // Parse job parameters; the defaults cover a single-day run.
+    val param = ParamUtils.parseArgs(args)
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
+    val beginStr = param.getOrElse("beginStr", "20230101")
+    val endStr = param.getOrElse("endStr", "20230101")
+    val path = param.getOrElse("path", "")
+
+    // For each date partition, count records per key (the first
+    // tab-separated field) and print the tallies.
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (date <- dateRange) {
+      val partition = partitionPrefix + date
+      val hdfsPath = path + "/" + partition
+      println("data path: " + hdfsPath)
+      val data = sc.textFile(hdfsPath)
+        .map(r => (r.split("\t")(0), 1))
+        .reduceByKey(_ + _)
+      // collect() pulls all per-key counts to the driver; acceptable
+      // only when the key space is small.
+      data.collect().foreach(r => println(r._1 + "\t" + r._2))
+    }
+  }
+}
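
For quick verification off the cluster, the per-key tally this job computes can be reproduced on in-memory rows. The sketch below is not part of the commit: the object name and sample rows are hypothetical, and it assumes only Spark on the classpath with a local master.

import org.apache.spark.sql.SparkSession

// Hypothetical local check (not part of the commit): reproduces the job's
// per-key count on in-memory tab-separated rows.
object SampleStaticLocalCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SampleStaticLocalCheck")
      .master("local[*]") // run locally; the real job submits to the cluster
      .getOrCreate()
    val sc = spark.sparkContext

    // Rows shaped like the job's input: the first tab-separated field is the key.
    val rows = Seq("u1\tfeatureA", "u1\tfeatureB", "u2\tfeatureA")
    val counts = sc.parallelize(rows)
      .map(r => (r.split("\t")(0), 1))
      .reduceByKey(_ + _)

    counts.collect().foreach { case (k, n) => println(k + "\t" + n) }
    // Prints "u1<TAB>2" and "u2<TAB>1" (order not guaranteed)
    spark.stop()
  }
}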