Browse Source

new label rov&nor model

jch 3 months ago
parent
commit
a6157ab29c

+ 29 - 21
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_recsys_61_xgb_rov_hdfsfile_20241209.scala

@@ -12,6 +12,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 
 
 import java.util
 import java.util
 import scala.io.Source
 import scala.io.Source
+import scala.util.Random
 
 
 object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
 object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
   def main(args: Array[String]): Unit = {
   def main(args: Array[String]): Unit = {
@@ -27,6 +28,7 @@ object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_recsys_rov_predict_data/")
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_recsys_rov_predict_data/")
     val featureFilter = param.getOrElse("featureFilter", "XXXXXX").split(",")
     val featureFilter = param.getOrElse("featureFilter", "XXXXXX").split(",")
 
 
+    val negRate = param.getOrElse("negRate", "1.0").toDouble
     val repartition = param.getOrElse("repartition", "20").toInt
     val repartition = param.getOrElse("repartition", "20").toInt
     val modelPath = param.getOrElse("modelPath", "/dw/recommend/model/61_recsys_rov_model/model_xgb")
     val modelPath = param.getOrElse("modelPath", "/dw/recommend/model/61_recsys_rov_model/model_xgb")
 
 
@@ -62,6 +64,7 @@ object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
     model.setMissing(0.0f).setFeaturesCol("features")
     model.setMissing(0.0f).setFeaturesCol("features")
 
 
     val testData = createData(
     val testData = createData(
+      negRate,
       sc.textFile(testPath),
       sc.textFile(testPath),
       features
       features
     )
     )
@@ -94,26 +97,31 @@ object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
     println("---------------------------------\n")
     println("---------------------------------\n")
   }
   }
 
 
-  def createData(data: RDD[String], features: Array[String]): RDD[Row] = {
-    data.map(r => {
-      val line: Array[String] = StringUtils.split(r, '\t')
-      val logKey = line(0)
-      val label: Int = NumberUtils.toInt(line(1))
-      val scoresMap = line(2)
-      val map: util.Map[String, Double] = new util.HashMap[String, Double]
-      for (i <- 3 until line.length) {
-        val fv: Array[String] = StringUtils.split(line(i), ':')
-        map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
-      }
-
-      val v: Array[Any] = new Array[Any](features.length + 3)
-      v(0) = label
-      for (i <- 0 until features.length) {
-        v(i + 1) = map.getOrDefault(features(i), 0.0d)
-      }
-      v(features.length + 1) = logKey
-      v(features.length + 2) = scoresMap
-      Row(v: _*)
-    })
+  def createData(negRate: Double, data: RDD[String], features: Array[String]): RDD[Row] = {
+    data.filter(r => {
+        val line: Array[String] = StringUtils.split(r, '\t')
+        val label: Int = NumberUtils.toInt(line(1))
+        label > 0 || new Random().nextDouble() <= negRate
+      })
+      .map(r => {
+        val line: Array[String] = StringUtils.split(r, '\t')
+        val logKey = line(0)
+        val label: Int = NumberUtils.toInt(line(1))
+        val scoresMap = line(2)
+        val map: util.Map[String, Double] = new util.HashMap[String, Double]
+        for (i <- 3 until line.length) {
+          val fv: Array[String] = StringUtils.split(line(i), ':')
+          map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
+        }
+
+        val v: Array[Any] = new Array[Any](features.length + 3)
+        v(0) = label
+        for (i <- 0 until features.length) {
+          v(i + 1) = map.getOrDefault(features(i), 0.0d)
+        }
+        v(features.length + 1) = logKey
+        v(features.length + 2) = scoresMap
+        Row(v: _*)
+      })
   }
   }
 }
 }