
feat: add bucketing script

zhaohaipeng 2 months ago
parent
current commit
105b672689

+ 20 - 44
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/train_01_xgb_ad_20240808.scala

@@ -15,7 +15,7 @@ import java.util
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
-object train_01_xgb_ad_20240808{
+object train_01_xgb_ad_20240808 {
   def main(args: Array[String]): Unit = {
     val spark = SparkSession
       .builder()
@@ -64,7 +64,7 @@ object train_01_xgb_ad_20240808{
 
     var fields = Array(
       DataTypes.createStructField("label", DataTypes.IntegerType, true)
-//      DataTypes.createStructField("logKey", DataTypes.IntegerType, true)
+      //      DataTypes.createStructField("logKey", DataTypes.IntegerType, true)
 
     ) ++ features.map(f => DataTypes.createStructField(f, DataTypes.DoubleType, true))
 
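
As a side note, here is a minimal, self-contained sketch of the schema this hunk assembles (the feature names below are placeholders; the real `features` array is loaded elsewhere in the script):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.DataTypes

    // Placeholder feature names; the script loads the real list elsewhere.
    val features = Array("f1", "f2")

    // "label" first, then one nullable Double column per feature, as in the hunk above.
    val fields = Array(DataTypes.createStructField("label", DataTypes.IntegerType, true)) ++
      features.map(f => DataTypes.createStructField(f, DataTypes.DoubleType, true))
    val schema = DataTypes.createStructType(fields)

    // A row matching that schema: an Int label followed by Double feature values.
    val row = Row(1, 0.25, 0.31)
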
@@ -74,11 +74,11 @@ object train_01_xgb_ad_20240808{
     val schema = DataTypes.createStructType(fields)
     val trainDataSet: Dataset[Row] = spark.createDataFrame(trainData, schema)
     val vectorAssembler = new VectorAssembler().setInputCols(features).setOutputCol("features")
-    val xgbInput = vectorAssembler.transform(trainDataSet).select("features","label")
-//    val xgbParam = Map("eta" -> 0.01f,
-//      "max_depth" -> 5,
-//      "objective" -> "binary:logistic",
-//      "num_class" -> 3)
+    val xgbInput = vectorAssembler.transform(trainDataSet).select("features", "label")
+    //    val xgbParam = Map("eta" -> 0.01f,
+    //      "max_depth" -> 5,
+    //      "objective" -> "binary:logistic",
+    //      "num_class" -> 3)
     val xgbClassifier = new XGBoostClassifier()
       .setEta(eta)
       .setGamma(gamma)
@@ -94,8 +94,8 @@ object train_01_xgb_ad_20240808{
       .setLabelCol("label")
       .setNthread(1)
       .setNumWorkers(num_worker)
-  .setSeed(2024)
-  .setMinChildWeight(1)
+      .setSeed(2024)
+      .setMinChildWeight(1)
     val model = xgbClassifier.fit(xgbInput)
 
 
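The commented-out `xgbParam` block above hints at xgboost4j-spark's alternative map-based configuration. A minimal sketch, assuming hyperparameter values equivalent to the setters in this hunk (the real values come from the script's arguments):

    import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

    // The same configuration expressed as a parameter map; every key below is a
    // standard XGBoost parameter, and the values are illustrative only.
    val xgbParam = Map(
      "eta" -> 0.01,
      "gamma" -> 0.0,
      "max_depth" -> 5,
      "objective" -> "binary:logistic",
      "num_workers" -> 22,
      "seed" -> 2024,
      "min_child_weight" -> 1
    )
    val classifier = new XGBoostClassifier(xgbParam)
      .setFeaturesCol("features")
      .setLabelCol("label")
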
@@ -104,14 +104,14 @@ object train_01_xgb_ad_20240808{
       features
     )
     val testDataSet = spark.createDataFrame(testData, schema)
-    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features","label", "logKey")
+    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey")
     val predictions = model.transform(testDataSetTrans)
-//     [label, features, probability, prediction, rawPrediction]
+    //     [label, features, probability, prediction, rawPrediction]
     println("zhangbo:columns:" + predictions.columns.mkString(","))
     val saveData = predictions.select("label", "rawPrediction", "probability", "logKey").rdd
-      .map(r =>{
+      .map(r => {
         (r.get(0), r.get(1), r.get(2), r.get(3)).productIterator.mkString("\t")
-    })
+      })
     val hdfsPath = savePath
     if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
       println("删除路径并开始数据写入:" + hdfsPath)
@@ -122,7 +122,6 @@ object train_01_xgb_ad_20240808{
     }
 
 
-
     val evaluator = new BinaryClassificationEvaluator()
       .setLabelCol("label")
       .setRawPredictionCol("probability")
@@ -131,16 +130,16 @@ object train_01_xgb_ad_20240808{
     println("zhangbo:auc:" + auc)
 
    // Per-cid score statistics
-    sc.textFile(hdfsPath).map(r=>{
+    sc.textFile(hdfsPath).map(r => {
       val rList = r.split("\t")
       val cid = rList(3)
       val score = rList(2).replace("[", "").replace("]", "")
         .split(",")(1).toDouble
       val label = rList(0).toDouble
       (cid, (1, label, score))
-    }).reduceByKey{
+    }).reduceByKey {
       case (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
-    }.map{
+    }.map {
       case (cid, (all, zheng, scores)) =>
         (cid, all, zheng, scores, zheng / all, scores / all)
     }.collect().sortBy(_._1).map(_.productIterator.mkString("\t")).foreach(println)
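
Each saved line is `label\trawPrediction\tprobability\tlogKey`, with `probability` printed like `[0.35,0.65]`; the pass above re-parses those lines, treating the logKey field as the cid. A worked sketch on one made-up line:

    // A made-up saved prediction line in the format written above.
    val r = "1\t[-0.62,0.62]\t[0.35,0.65]\t12345"
    val rList = r.split("\t")
    val cid = rList(3)                 // logKey column, used as the cid key
    val score = rList(2).replace("[", "").replace("]", "")
      .split(",")(1).toDouble          // positive-class probability: 0.65
    val label = rList(0).toDouble      // 1.0
    // reduceByKey then folds (1, label, score) per cid into
    // (count, positive count, score sum); the final map emits
    // (cid, count, positives, score sum, positives/count, score sum/count).
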
@@ -150,35 +149,13 @@ object train_01_xgb_ad_20240808{
 
   def createData4Ad(data: RDD[String], features: Array[String]): RDD[Row] = {
     data.map(r => {
-//      val rList = r.split("\t")
-//      val label = rList(0).toInt
-//      val featureMap = scala.collection.mutable.Map[String, Double]()
-//      var cid = -1
-//      rList.drop(1).foreach(kv =>{
-//        val kv_ = kv.split(":")
-//        if (kv_(0).startsWith("cid_")){
-//          cid = kv_(0).split("_")(1).toInt
-//        }else{
-//          featureMap.put(kv_(0), kv_(1).toDouble)
-//        }
-//      })
-//      val v: Array[Any] = new Array[Any](features.length + 1)
-//      v(0) = label
-////      v(1) = cid
-//      for (i <- 0 until features.length) {
-//        v(i + 1) = featureMap.getOrElse(r, 0.0D)
-//      }
-//      Row(v: _*)
-val line: Array[String] = StringUtils.split(r, '\t')
-      val label: Int = NumberUtils.toInt(line(0))
+
+      val line: Array[String] = StringUtils.split(r, '\t')
+      val label: Int = NumberUtils.toInt(line(1))
       val map: util.Map[String, Double] = new util.HashMap[String, Double]
-      var cid = "-1"
-      for (i <- 1 until line.length) {
+      for (i <- 2 until line.length) {
         val fv: Array[String] = StringUtils.split(line(i), ':')
         map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
-        if(fv(0).startsWith("cid_")){
-          cid = fv(0).split("_")(1)
-        }
       }
 
       val v: Array[Any] = new Array[Any](features.length + 2)
@@ -186,7 +163,6 @@ val line: Array[String] = StringUtils.split(r, '\t')
       for (i <- 0 until features.length) {
         v(i + 1) = map.getOrDefault(features(i), 0.0d)
       }
-      v(features.length + 1) = cid
       Row(v: _*)
     })
   }
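
After this change, `createData4Ad` reads the label from field 1 and `name:value` features from field 2 onward, so field 0 presumably carries the log key selected as `logKey` above. Note also that `v` keeps its `features.length + 2` length while the cid assignment was dropped, so the last slot is no longer filled here. A hedged sketch of a line in the assumed new format, using the commons-lang3 utilities the file already relies on:

    import org.apache.commons.lang3.StringUtils
    import org.apache.commons.lang3.math.NumberUtils

    // Hypothetical input line: field 0 assumed to be the log key, field 1 the
    // label, and the remaining fields name:value feature pairs.
    val sample = "271_12345\t1\tf1:0.25\tf2:0.31"
    val line = StringUtils.split(sample, '\t')
    val label = NumberUtils.toInt(line(1))   // 1
    // Feature pairs start at index 2, matching the loop in the hunk above.
    val fv = StringUtils.split(line(2), ':') // Array("f1", "0.25")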