
Update makedata_ad_33_bucketDataFromOriginToHive_20250228

StrayWarrior 1 month ago
parent commit 7c14738fbd

+ 32 - 25
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20250228.scala

@@ -56,6 +56,13 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)
+    val denseFeatureNames = bucketsMap.keySet
+    val sparseFeatureNames = Set(
+      "cid", "adid", "adverid", "targeting_conversion",
+      "region", "city", "brand",
+      "vid", "cate1", "cate2",
+      "user_cid_click_list", "user_cid_conver_list",
+      "user_vid_return_tags_2h", "user_vid_return_tags_1d", "user_vid_return_tags_3d", "user_vid_return_tags_7d", "user_vid_return_tags_14d")
 
 
    // 2 Read ODPS + table info
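
The new denseFeatureNames set is derived from the broadcast bucket map, so the dense features are exactly the bucketed ones, while the sparse (categorical) names are enumerated by hand. Below is a standalone sketch of that layout; the bucket-file format (tab-separated name, bucket count, comma-separated bounds) is an assumption, since only the tuple shape is visible in the diff context:

```scala
// Minimal sketch of the assumed bucket-map layout. The feature names and the
// tab-separated line format are hypothetical; the diff only shows the tuple
// shape (name -> (bucketNum, bounds)).
object BucketMapSketch {
  def parseBuckets(lines: Seq[String]): Map[String, (Double, Array[Double])] =
    lines.map { line =>
      val rList = line.split("\t")
      (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
    }.toMap

  def main(args: Array[String]): Unit = {
    val bucketsMap = parseBuckets(Seq(
      "ctr_1d\t4\t0.01,0.05,0.1",  // hypothetical dense feature
      "cvr_3d\t4\t0.02,0.08,0.2"   // hypothetical dense feature
    ))
    // Dense feature names are exactly the bucketed ones.
    val denseFeatureNames = bucketsMap.keySet
    println(denseFeatureNames)
  }
}
```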
@@ -469,38 +476,35 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
               val headvideoid = record.getString("headvideoid")
               val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
               val labelKey = labels.toString()
-              val featureKey = featureMap.toString()
-
-              val mutableMap = scala.collection.mutable.Map[String, String]()
-              //6 Assemble the data and save.
-              mutableMap.put("logKey", logKey)
-              mutableMap.put("labelKey", labelKey)
-              mutableMap.put("featureKey", featureKey)
-              mutableMap
+              (logKey, labelKey, featureMap)
             })
           odpsData
         }).reduce(_ union _)
-        .map(record => {
-          val logKey = record.getOrElse("logKey", "")
-          val labelKey = record.getOrElse("labelKey", "")
-          val featureKey = record.getOrElse("featureKey", "")
-          val jsons = JSON.parseObject(featureKey)
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
+        .map{ case (logKey, labelKey, jsons) =>
+          val denseFeatures = scala.collection.mutable.Map[String, Double]()
+          val sparseFeatures = scala.collection.mutable.Map[String, String]()
+          denseFeatureNames.foreach(r => {
+            if (jsons.containsKey(r)) {
+              denseFeatures.put(r, jsons.getDoubleValue(r))
+            }
           })
-          (logKey, labelKey, features)
-        }).filter {
-          case (logKey, labelKey, features) =>
+          sparseFeatureNames.foreach(r => {
+            if (jsons.containsKey(r)) {
+              sparseFeatures.put(r, jsons.get(r).toString)
+            }
+          })
+          (logKey, labelKey, denseFeatures, sparseFeatures)
+        }.filter {
+          case (logKey, labelKey, denseFeatures, sparseFeatures) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
             !Set("12", "13").contains(apptype)
         }
         .map {
-          case (logKey, labelKey, features) =>
+          case (logKey, labelKey, denseFeatures, sparseFeatures) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
             val bucketsMap = bucketsMap_br.value
-            var resultMap = features.collect {
+            var resultMap = denseFeatures.collect {
               case (name, score) if !filterNames.exists(name.contains) && score > 1E-8 =>
                 var key = name.replace("*", "_x_").replace("(view)", "_view")
                 if (key == "ad_is_click") {
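
The refactor above drops the intermediate mutable Map and the featureKey JSON round trip: each record is carried as a (logKey, labelKey, featureMap) tuple, and the parsed JSON is split into a dense Map[String, Double] and a sparse Map[String, String] before rows with apptype 12 or 13 are filtered out. A self-contained sketch of the split, using fastjson as the diff does; the feature names and values here are hypothetical:

```scala
import com.alibaba.fastjson.JSON
import scala.collection.mutable

object FeatureSplitSketch {
  def main(args: Array[String]): Unit = {
    val denseFeatureNames  = Set("ctr_1d")   // would come from bucketsMap.keySet
    val sparseFeatureNames = Set("cid", "adid")
    val jsons = JSON.parseObject("""{"ctr_1d":0.123,"cid":"c42","adid":"a7"}""")

    // Dense features are read as doubles, sparse ones kept as raw strings.
    val denseFeatures  = mutable.Map[String, Double]()
    val sparseFeatures = mutable.Map[String, String]()
    denseFeatureNames.foreach { r =>
      if (jsons.containsKey(r)) denseFeatures.put(r, jsons.getDoubleValue(r))
    }
    sparseFeatureNames.foreach { r =>
      if (jsons.containsKey(r)) sparseFeatures.put(r, jsons.get(r).toString)
    }

    println(denseFeatures)   // ctr_1d -> 0.123
    println(sparseFeatures)  // cid -> c42, adid -> a7
  }
}
```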
@@ -514,6 +518,9 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 }
                 key -> value.toString
             }.toMap
+            sparseFeatures.foreach(kv => {
+              resultMap += (kv._1 -> kv._2)
+            })
             resultMap += ("has_conversion" -> label)
             resultMap += ("logkey" -> logKey)
             (label.toInt, resultMap, Random.nextDouble())
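
This hunk merges the raw sparse strings into the same Map[String, String] row that holds the bucketed dense values, then appends the label and the log key as ordinary columns. A toy illustration of the merge (the bucketing itself is elided by the diff; all values here are hypothetical):

```scala
// Sketch of the final row assembly: bucketed dense values and raw sparse
// strings share one Map[String, String], plus label and log-key columns.
object ResultMapSketch {
  def main(args: Array[String]): Unit = {
    var resultMap = Map("ctr_1d" -> "0.25")  // hypothetical bucketed dense value
    val sparseFeatures = Map("cid" -> "c42", "adid" -> "a7")
    sparseFeatures.foreach(kv => resultMap += (kv._1 -> kv._2))
    resultMap += ("has_conversion" -> "1")
    resultMap += ("logkey" -> "0,mid1,c42,1700000000,v9")  // hypothetical key
    println(resultMap)
  }
}
```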
@@ -535,13 +542,13 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
         try {
           columnType.getTypeName match {
             case "STRING" =>
-              record.setString(columnIndex, value.toString)
+              record.setString(columnIndex, value)
             case "BIGINT" =>
-              record.setBigint(columnIndex, value.toString.toLong)
+              record.setBigint(columnIndex, value.toLong)
             case "DOUBLE" =>
-              record.setDouble(columnIndex, value.toString.toDouble)
+              record.setDouble(columnIndex, value.toDouble)
             case "BOOLEAN" =>
-              record.setBoolean(columnIndex, value.toString.toBoolean)
+              record.setBoolean(columnIndex, value.toBoolean)
             case other =>
               throw new IllegalArgumentException(s"Unsupported column type: $other")
           }
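
With resultMap now typed Map[String, String], each value already reaches the writer as a String, so the redundant value.toString calls go away and the string is converted per column type directly. A standalone sketch of the same dispatch, with the ODPS Record swapped for a plain return value so it runs without the SDK:

```scala
// Stand-in for the per-column write: convert a string to the target column
// type, mirroring the type names handled in the diff.
object ColumnWriteSketch {
  def convert(typeName: String, value: String): Any = typeName match {
    case "STRING"  => value
    case "BIGINT"  => value.toLong
    case "DOUBLE"  => value.toDouble
    case "BOOLEAN" => value.toBoolean
    case other     => throw new IllegalArgumentException(s"Unsupported column type: $other")
  }

  def main(args: Array[String]): Unit = {
    println(convert("BIGINT", "42"))     // 42
    println(convert("BOOLEAN", "true"))  // true
  }
}
```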