Forráskód Böngészése

feat:修改分桶脚本

zhaohaipeng 10 hónapja
szülő
commit
bb0ad67f80

+ 42 - 34
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/makedata_ad_33_bucketData_20240622.scala

@@ -9,6 +9,7 @@ import org.apache.spark.sql.SparkSession
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
+
 /*
 
  */
@@ -16,6 +17,17 @@ import scala.io.Source
 object makedata_ad_33_bucketData_20240622 {
   def main(args: Array[String]): Unit = {
 
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
+    val beginStr = param.getOrElse("beginStr", "20240620")
+    val endStr = param.getOrElse("endStr", "20240620")
+    val repartition = param.getOrElse("repartition", "200").toInt
+    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
+    val bucketFileName = param.getOrElse("bucketFileName", "20240624_ad_bucket_249.txt")
+    val bucketLength = param.getOrElse("bucketLength", "-1").toInt
+
     val spark = SparkSession
       .builder()
       .appName(this.getClass.getName)
@@ -24,7 +36,7 @@ object makedata_ad_33_bucketData_20240622 {
 
     val loader = getClass.getClassLoader
 
-    val resourceUrlBucket = loader.getResource("20240624_ad_bucket_249.txt")
+    val resourceUrlBucket = loader.getResource(bucketFileName)
     val buckets =
       if (resourceUrlBucket != null) {
         val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
@@ -37,43 +49,33 @@ object makedata_ad_33_bucketData_20240622 {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r =>{
+      .map(r => {
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)
 
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "200").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (date <- dateRange) {
       println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-        val rList = r.split("\t")
-        val logKey = rList(0)
-        val labelKey = rList(1)
-        val jsons = JSON.parseObject(rList(2))
-        val features = scala.collection.mutable.Map[String, Double]()
-        jsons.foreach(r => {
-          features.put(r._1, jsons.getDoubleValue(r._1))
+      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
+          val rList = r.split("\t")
+          val logKey = rList(0)
+          val labelKey = rList(1)
+          val jsons = JSON.parseObject(rList(2))
+          val features = scala.collection.mutable.Map[String, Double]()
+          jsons.foreach(r => {
+            features.put(r._1, jsons.getDoubleValue(r._1))
+          })
+          (logKey, labelKey, features)
         })
-        (logKey, labelKey, features)
-      })
-        .filter{
+        .filter {
           case (logKey, labelKey, features) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
             !Set("12").contains(apptype)
         }
-        .map{
+        .map {
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault("ad_is_conversion", "0").toString
             (label, features)
@@ -81,22 +83,29 @@ object makedata_ad_33_bucketData_20240622 {
         .mapPartitions(row => {
           val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
-          row.foreach{
+          row.foreach {
             case (label, features) =>
-              val featuresBucket = features.map{
+              val featuresBucket = features.map {
                 case (name, score) =>
                   var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.startsWith(r)) {ifFilter = true} )
+                  if (filterNames.nonEmpty) {
+                    filterNames.foreach(r => if (!ifFilter && name.startsWith(r)) {
+                      ifFilter = true
+                    })
                   }
-                  if (ifFilter){
+                  if (ifFilter) {
                     ""
-                  }else{
+                  } else {
                     if (score > 1E-8) {
                       if (bucketsMap.contains(name)) {
                         val (_, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / (buckets.length + 1) * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
+                        if (bucketLength > 0) {
+                          val scoreNew = 1.0 / bucketLength * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                          name + ":" + scoreNew.toString
+                        } else {
+                          val scoreNew = 1.0 / (buckets.length + 1) * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                          name + ":" + scoreNew.toString
+                        }
                       } else {
                         name + ":" + score.toString
                       }
@@ -108,7 +117,7 @@ object makedata_ad_33_bucketData_20240622 {
               result.add(label + "\t" + featuresBucket.mkString("\t"))
           }
           result.iterator
-      })
+        })
 
       // 4 保存数据到hdfs
       val hdfsPath = savePath + "/" + date
@@ -122,6 +131,5 @@ object makedata_ad_33_bucketData_20240622 {
     }
 
 
-
   }
 }