jch пре 4 месеци
родитељ
комит
63349d1819

+ 26 - 10
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_recsys_61_xgb_nor_hdfsfile_20241209.scala

@@ -8,7 +8,7 @@ import org.apache.spark.ml.evaluation.RegressionEvaluator
 import org.apache.spark.ml.feature.VectorAssembler
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.DataTypes
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 import java.util
 import scala.io.Source
@@ -64,11 +64,12 @@ object pred_recsys_61_xgb_nor_hdfsfile_20241209 {
 
     val testDataSet = spark.createDataFrame(testData, schema)
     val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label")
-    val predictions = model.transform(testDataSetTrans).persist()
+    val predictions = model.transform(testDataSetTrans)
+    val clipPrediction = getClipData(spark, predictions).persist()
 
-    val saveData = predictions.select("label", "prediction").rdd
+    val saveData = clipPrediction.select("label", "prediction", "clipPrediction").rdd
       .map(r => {
-        (r.get(0), r.get(1)).productIterator.mkString("\t")
+        (r.get(0), r.get(1), r.get(2)).productIterator.mkString("\t")
       })
     val hdfsPath = savePath
     if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
@@ -81,16 +82,16 @@ object pred_recsys_61_xgb_nor_hdfsfile_20241209 {
 
     val rmseEvaluator = new RegressionEvaluator()
       .setLabelCol("label")
-      .setPredictionCol("prediction")
+      .setPredictionCol("clipPrediction")
       .setMetricName("rmse")
     val maeEvaluator = new RegressionEvaluator()
       .setLabelCol("label")
-      .setPredictionCol("prediction")
+      .setPredictionCol("clipPrediction")
       .setMetricName("mae")
-    val rmse = rmseEvaluator.evaluate(predictions.select("label", "prediction"))
-    val mae = maeEvaluator.evaluate(predictions.select("label", "prediction"))
-    val mape = calMAPE(predictions.select("label", "prediction").rdd)
-    val rmsle = calRMSLE(predictions.select("label", "prediction").rdd)
+    val rmse = rmseEvaluator.evaluate(clipPrediction.select("label", "clipPrediction"))
+    val mae = maeEvaluator.evaluate(clipPrediction.select("label", "clipPrediction"))
+    val mape = calMAPE(clipPrediction.select("label", "clipPrediction").rdd)
+    val rmsle = calRMSLE(clipPrediction.select("label", "clipPrediction").rdd)
     printf("recsys nor:rmse: %.6f\n", rmse)
     printf("recsys nor:mae: %.6f\n", mae)
     printf("recsys nor:mape: %.6f\n", mape)
@@ -119,6 +120,21 @@ object pred_recsys_61_xgb_nor_hdfsfile_20241209 {
     })
   }
 
+  def getClipData(spark: SparkSession, df: DataFrame): DataFrame = {
+    import spark.implicits._
+    df.select("label", "prediction").rdd
+      .map(row => {
+        val label = row.getAs[Double]("label")
+        val prediction = row.getAs[Double]("prediction")
+        if (prediction < 1E-8) {
+          (label, prediction, 0)
+        } else {
+          (label, prediction, prediction)
+        }
+      }
+      ).toDF("label", "prediction", "clipPrediction")
+  }
+
   def calMAPE(evalRdd: RDD[Row]): Double = {
     val apeRdd = evalRdd.map(raw => {
       val label = raw.get(0).toString.toDouble