
Merge branch 'feature/zhangbo_makedata_v3' into feature/zhangbo_makedata_v2

zhangbo 10 months ago
commit 0427b919fa

+ 0 - 0
src/main/resources/20240609_bucket_274.txt


+ 60 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_16_bucketData_20240609.scala

@@ -0,0 +1,60 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+/*
+  makedata_16_bucketData_20240609: loads the feature-name list from the
+  classpath resource, then writes bucket data for one date to HDFS (gzip).
+ */
+
+object makedata_16_bucketData_20240609 {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+    val resourceUrl = loader.getResource("20240608_feature_name.txt")
+    val content =
+      if (resourceUrl != null) {
+        // read the whole resource, then close the same Source instance
+        val source = Source.fromURL(resourceUrl)
+        try source.getLines().mkString("\n") finally source.close()
+      } else {
+        ""
+      }
+    println(content)
+    val contentList = content.split("\n")
+      .map(r => r.replace(" ", ""))
+      .filter(r => r.nonEmpty).toList
+
+
+
+    // 1 Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=20240607,hh=00")
+    val date = param.getOrElse("date", "20240607")
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/14_feature_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/15_bucket_data/")
+    val bucketNum = param.getOrElse("bucketNum", "200").toInt
+
+
+    // 2/3 The bucketing steps are not in this commit; as a placeholder so the
+    // job compiles, read the feature data for the given partition as-is.
+    val data3 = sc.textFile(readPath + "/" + partitionPrefix)
+
+    // 4 Save data to HDFS
+    val hdfsPath = savePath + "/" + date
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data3.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+  }
+}
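
ParamUtils.parseArgs is not part of this diff; assuming it parses "key=value" command-line tokens (an assumption, not confirmed by this commit), a minimal self-contained sketch of that behavior:

object ParamSketch {
  // Hypothetical stand-in for ParamUtils.parseArgs: turns "key=value" tokens
  // into a Map, so defaults can be supplied via getOrElse as in the job above.
  def parseArgs(args: Array[String]): Map[String, String] =
    args.iterator
      .map(_.split("=", 2))                   // split each token once on '='
      .collect { case Array(k, v) => k -> v } // keep only well-formed pairs
      .toMap

  def main(args: Array[String]): Unit = {
    val param = parseArgs(Array("date=20240607", "bucketNum=200"))
    println(param.getOrElse("bucketNum", "200").toInt) // prints 200
  }
}

With a parser like this, the job would be driven by arguments such as date=20240607 savePath=/dw/recommend/model/15_bucket_data/, falling back to the defaults shown above when a key is absent.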