Преглед на файлове

feat:添加对单个CID打分的脚本

zhaohaipeng преди 9 месеца
родител
ревизия
158375af1b
променени са 1 файла, в които са добавени 56 реда и са изтрити 39 реда
  1. 56 39
      src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/makedata_ad_33_bucketData_20240728.scala

+ 56 - 39
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/makedata_ad_33_bucketData_20240728.scala

@@ -9,6 +9,7 @@ import org.apache.spark.sql.SparkSession
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
+
 /*
 
  */
@@ -22,6 +23,19 @@ object makedata_ad_33_bucketData_20240728 {
       .getOrCreate()
     val sc = spark.sparkContext
 
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
+    val beginStr = param.getOrElse("beginStr", "20240620")
+    val endStr = param.getOrElse("endStr", "20240620")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
+    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
+    val featureNameFile = param.getOrElse("featureNameFile", "20240718_ad_feature_name.txt");
+
+
     val loader = getClass.getClassLoader
 
     val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
@@ -37,55 +51,57 @@ object makedata_ad_33_bucketData_20240728 {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r =>{
+      .map(r => {
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)
 
+    val resourceUrl = loader.getResource(featureNameFile)
+    val content =
+      if (resourceUrl != null) {
+        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
+        Source.fromURL(resourceUrl).close()
+        content
+      } else {
+        ""
+      }
 
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
+    println()
+    println()
+    println()
+    println(content)
+    val contentList = content.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty).toList
 
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (date <- dateRange) {
       println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-        val rList = r.split("\t")
-        val logKey = rList(0)
-        val labelKey = rList(1)
-        val jsons = JSON.parseObject(rList(2))
-        val features = scala.collection.mutable.Map[String, Double]()
-        jsons.foreach(r => {
-          features.put(r._1, jsons.getDoubleValue(r._1))
+      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
+          val rList = r.split("\t")
+          val logKey = rList(0)
+          val labelKey = rList(1)
+          val jsons = JSON.parseObject(rList(2))
+          val features = scala.collection.mutable.Map[String, Double]()
+          jsons.foreach(r => {
+            features.put(r._1, jsons.getDoubleValue(r._1))
+          })
+          (logKey, labelKey, features)
         })
-        (logKey, labelKey, features)
-      })
-        .filter{
+        .filter {
           case (logKey, labelKey, features) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
             !Set("12", "13").contains(apptype)
         }
-        .map{
+        .map {
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
 
-            bucketsMap.foreach {
-              case (name, scorer) => {
-                if (!features.contains(name)){
-                  features.put(name, 0);
-                } else {
-                  val score = features(name)
-                  features.put(name, score + 0.01)
-                }
+            for (name <- contentList) {
+              if (!features.contains(name)) {
+                features.put(name, 0);
               }
             }
 
@@ -94,21 +110,23 @@ object makedata_ad_33_bucketData_20240728 {
         .mapPartitions(row => {
           val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
-          row.foreach{
+          row.foreach {
             case (label, features) =>
-              val featuresBucket = features.map{
+              val featuresBucket = features.map {
                 case (name, score) =>
                   var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.contains(r)) {ifFilter = true} )
+                  if (filterNames.nonEmpty) {
+                    filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
+                      ifFilter = true
+                    })
                   }
-                  if (ifFilter){
+                  if (ifFilter) {
                     ""
-                  }else{
+                  } else {
                     if (score > 1E-8) {
                       if (bucketsMap.contains(name)) {
                         val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 0.01+1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                        val scoreNew = 0.01 + 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
                         name + ":" + scoreNew.toString
                       } else {
                         name + ":" + score.toString
@@ -121,7 +139,7 @@ object makedata_ad_33_bucketData_20240728 {
               result.add(label + "\t" + featuresBucket.mkString("\t"))
           }
           result.iterator
-      })
+        })
 
       // 4 保存数据到hdfs
       val hdfsPath = savePath + "/" + date
@@ -135,6 +153,5 @@ object makedata_ad_33_bucketData_20240728 {
     }
 
 
-
   }
 }