@@ -0,0 +1,43 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.alibaba.fastjson.{JSON, JSONObject}
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+import org.xm.Similarity
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+/*
+  20240608 extract features
+ */
+
+object makedata_18_mergehour2day_20240617 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 read parameters
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/16_train_data_print_online/20240615*")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/16_train_data_print_online_merge/20240615/")
+    val repartition = param.getOrElse("repartition", "100").toInt
+
+    val data = sc.textFile(readPath)
+
+    // 4 save the data to HDFS
+    val hdfsPath = savePath
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("Deleting existing path and starting write: " + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("Invalid path, refusing to write: " + hdfsPath)
+    }
+  }
+}
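
Note: ParamUtils.parseArgs is a repo-local helper whose argument syntax is not shown in this diff. As a rough illustration only, a minimal stand-in that accepts "key:value" tokens (the token syntax and names here are assumptions, not the real helper) could look like:

object ParamSketch {
  // Hypothetical stand-in for ParamUtils.parseArgs; assumes "key:value" tokens.
  def parseArgs(args: Array[String]): Map[String, String] =
    args.flatMap { arg =>
      arg.split(":", 2) match {
        case Array(k, v) => Some(k -> v)
        case _           => None // ignore malformed tokens
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    val param = parseArgs(Array("readPath:/tmp/in/*", "repartition:50"))
    // Missing keys fall back to defaults, mirroring param.getOrElse in the job.
    println(param.getOrElse("savePath", "/tmp/out/")) // prints /tmp/out/
  }
}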
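
MyHdfsUtils.delete_hdfs_path is likewise not shown in this diff. A plausible sketch of such a helper, written against the standard Hadoop FileSystem API (an assumption about the real implementation, not its actual code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsSketch {
  // Recursively delete an HDFS path if it exists. The job's extra
  // startsWith("/dw/recommend/model/") guard keeps the delete from
  // ever touching paths outside the model output tree.
  def deleteHdfsPath(pathStr: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path(pathStr)
    if (fs.exists(path)) {
      fs.delete(path, true) // true = recursive, required for directories
    }
  }
}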
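
One behavioural note on the GzipCodec output: each partition becomes one part-NNNNN.gz file, and gzip files are not splittable, so the repartition value also fixes the read parallelism of any downstream job. A self-contained local round trip demonstrating this (the output path is a placeholder and must not already exist):

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

object GzipRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GzipRoundTrip")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val out = "/tmp/gzip_roundtrip_demo" // placeholder; must not exist yet
    sc.parallelize(1 to 1000).map(_.toString)
      .repartition(4) // -> four part-0000N.gz files
      .saveAsTextFile(out, classOf[GzipCodec])

    // .gz parts are decompressed transparently on read, but each file
    // is consumed by a single task because gzip is not splittable.
    println(sc.textFile(out).count()) // 1000
    spark.stop()
  }
}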