丁云鹏 10 months ago
parent
commit
1e128f538e

+ 35 - 46
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_16_bucketData_20240609_check.scala

@@ -1,15 +1,5 @@
 package com.aliyun.odps.spark.examples.makedata
 
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-import com.alibaba.fastjson.{JSON, JSONObject}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
 /*
 
  */
@@ -35,8 +25,8 @@ object makedata_16_bucketData_20240609_check {
       }
     println(content)
     val contentList = content.split("\n")
-      .map(r=> r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r=> r.nonEmpty).toList
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty).toList
     val contentList_br = sc.broadcast(contentList)
 
     val resourceUrlBucket = loader.getResource("20240609_bucket_274.txt")
@@ -52,7 +42,7 @@ object makedata_16_bucketData_20240609_check {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r =>{
+      .map(r => {
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
@@ -72,16 +62,16 @@ object makedata_16_bucketData_20240609_check {
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (date <- dateRange) {
       println("开始执行:" + date)
-      val data = sc.textFile(readPath + date).map(r=>{
+      val data = sc.textFile(readPath + date).map(r => {
         val rList = r.split("\t")
         val logKey = rList(0)
         val labelKey = rList(1)
         val features = rList(2).split(",").map(_.toDouble)
         val allFeature: JSONObject = if (rList(3).equals("\\N")) new JSONObject() else
-                                     JSON.parseObject(rList(3))
+          JSON.parseObject(rList(3))
         (logKey, labelKey, features, allFeature)
       })
-        .filter{
+        .filter {
           case (logKey, labelKey, features, allFeature) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
@@ -91,42 +81,42 @@ object makedata_16_bucketData_20240609_check {
             APPSETS.contains(apptype) && pagesource.endsWith("recommend") &&
               ABSETS.contains(abcode) && level.equals("0")
         }
-        .map{
+        .map {
           case (logKey, labelKey, features, allFeature) =>
             val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
             (label, features, allFeature)
         }
         .mapPartitions(row => {
-        val result = new ArrayBuffer[String]()
-        val contentList = contentList_br.value
-        val bucketsMap = bucketsMap_br.value
-        row.foreach{
-          case (label, features, allFeature) =>
-            val featuresBucket = contentList.indices.map(i =>{
-              val featureName = contentList(i)
-              val score = features(i)
-              // user
-              if (featureName.startsWith("c")) {
-                val scoreNew = allFeature.getOrDefault(featureName, "").toString
-                if (scoreNew.equals("")) {
-                  ""
-                } else {
-                  featureName + ":" + scoreNew.toString
-                }
-              } else {
-                if (score > 1E-8) {
-                  val (bucketNum, buckets) = bucketsMap(featureName)
-                  val scoreNew = 1.0 / bucketNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                  featureName + ":" + scoreNew.toString
+          val result = new ArrayBuffer[String]()
+          val contentList = contentList_br.value
+          val bucketsMap = bucketsMap_br.value
+          row.foreach {
+            case (label, features, allFeature) =>
+              val featuresBucket = contentList.indices.map(i => {
+                val featureName = contentList(i)
+                val score = features(i)
+                // user
+                if (featureName.startsWith("c") || featureName.startsWith("b")) {
+                  val scoreNew = allFeature.getOrDefault(featureName, "").toString
+                  if (scoreNew.equals("")) {
+                    ""
+                  } else {
+                    featureName + ":" + scoreNew
+                  }
                 } else {
-                  ""
+                  if (score > 1E-8) {
+                    val (bucketNum, buckets) = bucketsMap(featureName)
+                    val scoreNew = 1.0 / bucketNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                    featureName + ":" + scoreNew.toString
+                  } else {
+                    ""
+                  }
                 }
-              }
-            }).filter(_.nonEmpty)
-            result.add(label + "\t" + featuresBucket.mkString("\t"))
-        }
-        result.iterator
-      })
+              }).filter(_.nonEmpty)
+              result.add(label + "\t" + featuresBucket.mkString("\t"))
+          }
+          result.iterator
+        })
 
      // 4 Save data to HDFS
       val hdfsPath = savePath + "/" + date
@@ -140,6 +130,5 @@ object makedata_16_bucketData_20240609_check {
     }
 
 
-
   }
 }
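
Beyond whitespace cleanup and the dropped imports, the one semantic change in this file is the branch predicate: features whose names start with "b" now join the "c*" features on the raw pass-through path, taking their value straight from the allFeature JSON instead of being bucketized. A minimal, self-contained sketch of the resulting per-feature encoding — `insertPosition` here is a hypothetical stand-in for `ExtractorUtils.findInsertPosition`, which this diff does not show and whose semantics are assumed to be the count of bucket boundaries at or below the score:

```scala
object BucketEncodingSketch {
  // Hypothetical stand-in for examples.extractor.ExtractorUtils.findInsertPosition;
  // assumed to return the number of bucket boundaries <= score.
  def insertPosition(buckets: Array[Double], score: Double): Int =
    buckets.count(_ <= score)

  // Encodes one feature as "name:value", or "" if it should be dropped,
  // mirroring the map(...) body in the diff above. `raw` stands in for the
  // fastjson allFeature object.
  def encode(featureName: String,
             score: Double,
             raw: Map[String, String],
             bucketsMap: Map[String, (Double, Array[Double])]): String =
    if (featureName.startsWith("c") || featureName.startsWith("b")) {
      // NEW in this commit: "b*" features join "c*" on the raw-value path.
      raw.get(featureName).filter(_.nonEmpty)
        .map(v => featureName + ":" + v).getOrElse("")
    } else if (score > 1e-8) {
      val (bucketNum, buckets) = bucketsMap(featureName)
      // Map the insert position onto an evenly spaced value in (0, 1].
      featureName + ":" + (1.0 / bucketNum * (insertPosition(buckets, score) + 1.0))
    } else ""
}
```

One observable consequence: a "b*" feature whose raw value is present in allFeature is now emitted even when its numeric score is 0.0, whereas before this commit the `score > 1E-8` guard on the bucketized path would have dropped it.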

+ 35 - 44
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_16_bucketData_20240609_check_replace_video.scala

@@ -1,13 +1,5 @@
 package com.aliyun.odps.spark.examples.makedata
 
-import com.alibaba.fastjson.{JSON, JSONObject}
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
 /*
 
  */
@@ -33,8 +25,8 @@ object makedata_16_bucketData_20240609_check_replace_video {
       }
     println(content)
     val contentList = content.split("\n")
-      .map(r=> r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r=> r.nonEmpty).toList
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty).toList
     val contentList_br = sc.broadcast(contentList)
 
     val resourceUrlBucket = loader.getResource("20240609_bucket_274.txt")
@@ -50,7 +42,7 @@ object makedata_16_bucketData_20240609_check_replace_video {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r =>{
+      .map(r => {
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
@@ -70,16 +62,16 @@ object makedata_16_bucketData_20240609_check_replace_video {
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (date <- dateRange) {
       println("开始执行:" + date)
-      val data = sc.textFile(readPath + date).map(r=>{
+      val data = sc.textFile(readPath + date).map(r => {
         val rList = r.split("\t")
         val logKey = rList(0)
         val labelKey = rList(1)
         val features = rList(2).split(",").map(_.toDouble)
         val allFeature: JSONObject = if (rList(3).equals("\\N")) new JSONObject() else
-                                     JSON.parseObject(rList(3))
+          JSON.parseObject(rList(3))
         (logKey, labelKey, features, allFeature)
       })
-        .filter{
+        .filter {
           case (logKey, labelKey, features, allFeature) =>
             val logKeyList = logKey.split(",")
             val apptype = logKeyList(0)
@@ -89,42 +81,42 @@ object makedata_16_bucketData_20240609_check_replace_video {
             APPSETS.contains(apptype) && pagesource.endsWith("recommend") &&
               ABSETS.contains(abcode) && level.equals("0")
         }
-        .map{
+        .map {
           case (logKey, labelKey, features, allFeature) =>
             val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
             (label, features, allFeature)
         }
         .mapPartitions(row => {
-        val result = new ArrayBuffer[String]()
-        val contentList = contentList_br.value
-        val bucketsMap = bucketsMap_br.value
-        row.foreach{
-          case (label, features, allFeature) =>
-            val featuresBucket = contentList.indices.map(i =>{
-              val featureName = contentList(i)
-              val score = features(i)
-              // user
-              if (featureName.startsWith("b")) {
-                val scoreNew = allFeature.getOrDefault(featureName, "").toString
-                if (scoreNew.equals("")) {
-                  ""
-                } else {
-                  featureName + ":" + scoreNew.toString
-                }
-              } else {
-                if (score > 1E-8) {
-                  val (bucketNum, buckets) = bucketsMap(featureName)
-                  val scoreNew = 1.0 / bucketNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                  featureName + ":" + scoreNew.toString
+          val result = new ArrayBuffer[String]()
+          val contentList = contentList_br.value
+          val bucketsMap = bucketsMap_br.value
+          row.foreach {
+            case (label, features, allFeature) =>
+              val featuresBucket = contentList.indices.map(i => {
+                val featureName = contentList(i)
+                val score = features(i)
+                // user
+                if (featureName.startsWith("b") && !featureName.equals("bit_rate")) {
+                  val scoreNew = allFeature.getOrDefault(featureName, "").toString
+                  if (scoreNew.equals("")) {
+                    ""
+                  } else {
+                    featureName + ":" + scoreNew
+                  }
                 } else {
-                  ""
+                  if (score > 1E-8) {
+                    val (bucketNum, buckets) = bucketsMap(featureName)
+                    val scoreNew = 1.0 / bucketNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                    featureName + ":" + scoreNew.toString
+                  } else {
+                    ""
+                  }
                 }
-              }
-            }).filter(_.nonEmpty)
-            result.add(label + "\t" + featuresBucket.mkString("\t"))
-        }
-        result.iterator
-      })
+              }).filter(_.nonEmpty)
+              result.add(label + "\t" + featuresBucket.mkString("\t"))
+          }
+          result.iterator
+        })
 
      // 4 Save data to HDFS
       val hdfsPath = savePath + "/" + date
@@ -138,6 +130,5 @@ object makedata_16_bucketData_20240609_check_replace_video {
     }
 
 
-
   }
 }
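
The _replace_video variant makes the complementary change: it already routed "b*" features through the raw-value path, and this commit narrows that pass-through by exempting bit_rate, which stays on the bucketized path. A sketch of just the routing predicate, under the same assumptions as the sketch above:

```scala
// Routing predicate in the _replace_video variant: raw pass-through for
// "b*" features, except bit_rate, which keeps the bucketized path.
def useRawValue(featureName: String): Boolean =
  featureName.startsWith("b") && featureName != "bit_rate"
```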