Browse Source

feat: 修改20240728分桶脚本

zhaohaipeng 9 months ago
parent
commit
6154189945

+ 68 - 47
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/makedata_ad_33_bucketData_20240726.scala

@@ -9,6 +9,7 @@ import org.apache.spark.sql.SparkSession
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
+
 /*
 
  */
@@ -22,6 +23,19 @@ object makedata_ad_33_bucketData_20240726 {
       .getOrCreate()
     val sc = spark.sparkContext
 
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
+    val beginStr = param.getOrElse("beginStr", "20240620")
+    val endStr = param.getOrElse("endStr", "20240620")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
+    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
+    val featureNameFile = param.getOrElse("featureNameFile", "20240718_ad_feature_name.txt");
+
+
     val loader = getClass.getClassLoader
 
     val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
@@ -37,88 +51,96 @@ object makedata_ad_33_bucketData_20240726 {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r =>{
+      .map(r => {
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)
 
+    val resourceUrl = loader.getResource(featureNameFile)
+    val content =
+      if (resourceUrl != null) {
+        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
+        Source.fromURL(resourceUrl).close()
+        content
+      } else {
+        ""
+      }
 
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
+    println()
+    println()
+    println()
+    println(content)
+    val contentList = content.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty).toList
 
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (date <- dateRange) {
       println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-        val rList = r.split("\t")
-        val logKey = rList(0)
-        val labelKey = rList(1)
-        val jsons = JSON.parseObject(rList(2))
-        val features = scala.collection.mutable.Map[String, Double]()
-        jsons.foreach(r => {
-          features.put(r._1, jsons.getDoubleValue(r._1))
+      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
+          val rList = r.split("\t")
+          val logKey = rList(0)
+          val labelKey = rList(1)
+          val jsons = JSON.parseObject(rList(2))
+          val features = scala.collection.mutable.Map[String, Double]()
+          jsons.foreach(r => {
+            features.put(r._1, jsons.getDoubleValue(r._1))
+          })
+          (logKey, labelKey, features)
         })
-        (logKey, labelKey, features)
-      })
-        .filter{
+        .filter {
           case (logKey, labelKey, features) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
             !Set("12", "13").contains(apptype)
         }
-        .map{
+        .map {
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
 
-            bucketsMap.foreach {
-              case (name, scorer) => {
-                if (!features.contains(name)){
-                  features.put(name, 0);
-                }
-              }
-            }
-
             (label, features)
         }
         .mapPartitions(row => {
           val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
-          row.foreach{
+          row.foreach {
             case (label, features) =>
-              val featuresBucket = features.map{
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.contains(r)) {ifFilter = true} )
-                  }
-                  if (ifFilter){
-                    ""
-                  }else{
+              val featuresBucket = new ArrayBuffer[String]()
+              for (name <- contentList) {
+                var ifFilter = false
+                if (filterNames.nonEmpty) {
+                  filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
+                    ifFilter = true
+                  })
+                }
+                if (!ifFilter) {
+                  if (features.contains(name)) {
+                    val score = features(name)
                     if (score > 1E-8) {
                       if (bucketsMap.contains(name)) {
                         val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 0.01+1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
+                        val scoreNew = 0.01 + 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                        featuresBucket.add(name + ":" + scoreNew.toString)
                       } else {
-                        name + ":" + score.toString
+                        featuresBucket.add(name + ":" + score.toString)
                       }
                     } else {
-                      name + ":" + "0.01"
+                      featuresBucket.add(name + ":" + "0.01")
                     }
+
+                  } else {
+                    featuresBucket.add(name + ":" + "0.01")
                   }
-              }.filter(_.nonEmpty)
+                }
+
+              }
+
               result.add(label + "\t" + featuresBucket.mkString("\t"))
           }
+
           result.iterator
-      })
+        })
 
       // 4 保存数据到hdfs
       val hdfsPath = savePath + "/" + date
@@ -132,6 +154,5 @@ object makedata_ad_33_bucketData_20240726 {
     }
 
 
-
   }
 }

+ 35 - 52
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/makedata_ad_33_bucketData_20240728.scala → src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/makedata_ad_33_bucketData_20240729.scala

