@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.model
 
+import com.tzld.piaoquan.recommend.model.produce.util.CompressUtil
 import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
 import org.apache.commons.lang.math.NumberUtils
 import org.apache.commons.lang3.StringUtils
@@ -12,7 +13,6 @@ import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
 import scala.collection.JavaConversions._
 import java.util
-import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
 object train_01_xgb_ad_20240808{
@@ -26,7 +26,7 @@ object train_01_xgb_ad_20240808{
     val param = ParamUtils.parseArgs(args)
     val featureFile = param.getOrElse("featureFile", "20240703_ad_feature_name.txt")
     val trainPath = param.getOrElse("trainPath", "/dw/recommend/model/33_ad_train_data_v4/20240724")
-    val testPath = param.getOrElse("testPath", "/dw/recommend/model/33_ad_train_data_v4/20240725")
+    val testPath = param.getOrElse("testPath", "")
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/34_ad_predict_data/")
     val featureFilter = param.getOrElse("featureFilter", "XXXXXX").split(",")
     val eta = param.getOrElse("eta", "0.01").toDouble
@@ -37,6 +37,8 @@ object train_01_xgb_ad_20240808{
     val func_object = param.getOrElse("func_object", "binary:logistic")
     val func_metric = param.getOrElse("func_metric", "auc")
     val repartition = param.getOrElse("repartition", "20").toInt
+    val modelPath = param.getOrElse("modelPath", "/root/zhangbo/recommend-model/recommend-model-produce/models/")
+    val modelFile = param.getOrElse("modelFile", "model.tar.gz")
 
     val loader = getClass.getClassLoader
     val resourceUrl = loader.getResource(featureFile)
@@ -63,8 +65,6 @@ object train_01_xgb_ad_20240808{
 
     var fields = Array(
       DataTypes.createStructField("label", DataTypes.IntegerType, true)
-//      DataTypes.createStructField("logKey", DataTypes.IntegerType, true)
-
     ) ++ features.map(f => DataTypes.createStructField(f, DataTypes.DoubleType, true))
 
     fields = fields ++ Array(
@@ -74,10 +74,10 @@ object train_01_xgb_ad_20240808{
     val trainDataSet: Dataset[Row] = spark.createDataFrame(trainData, schema)
     val vectorAssembler = new VectorAssembler().setInputCols(features).setOutputCol("features")
     val xgbInput = vectorAssembler.transform(trainDataSet).select("features","label")
-//    val xgbParam = Map("eta" -> 0.01f,
-//      "max_depth" -> 5,
-//      "objective" -> "binary:logistic",
-//      "num_class" -> 3)
+    // val xgbParam = Map("eta" -> 0.01f,
+    //   "max_depth" -> 5,
+    //   "objective" -> "binary:logistic",
+    //   "num_class" -> 3)
     val xgbClassifier = new XGBoostClassifier()
       .setEta(eta)
       .setGamma(gamma)
@@ -93,56 +93,63 @@ object train_01_xgb_ad_20240808{
       .setLabelCol("label")
       .setNthread(1)
       .setNumWorkers(num_worker)
-      .setSeed(2024)
-      .setMinChildWeight(1)
+      .setSeed(2024)
+      .setMinChildWeight(1)
     val model = xgbClassifier.fit(xgbInput)
 
-    val testData = createData4Ad(
-      sc.textFile(testPath),
-      features
-    )
-    val testDataSet = spark.createDataFrame(testData, schema)
-    val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features","label", "logKey")
-    val predictions = model.transform(testDataSetTrans)
-// [label, features, probability, prediction, rawPrediction]
-    println("zhangbo:columns:" + predictions.columns.mkString(","))
-    val saveData = predictions.select("label", "rawPrediction", "probability", "logKey").rdd
-      .map(r =>{
-        (r.get(0), r.get(1), r.get(2), r.get(3)).productIterator.mkString("\t")
-      })
-    val hdfsPath = savePath
-    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-      println("Deleting path and writing data: " + hdfsPath)
-      MyHdfsUtils.delete_hdfs_path(hdfsPath)
-      saveData.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-    } else {
-      println("Path invalid, cannot write: " + hdfsPath)
+    if (modelPath.nonEmpty && modelFile.nonEmpty) {
+      val modelPathTmp = modelPath + "/tmp"
+      model.write.overwrite.save("file://" + modelPathTmp)
+      val gzPath = modelPath + "/" + modelFile
+      CompressUtil.compressDirectoryToGzip(modelPathTmp, gzPath)
     }
 
+    if (testPath.nonEmpty) {
+      val testData = createData4Ad(
+        sc.textFile(testPath),
+        features
+      )
+      val testDataSet = spark.createDataFrame(testData, schema)
+      val testDataSetTrans = vectorAssembler.transform(testDataSet).select("features", "label", "logKey")
+      val predictions = model.transform(testDataSetTrans)
+
+      println("zhangbo:columns:" + predictions.columns.mkString(",")) // [label, features, probability, prediction, rawPrediction]
+      val saveData = predictions.select("label", "rawPrediction", "probability", "logKey").rdd
+        .map(r => {
+          (r.get(0), r.get(1), r.get(2), r.get(3)).productIterator.mkString("\t")
+        })
+      val hdfsPath = savePath
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("Deleting path and writing data: " + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        saveData.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("Path invalid, cannot write: " + hdfsPath)
+      }
 
-
-    val evaluator = new BinaryClassificationEvaluator()
-      .setLabelCol("label")
-      .setRawPredictionCol("probability")
-      .setMetricName("areaUnderROC")
-    val auc = evaluator.evaluate(predictions.select("label", "probability"))
-    println("zhangbo:auc:" + auc)
-
-    // Stats of scores per cid
-    sc.textFile(hdfsPath).map(r=>{
-      val rList = r.split("\t")
-      val cid = rList(3)
-      val score = rList(2).replace("[", "").replace("]", "")
-        .split(",")(1).toDouble
-      val label = rList(0).toDouble
-      (cid, (1, label, score))
-    }).reduceByKey{
-      case (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
-    }.map{
-      case (cid, (all, zheng, scores)) =>
-        (cid, all, zheng, scores, zheng / all, scores / all)
-    }.collect().sortBy(_._1).map(_.productIterator.mkString("\t")).foreach(println)
+      val evaluator = new BinaryClassificationEvaluator()
+        .setLabelCol("label")
+        .setRawPredictionCol("probability")
+        .setMetricName("areaUnderROC")
+      val auc = evaluator.evaluate(predictions.select("label", "probability"))
+      println("zhangbo:auc:" + auc)
+
+      // Stats of scores per cid
+      sc.textFile(hdfsPath).map(r => {
+        val rList = r.split("\t")
+        val cid = rList(3)
+        val score = rList(2).replace("[", "").replace("]", "")
+          .split(",")(1).toDouble
+        val label = rList(0).toDouble
+        (cid, (1, label, score))
+      }).reduceByKey {
+        case (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
+      }.map {
+        case (cid, (all, zheng, scores)) =>
+          (cid, all, zheng, scores, zheng / all, scores / all)
+      }.collect().sortBy(_._1).map(_.productIterator.mkString("\t")).foreach(println)
+    }
 
   }
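
Note on the new save step: CompressUtil.compressDirectoryToGzip is project-internal and its implementation is not part of this diff. As a point of reference only, here is a minimal Scala sketch of what the call presumably does (tar the files of a directory into a single .tar.gz, written here against Apache Commons Compress); the object name, method body, and paths below are assumptions, not the project's code:

import java.io.{BufferedOutputStream, FileOutputStream}
import java.nio.file.{Files, Paths}
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
import scala.collection.JavaConverters._

object CompressSketch {
  // Assumed equivalent of CompressUtil.compressDirectoryToGzip(srcDir, gzPath):
  // walk srcDir and copy every regular file into a gzipped tar archive.
  def compressDirectoryToGzip(srcDir: String, gzPath: String): Unit = {
    val root = Paths.get(srcDir)
    val out = new TarArchiveOutputStream(
      new GzipCompressorOutputStream(new BufferedOutputStream(new FileOutputStream(gzPath))))
    out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)
    try {
      Files.walk(root).iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .foreach { p =>
          // store entries relative to the directory root
          out.putArchiveEntry(new TarArchiveEntry(p.toFile, root.relativize(p).toString))
          Files.copy(p, out)
          out.closeArchiveEntry()
        }
    } finally out.close()
  }

  def main(args: Array[String]): Unit =
    compressDirectoryToGzip("/tmp/model_dir", "/tmp/model.tar.gz") // hypothetical paths
}

Under this reading, the archive target should live outside the directory being walked, otherwise the tar can pick up a partially written copy of itself; that is why compressing the freshly saved tmp directory, rather than modelPath itself, is the natural source argument.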
|
|
|
|
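The per-cid aggregation at the end of the test branch condenses each saved line into (impressions, positive labels, summed pCTR) per cid, then derives the empirical CTR and mean predicted score. Below is a self-contained sketch of the same logic on a local SparkSession; the sample rows and cid values are made up, and only the output format (label, rawPrediction, probability, logKey separated by tabs) is taken from the job:

import org.apache.spark.sql.SparkSession

object PerCidStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("PerCidStatsSketch").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical rows in the saved format: label \t rawPrediction \t probability \t logKey (cid)
    val lines = sc.parallelize(Seq(
      "1\t[-1.2,1.2]\t[0.23,0.77]\tcid1001",
      "0\t[0.8,-0.8]\t[0.69,0.31]\tcid1001",
      "0\t[1.5,-1.5]\t[0.82,0.18]\tcid1002"
    ))

    lines.map { r =>
      val rList = r.split("\t")
      val cid = rList(3)
      // "probability" prints as "[p0,p1]"; index 1 is the positive-class score
      val score = rList(2).replace("[", "").replace("]", "").split(",")(1).toDouble
      val label = rList(0).toDouble
      (cid, (1, label, score))
    }.reduceByKey { case (a, b) =>
      (a._1 + b._1, a._2 + b._2, a._3 + b._3) // (impressions, positives, summed score)
    }.map { case (cid, (all, zheng, scores)) =>
      (cid, all, zheng, scores, zheng / all, scores / all) // ..., empirical CTR, mean pCTR
    }.collect().sortBy(_._1).map(_.productIterator.mkString("\t")).foreach(println)

    spark.stop()
  }
}

Comparing zheng / all (empirical CTR) with scores / all (mean pCTR) per cid gives a quick calibration check on top of the global AUC printed above.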