zhangbo committed 8 months ago
parent
commit c5198d474c

+ 87 - 0
01-脚本记录 (script log)

@@ -0,0 +1,87 @@
+spark-submit --class com.tzld.piaoquan.recommend.model.produce.xgboost.XGBoostTrain --master yarn --driver-memory 512M --executor-memory 512M --executor-cores 1 --num-executors 4 /root/recommend-model/recommend-model-produce-new.jar > ~/recommend-model/log 2>&1 &
+
+
+
+
+
+recommend-model-produce-jar-with-dependencies.jar
+
+
+
+
+
+
+
+nohup /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.produce.xgboost.XGBoostTrain \
+--master yarn --driver-memory 512M --executor-memory 512M --executor-cores 1 --num-executors 2 \
+./target/recommend-model-produce-jar-with-dependencies.jar \
+> p.log 2>&1 &
+
+nohup /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit --class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 --master yarn --driver-memory 6G --executor-memory 6G --executor-cores 1 --num-executors 32 --conf spark.yarn.executor.memoryoverhead=1024 --conf spark.shuffle.service.enabled=true --conf spark.shuffle.service.port=7337 --conf spark.shuffle.consolidateFiles=true --conf spark.shuffle.manager=sort --conf spark.storage.memoryFraction=0.4 --conf spark.shuffle.memoryFraction=0.5 --conf spark.default.parallelism=200 ./target/recommend-model-produce-jar-with-dependencies.jar > p.log 2>&1 &
+
+
+nohup /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.train_01_xgb_ad_20240808 \
+--master yarn --driver-memory 6G --executor-memory 3G --executor-cores 1 --num-executors 160 \
+--conf spark.yarn.executor.memoryoverhead=1000 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+./target/recommend-model-produce-jar-with-dependencies.jar \
+featureFile:20240809_ad_feature_name_517.txt \
+trainPath:/dw/recommend/model/33_ad_train_data_v4/2024080[6-9],/dw/recommend/model/33_ad_train_data_v4/2024081[0-2] \
+testPath:/dw/recommend/model/33_ad_train_data_v4/20240813/ \
+savePath:/dw/recommend/model/34_ad_predict_data/20240813_1000/ \
+modelPath:/dw/recommend/model/35_ad_model/model_xgb_7day \
+eta:0.01 gamma:0.0 max_depth:5 num_round:1000 num_worker:63 \
+repartition:20 \
+> p5.log 2>&1 &
+
+0.7316512679739304 1000
+2024072[5-9],2024073[0-1],2024080[1-4]
+/dw/recommend/model/33_ad_train_data_v4/(20240725|20240726|20240727|20240728|20240729|20240730|20240731|20240801|20240802|20240803|20240804)
+
+
+nohup /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.ana_01_xgb_ad_20240809 \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+--conf spark.yarn.executor.memoryoverhead=1024 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+./target/recommend-model-produce-jar-with-dependencies.jar \
+savePath:/dw/recommend/model/34_ad_predict_data/20240805_1000/ \
+> p1.log 2>&1 &
+
+dfs -get /dw/recommend/model/35_ad_model/model_xgb_1000 ./
+tar -czvf model_xgb_1000.tar.gz -C model_xgb_1000 .
+dfs -put model_xgb_1000.tar.gz oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
+
+oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/model_xgb_1000.tar.gz
+
+
+nohup /opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.pred_01_xgb_ad_20240813 \
+--master yarn --driver-memory 6G --executor-memory 6G --executor-cores 1 --num-executors 32 \
+--conf spark.yarn.executor.memoryoverhead=1024 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+./target/recommend-model-produce-jar-with-dependencies.jar \
+featureFile:20240809_ad_feature_name_517.txt \
+savePath:/dw/recommend/model/34_ad_predict_data/case_tmp/ \
+modelPath:/dw/recommend/model/35_ad_model/model_xgb_1000 \
+> p5.log 2>&1 &
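
Note on the job arguments: the trailing tokens such as featureFile:..., trainPath:..., eta:0.01 are colon-separated key:value pairs consumed by ParamUtils.parseArgs in the Scala code below. ParamUtils itself is not part of this commit, so the following is only a minimal sketch of the assumed parsing behavior, splitting each argument on its first colon.

object ParamUtilsSketch {
  // Hypothetical stand-in for ParamUtils.parseArgs: turn "key:value" tokens into a Map.
  def parseArgs(args: Array[String]): Map[String, String] =
    args.flatMap { arg =>
      val idx = arg.indexOf(':')
      // keep only well-formed "key:value" tokens; the value may itself contain further ':'s
      if (idx > 0) Some(arg.substring(0, idx) -> arg.substring(idx + 1)) else None
    }.toMap
}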

+ 170 - 0
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_01_xgb_ad_hdfsfile_20240813.scala

@@ -0,0 +1,170 @@
+package com.tzld.piaoquan.recommend.model
+
+import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassificationModel, XGBoostClassifier}
+import org.apache.commons.lang.math.NumberUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.ml.feature.VectorAssembler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import com.alibaba.fastjson.{JSON, JSONArray}
+import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
+
+import java.util
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+import scala.collection.mutable
+object pred_01_xgb_ad_hdfsfile_20240813{
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val param = ParamUtils.parseArgs(args)
+    val featureFile = param.getOrElse("featureFile", "20240703_ad_feature_name.txt")
+    val testPath = param.getOrElse("testPath", "")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/34_ad_predict_data/")
+    val featureFilter = param.getOrElse("featureFilter", "XXXXXX").split(",")
+
+    val repartition = param.getOrElse("repartition", "20").toInt
+    val modelPath = param.getOrElse("modelPath", "/dw/recommend/model/35_ad_model/model_xgb")
+
+    val loader = getClass.getClassLoader
+    val resourceUrl = loader.getResource(featureFile)
+    val content =
+      if (resourceUrl != null) {
+        // read the feature list once and make sure the underlying stream is closed
+        val source = Source.fromURL(resourceUrl)
+        try source.getLines().mkString("\n") finally source.close()
+      } else {
+        ""
+      }
+    println(content)
+
+    val features = content.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty && !featureFilter.contains(r))
+    println("features.size=" + features.length)
+
+    var fields = Array(
+      DataTypes.createStructField("label", DataTypes.IntegerType, true)
+    ) ++ features.map(f => DataTypes.createStructField(f, DataTypes.DoubleType, true))
+
+    fields = fields ++ Array(
+      DataTypes.createStructField("logKey", DataTypes.StringType, true)
+    )
+    val schema = DataTypes.createStructType(fields)
+    val vectorAssembler = new VectorAssembler().setInputCols(features).setOutputCol("features")
+
+    val model = XGBoostClassificationModel.load(modelPath)
+    model.setMissing(0.0f).setFeaturesCol("features")
+
+
+
+    val testData = createData4Ad(
+      sc.textFile(testPath),
+      features
+    )
+
+    val testDataSet = spark.createDataFrame(testData, schema)
+    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey")
+    val predictions = model.transform(testDataSetTrans)
+
+    val saveData = predictions.select("label", "rawPrediction", "probability", "logKey").rdd
+      .map(r => {
+        (r.get(0), r.get(1), r.get(2), r.get(3)).productIterator.mkString("\t")
+      })
+    val hdfsPath = savePath
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      saveData.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+
+    val evaluator = new BinaryClassificationEvaluator()
+      .setLabelCol("label")
+      .setRawPredictionCol("probability")
+      .setMetricName("areaUnderROC")
+    val auc = evaluator.evaluate(predictions.select("label", "probability"))
+    println("zhangbo:auc:" + auc)
+
+    // Per-cid statistics: (cid, sample count, positive count, score sum, positive rate, mean score)
+    sc.textFile(hdfsPath).map(r => {
+      val rList = r.split("\t")
+      val cid = rList(3)
+      val score = rList(2).replace("[", "").replace("]", "")
+        .split(",")(1).toDouble
+      val label = rList(0).toDouble
+      (cid, (1, label, score))
+    }).reduceByKey {
+      case (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
+    }.map {
+      case (cid, (all, zheng, scores)) =>
+        (cid, all, zheng, scores, zheng / all, scores / all)
+    }.collect().sortBy(-_._2).map(_.productIterator.mkString("\t")).foreach(println)
+
+
+
+  }
+
+
+
+
+  def createData4Ad(data: RDD[String], features: Array[String]): RDD[Row] = {
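+    // Input format (inferred from the surrounding code, not documented in this commit):
+    // each line is "label<TAB>name:value<TAB>name:value...". A feature named "cid_<id>"
+    // additionally supplies the cid that is emitted as the trailing logKey column.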
+    data.map(r => {
+      val line: Array[String] = StringUtils.split(r, '\t')
+      val label: Int = NumberUtils.toInt(line(0))
+      val map: util.Map[String, Double] = new util.HashMap[String, Double]
+      var cid = "-1"
+      for (i <- 1 until line.length) {
+        val fv: Array[String] = StringUtils.split(line(i), ':')
+        map.put(fv(0), NumberUtils.toDouble(fv(1), 0.0))
+        if(fv(0).startsWith("cid_")){
+          cid = fv(0).split("_")(1)
+        }
+      }
+
+      val v: Array[Any] = new Array[Any](features.length + 2)
+      v(0) = label
+      for (i <- 0 until features.length) {
+        v(i + 1) = map.getOrDefault(features(i), 0.0d)
+      }
+      v(features.length + 1) = cid
+      Row(v: _*)
+    })
+  }
+}
+
+
+
+//rabit_timeout -> -1
+//scale_pos_weight -> 1.0
+//seed -> 0
+//handle_invalid -> error
+//features_col -> features
+//label_col -> label
+//num_workers -> 1
+//subsample -> 0.8
+//max_depth -> 5
+//probability_col -> probability
+//raw_prediction_col -> rawPrediction
+//tree_limit -> 0
+//dmlc_worker_connect_retry -> 5
+//train_test_ratio -> 1.0
+//use_external_memory -> false
+//objective -> binary:logistic
+//eval_metric -> auc
+//num_round -> 1000
+//missing -> 0.0
+//rabit_ring_reduce_threshold -> 32768
+//tracker_conf -> TrackerConf(0,python,,)
+//eta -> 0.009999999776482582
+//colsample_bytree -> 0.8
+//allow_non_zero_for_missing -> false
+//nthread -> 8
+//prediction_col -> prediction
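
The commented dump above lists the parameters reported by the trained model. The actual training code lives in train_01_xgb_ad_20240808 and is not part of this diff; the sketch below only illustrates, under that assumption, how an XGBoostClassifier with those settings would be constructed.

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

// Minimal sketch reconstructing the settings implied by the dump above; the values
// mirror the comment block only and are not taken from the real training job.
val xgbClassifier = new XGBoostClassifier(Map[String, Any](
    "eta" -> 0.01,
    "max_depth" -> 5,
    "subsample" -> 0.8,
    "colsample_bytree" -> 0.8,
    "objective" -> "binary:logistic",
    "eval_metric" -> "auc",
    "num_round" -> 1000,
    "num_workers" -> 1,
    "missing" -> 0.0f
  ))
  .setFeaturesCol("features")
  .setLabelCol("label")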

+ 7 - 5
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_01_xgb_ad_20240813.scala → recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/pred_01_xgb_ad_jsonfile_20240813.scala

@@ -1,19 +1,21 @@
 package com.tzld.piaoquan.recommend.model
 
-import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassificationModel, XGBoostClassifier}
+import com.alibaba.fastjson.{JSON, JSONArray}
+import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel
 import org.apache.commons.lang.math.NumberUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.ml.feature.VectorAssembler
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.DataTypes
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import com.alibaba.fastjson.{JSON, JSONArray}
+import org.apache.spark.sql.{Row, SparkSession}
+
 import java.util
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.collection.mutable
-object pred_01_xgb_ad_20240813{
+
+object pred_01_xgb_ad_jsonfile_20240813{
   def main(args: Array[String]): Unit = {
     val spark = SparkSession
       .builder()

+ 1 - 1
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/train_01_xgb_ad_20240808.scala

@@ -147,7 +147,7 @@ object train_01_xgb_ad_20240808{
       }.map {
         case (cid, (all, zheng, scores)) =>
           (cid, all, zheng, scores, zheng / all, scores / all)
-      }.collect().sortBy(_._1).map(_.productIterator.mkString("\t")).foreach(println)
+      }.collect().sortBy(-_._2).map(_.productIterator.mkString("\t")).foreach(println)
     }
 
   }