@@ -1,6 +1,7 @@
 package com.aliyun.odps.spark.examples.makedata_ad
 
 import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.makedata_ad.makedata_ad_33_bucketData_20240718.getClass
 import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
 import examples.extractor.ExtractorUtils
 import org.apache.hadoop.io.compress.GzipCodec
@@ -14,7 +15,7 @@ import scala.io.Source
 
  */
 
-object makedata_ad_33_bucketData_20240728 {
+object makedata_ad_33_bucketData_20240729 {
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession
@@ -23,7 +24,6 @@ object makedata_ad_33_bucketData_20240728 {
       .getOrCreate()
     val sc = spark.sparkContext
 
-
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
     val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
@@ -33,8 +33,6 @@ object makedata_ad_33_bucketData_20240728 {
     val repartition = param.getOrElse("repartition", "100").toInt
     val filterNames = param.getOrElse("filterNames", "").split(",").toSet
     val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-    val featureNameFile = param.getOrElse("featureNameFile", "20240718_ad_feature_name.txt");
-
 
     val loader = getClass.getClassLoader
 
@@ -51,34 +49,18 @@ object makedata_ad_33_bucketData_20240728 {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r => {
+      .map(r =>{
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)
 
-    val resourceUrl = loader.getResource(featureNameFile)
-    val content =
-      if (resourceUrl != null) {
-        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
-        Source.fromURL(resourceUrl).close()
-        content
-      } else {
-        ""
-      }
-
-    println()
-    println()
-    println()
-    println(content)
-    val contentList = content.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty).toList
-
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+
+    val cidCountMap = scala.collection.mutable.Map[String, Double]()
     for (date <- dateRange) {
       println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
+      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
           val rList = r.split("\t")
           val logKey = rList(0)
           val labelKey = rList(1)
@@ -89,56 +71,56 @@ object makedata_ad_33_bucketData_20240728 {
           })
           (logKey, labelKey, features)
         })
-        .filter {
+        .filter{
           case (logKey, labelKey, features) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
             !Set("12", "13").contains(apptype)
-        }
-        .map {
+        }.filter{
+          case (logKey, labelKey, features) =>
+            var key = ""
+            for (elem <- features) {
+              if (elem._1.contains("cid_")){
+                key = elem._1
+              }
+            }
+            val count = cidCountMap.getOrElse(key, 0) + 1
+            cidCountMap.put(key, count)
+            count > 20000
+        }.map{
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-
             (label, features)
         }
         .mapPartitions(row => {
           val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
-          row.foreach {
+          row.foreach{
             case (label, features) =>
-              val featuresBucket = new ArrayBuffer[String]()
-              for (name <- contentList) {
-                var ifFilter = false
-                if (filterNames.nonEmpty) {
-                  filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
-                    ifFilter = true
-                  })
-                }
-                if (!ifFilter) {
-                  if (features.contains(name)) {
-                    val score = features(name)
+              val featuresBucket = features.map{
+                case (name, score) =>
+                  var ifFilter = false
+                  if (filterNames.nonEmpty){
+                    filterNames.foreach(r=> if (!ifFilter && name.contains(r)) {ifFilter = true} )
+                  }
+                  if (ifFilter){
+                    ""
+                  }else{
                     if (score > 1E-8) {
                       if (bucketsMap.contains(name)) {
                         val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 0.01 + 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        featuresBucket.add(name + ":" + scoreNew.toString)
+                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                        name + ":" + scoreNew.toString
                       } else {
-                        featuresBucket.add(name + ":" + score.toString)
+                        name + ":" + score.toString
                       }
                     } else {
-                      featuresBucket.add(name + ":" + "0.01")
+                      ""
                     }
-
-                  } else {
-                    featuresBucket.add(name + ":" + "0.01")
                   }
-                }
-
-              }
-
+              }.filter(_.nonEmpty)
               result.add(label + "\t" + featuresBucket.mkString("\t"))
           }
-
           result.iterator
         })
 
@@ -154,5 +136,6 @@ object makedata_ad_33_bucketData_20240728 {
     }
 
 
+
   }
 }