
Recommendation sample production: feature bucketing

zhangbo 10 months ago
parent
commit
6a7975e174

+ 96 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_15_bucket_20240608.scala

@@ -0,0 +1,96 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+/*
+  Read the feature-name list, compute quantile bucket boundaries for each feature, and write them to HDFS.
+ */
+
+object makedata_15_bucket_20240608 {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+    val resourceUrl = loader.getResource("20240608_feature_name.txt")
+    val content =
+      if (resourceUrl != null) {
+        val source = Source.fromURL(resourceUrl)
+        try source.getLines().mkString("\n") finally source.close()
+      } else {
+        ""
+      }
+    println(content)
+    val contentList = content.split("\n")
+      .map(r => r.replace(" ", ""))
+      .filter(r => r.nonEmpty).toList
+
+
+
+    // 1. Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=20240607,hh=00")
+    val date = param.getOrElse("date", "20240607")
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/14_feature_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/15_bucket_data/")
+    val bucketNum = param.getOrElse("bucketNum", "999").toInt
+
+    // 2. Read the feature values: each line is "<key>\t<v1,v2,...>"
+    val data = sc.textFile(readPath + partitionPrefix)
+    val data1 = data.map(r => {
+      val rList = r.split("\t")
+      rList(1).split(",").map(_.toDouble)
+    })
+
+    val result = new ArrayBuffer[String]()
+
+    // 3. For each feature, sort its non-zero values and derive quantile split points
+    for (i <- contentList.indices){
+      println("feature: " + contentList(i))
+      val data2 = data1.map(r => r(i)).filter(_ > 1E-8).collect().sorted
+      if (data2.map(_.toString).toSet.size < bucketNum*10){
+        println("cannot bucket, too few distinct values: " + data2.map(_.toString).toSet.size.toString)
+      }else{
+        val len = data2.length
+        val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // ensure each bucket holds at least one element
+        val buffers = new ArrayBuffer[Double]()
+
+        var lastBucketValue = data2(0) // the previous split point
+        for (j <- 0 until len by oneBucketNum) {
+          val d = data2(j)
+          if (j > 0 && d != lastBucketValue) {
+            // keep this split point only if it differs from the previous one
+            buffers += d
+          }
+          lastBucketValue = d // update the previous split point
+        }
+
+        // the last bucket must end at the array's final element
+        if (!buffers.contains(data2.last)) {
+          buffers += data2.last
+        }
+        result += contentList(i) + "\t" + buffers.mkString(",")
+      }
+    }
+    val data3 = sc.parallelize(result)
+
+
+    // 4. Save the result to HDFS
+    val hdfsPath = savePath + "/" + date
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data3.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+  }
+}
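
The split-point loop above is the heart of the commit. Below is a minimal standalone sketch of that logic, extracted as a pure function so it can be sanity-checked without Spark; the object and function names (BucketSketch, computeSplits) are illustrative, not part of the commit:

object BucketSketch {
  // Walk the sorted values with stride (len-1)/(bucketNum-1)+1, keeping each
  // visited value as a split point only when it differs from the previous
  // one, and always closing with the array's final element.
  def computeSplits(sortedValues: Array[Double], bucketNum: Int): Seq[Double] = {
    val len = sortedValues.length
    val stride = (len - 1) / (bucketNum - 1) + 1
    val buffers = scala.collection.mutable.ArrayBuffer[Double]()
    var last = sortedValues(0)
    for (j <- 0 until len by stride) {
      val d = sortedValues(j)
      if (j > 0 && d != last) buffers += d
      last = d
    }
    if (!buffers.contains(sortedValues.last)) buffers += sortedValues.last
    buffers.toSeq
  }

  def main(args: Array[String]): Unit = {
    val values = (1 to 100).map(_.toDouble).toArray
    // With 100 distinct values and bucketNum=4 the stride is 34, so the
    // splits land at indices 34 and 68, plus the final element:
    println(computeSplits(values, 4).mkString(","))  // 35.0,69.0,100.0
  }
}

Because duplicate split points are skipped, heavily repeated values collapse buckets; this is why the job first checks that a feature has at least bucketNum*10 distinct values before bucketing it.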

+ 10 - 2
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -72,6 +72,14 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
 tablePart:32 \
-beginStr:2024060700 endStr:2024060700 \
+beginStr:2024060701 endStr:2024060701 \
 table:alg_recsys_sample_all \
-> p13_data.log 2>&1 &
+> p13_data.log 2>&1 &
+
+
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_14_valueData_20240608 \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+partitionPrefix:dt=20240607 date:20240607 \
+> p14_data.log 2>&1 &
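
A hypothetical companion entry for the new bucketing job, following the pattern of the commands above (the class name and the parameter defaults come from the Scala file; the resource settings and the log name p15_data.log are copied from the p14 entry, not prescribed by the commit):

nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
--class com.aliyun.odps.spark.examples.makedata.makedata_15_bucket_20240608 \
--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
partitionPrefix:dt=20240607,hh=00 date:20240607 \
> p15_data.log 2>&1 &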