Browse Source

修改任务 增加logkey

xueyiming 2 months ago
parent
commit
71c4537962

+ 3 - 3
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketData_20240718.scala

@@ -78,13 +78,13 @@ object makedata_ad_33_bucketData_20240718 {
         .map{
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
+            (logKey, label, features)
         }
         .mapPartitions(row => {
           val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
           row.foreach{
-            case (label, features) =>
+            case (logKey, label, features) =>
               val featuresBucket = features.map{
                 case (name, score) =>
                   var ifFilter = false
@@ -107,7 +107,7 @@ object makedata_ad_33_bucketData_20240718 {
                     }
                   }
               }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
+              result.add(label + "\t" + featuresBucket.mkString("\t") + "\t" + "logkey:" + logKey)
           }
           result.iterator
       })

+ 19 - 56
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketData_hive_20240718.scala

@@ -86,40 +86,25 @@ object makedata_ad_33_bucketData_hive_20240718 {
         .map {
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
+            val bucketsMap = bucketsMap_br.value
+            var resultMap = features.collect {
+              case (name, score) if !filterNames.exists(name.contains) && score > 1E-8 =>
+                var key = name.replace("*", "_x_").replace("(view)", "_view")
+                if (key == "ad_is_click") {
+                  key = "has_click"
+                }
+                val value = if (bucketsMap.contains(name)) {
+                  val (bucketsNum, buckets) = bucketsMap(name)
+                  1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                } else {
+                  score
+                }
+                key -> value.toString
+            }.toMap
+            resultMap += ("has_conversion" -> label)
+            resultMap += ("log_key" -> logKey)
+            resultMap
         }
-        .mapPartitions(row => {
-          val bucketsMap = bucketsMap_br.value
-          row.flatMap {
-            case (label, features) =>
-              val featuresBucket = features.map {
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty) {
-                    filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
-                      ifFilter = true
-                    })
-                  }
-                  if (ifFilter) {
-                    ""
-                  } else {
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              val str = label + "\t" + featuresBucket.mkString("\t")
-              Iterator(processString(str))
-          }
-        })
 
       // 4 hive
       odpsOps.saveToTable(project, table, partition, list, write, defaultCreate = true)
@@ -128,7 +113,7 @@ object makedata_ad_33_bucketData_hive_20240718 {
 
   }
 
-  def write(map: Map[String, Any], record: Record, schema: TableSchema): Unit = {
+  def write(map: Map[String, String], record: Record, schema: TableSchema): Unit = {
     for ((columnName, value) <- map) {
       try {
         // 查找列名在表结构中的索引
@@ -163,26 +148,4 @@ object makedata_ad_33_bucketData_hive_20240718 {
   }
 
 
-  private def processString(input: String): Map[String, Any] = {
-    // 去除多余空格并按空格分割成键值对数组
-    val parts = input.trim.split("\t")
-    var resultMap = Map[String, Any]()
-    // 处理第一个元素,假设为特殊标识
-    resultMap += ("has_conversion" -> parts(0).toLong)
-    // 处理后续的键值对
-    parts.drop(1).foreach { part =>
-      part.split(":", 2) match {
-        case Array(keyStr, valueStr) =>
-          val key = keyStr.replace("*", "_x_").replace("(view)", "_view")
-          val value = try {
-            valueStr.toDouble
-          } catch {
-            case _: NumberFormatException => valueStr
-          }
-          resultMap += (key -> value)
-        case _ => // 若无法解析成键值对则丢弃
-      }
-    }
-    resultMap
-  }
 }