Browse Source

add scores map

jch, 4 months ago
parent
commit
f51bbf79f9

+ 14 - 10
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_recsys_61_xgb_nor_hdfsfile_20241209.scala

@@ -51,7 +51,8 @@ object pred_recsys_61_xgb_nor_hdfsfile_20241209 {
       DataTypes.createStructField("label", DataTypes.DoubleType, true)
     ) ++ features.map(f => DataTypes.createStructField(f, DataTypes.DoubleType, true))
     fields = fields ++ Array(
-      DataTypes.createStructField("logKey", DataTypes.StringType, true)
+      DataTypes.createStructField("logKey", DataTypes.StringType, true),
+      DataTypes.createStructField("scoresMap", DataTypes.StringType, true)
     )
 
     val schema = DataTypes.createStructType(fields)
@@ -66,13 +67,13 @@ object pred_recsys_61_xgb_nor_hdfsfile_20241209 {
     )
 
     val testDataSet = spark.createDataFrame(testData, schema)
-    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey")
+    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey", "scoresMap")
     val predictions = model.transform(testDataSetTrans)
     val clipPrediction = getClipData(spark, predictions).persist()
 
-    val saveData = clipPrediction.select("label", "prediction", "clipPrediction", "logKey").rdd
+    val saveData = clipPrediction.select("label", "prediction", "clipPrediction", "logKey", "scoresMap").rdd
       .map(r => {
-        (r.get(0), r.get(1), r.get(2), r.get(3)).productIterator.mkString("\t")
+        (r.get(0), r.get(1), r.get(2), r.get(3), r.get(4)).productIterator.mkString("\t")
       })
     val hdfsPath = savePath
     if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
@@ -109,36 +110,39 @@ object pred_recsys_61_xgb_nor_hdfsfile_20241209 {
       val line: Array[String] = StringUtils.split(r, '\t')
       val logKey = line(0)
       val label: Double = NumberUtils.toDouble(line(1))
+      val scoresMap = line(2)
       val map: util.Map[String, Double] = new util.HashMap[String, Double]
-      for (i <- 2 until line.length) {
+      for (i <- 3 until line.length) {
         val fv: Array[String] = StringUtils.split(line(i), ':')
         map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
       }
 
-      val v: Array[Any] = new Array[Any](features.length + 2)
+      val v: Array[Any] = new Array[Any](features.length + 3)
       v(0) = label
       for (i <- 0 until features.length) {
         v(i + 1) = map.getOrDefault(features(i), 0.0d)
       }
       v(features.length + 1) = logKey
+      v(features.length + 2) = scoresMap
       Row(v: _*)
     })
   }
 
   def getClipData(spark: SparkSession, df: DataFrame): DataFrame = {
     import spark.implicits._
-    df.select("label", "prediction", "logKey").rdd
+    df.select("label", "prediction", "logKey", "scoresMap").rdd
       .map(row => {
         val label = row.getAs[Double]("label")
         val prediction = row.getAs[Double]("prediction")
         val logKey = row.getAs[String]("logKey")
+        val scoresMap = row.getAs[String]("scoresMap")
         if (prediction < 1E-8) {
-          (label, prediction, 0d, logKey)
+          (label, prediction, 0d, logKey, scoresMap)
         } else {
-          (label, prediction, prediction, logKey)
+          (label, prediction, prediction, logKey, scoresMap)
         }
       }
-      ).toDF("label", "prediction", "clipPrediction", "logKey")
+      ).toDF("label", "prediction", "clipPrediction", "logKey", "scoresMap")
   }
 
   def calMAPE(evalRdd: RDD[Row]): Double = {

+ 9 - 6
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_recsys_61_xgb_rov_hdfsfile_20241209.scala

@@ -51,7 +51,8 @@ object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
       DataTypes.createStructField("label", DataTypes.IntegerType, true)
     ) ++ features.map(f => DataTypes.createStructField(f, DataTypes.DoubleType, true))
     fields = fields ++ Array(
-      DataTypes.createStructField("logKey", DataTypes.StringType, true)
+      DataTypes.createStructField("logKey", DataTypes.StringType, true),
+      DataTypes.createStructField("scoresMap", DataTypes.StringType, true)
     )
 
     val schema = DataTypes.createStructType(fields)
@@ -66,12 +67,12 @@ object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
     )
 
     val testDataSet = spark.createDataFrame(testData, schema)
-    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey")
+    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey", "scoresMap")
     val predictions = model.transform(testDataSetTrans)
 
-    val saveData = predictions.select("label", "rawPrediction", "probability", "logKey").rdd
+    val saveData = predictions.select("label", "rawPrediction", "probability", "logKey", "scoresMap").rdd
       .map(r => {
-        (r.get(0), r.get(1), r.get(2), r.get(3)).productIterator.mkString("\t")
+        (r.get(0), r.get(1), r.get(2), r.get(3), r.get(4)).productIterator.mkString("\t")
       })
     val hdfsPath = savePath
     if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
@@ -98,18 +99,20 @@ object pred_recsys_61_xgb_rov_hdfsfile_20241209 {
       val line: Array[String] = StringUtils.split(r, '\t')
       val logKey = line(0)
       val label: Int = NumberUtils.toInt(line(1))
+      val scoresMap = line(2)
       val map: util.Map[String, Double] = new util.HashMap[String, Double]
-      for (i <- 2 until line.length) {
+      for (i <- 3 until line.length) {
         val fv: Array[String] = StringUtils.split(line(i), ':')
         map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
       }
 
-      val v: Array[Any] = new Array[Any](features.length + 2)
+      val v: Array[Any] = new Array[Any](features.length + 3)
       v(0) = label
       for (i <- 0 until features.length) {
         v(i + 1) = map.getOrDefault(features(i), 0.0d)
       }
       v(features.length + 1) = logKey
+      v(features.length + 2) = scoresMap
       Row(v: _*)
     })
   }

+ 2 - 1
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/train_recsys_61_xgb_nor_20241209.scala

@@ -133,8 +133,9 @@ object train_recsys_61_xgb_nor_20241209 {
       val line: Array[String] = StringUtils.split(r, '\t')
       // val logKey = line(0)
       val label: Double = NumberUtils.toDouble(line(1))
+      // val scoresMap = line(2)
       val map: util.Map[String, Double] = new util.HashMap[String, Double]
-      for (i <- 2 until line.length) {
+      for (i <- 3 until line.length) {
         val fv: Array[String] = StringUtils.split(line(i), ':')
         map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
       }

+ 2 - 1
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/train_recsys_61_xgb_rov_20241209.scala

@@ -144,8 +144,9 @@ object train_recsys_61_xgb_rov_20241209 {
         val line: Array[String] = StringUtils.split(r, '\t')
         // val logKey = line(0)
         val label: Int = NumberUtils.toInt(line(1))
+        // val scoresMap = line(2)
         val map: util.Map[String, Double] = new util.HashMap[String, Double]
-        for (i <- 2 until line.length) {
+        for (i <- 3 until line.length) {
           val fv: Array[String] = StringUtils.split(line(i), ':')
           map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
         }