丁云鹏 hace 8 meses
padre
commit
acbd3bf480

+ 22 - 0
recommend-server-service/pom.xml

@@ -243,6 +243,28 @@
             <artifactId>abtest-client</artifactId>
             <version>1.0.0</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.15</version>
+        </dependency>
+        <dependency>
+            <groupId>ml.dmlc</groupId>
+            <artifactId>xgboost4j-spark_2.12</artifactId>
+            <version>1.7.6</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>scala-library</artifactId>
+                    <groupId>org.scala-lang</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.12</artifactId>
+            <version>3.3.1</version>
+        </dependency>
     </dependencies>
 
 

+ 2 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -23,12 +23,13 @@ import org.springframework.scheduling.annotation.EnableScheduling;
         RedisReactiveAutoConfiguration.class
 })
 @ComponentScan({
+        "com.tzld.piaoquan.recommend.server.util",
+        "com.tzld.piaoquan.recommend.server.config",
         "com.tzld.piaoquan.recommend.server.service",
         "com.tzld.piaoquan.recommend.server.implement",
         "com.tzld.piaoquan.recommend.server.framework.utils",
         "com.tzld.piaoquan.recommend.server.grpcservice",
         "com.tzld.piaoquan.recommend.server.remote",
-        "com.tzld.piaoquan.recommend.server.config",
         "com.tzld.piaoquan.recommend.server.web",
         "com.tzld.piaoquan.recommend.server.xxl",
 })

+ 20 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/config/SparkConfig.java

@@ -0,0 +1,20 @@
+package com.tzld.piaoquan.recommend.server.config;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+@Component
+public class SparkConfig {
+
+    @PostConstruct
+    public void init() {
+        SparkConf sparkConf = new SparkConf()
+                .setMaster("local")
+                .setAppName("XGBoostPredict");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+    }
+
+}

+ 20 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/BaseXGBoostModelScorer.java

@@ -0,0 +1,20 @@
+package com.tzld.piaoquan.recommend.server.service.score;
+
+import com.tzld.piaoquan.recommend.server.service.score.model.XGBoostModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseXGBoostModelScorer extends AbstractScorer {
+
+    private static Logger LOGGER = LoggerFactory.getLogger(BaseXGBoostModelScorer.class);
+
+    public BaseXGBoostModelScorer(ScorerConfigInfo scorerConfigInfo) {
+        super(scorerConfigInfo);
+    }
+
+    @Override
+    public void loadModel() {
+        doLoadModel(XGBoostModel.class);
+    }
+}

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/ScorerUtils.java

@@ -38,6 +38,7 @@ public final class ScorerUtils {
         ScorerUtils.init("feeds_score_config_20240711.conf");
         ScorerUtils.init("feeds_score_config_20240806.conf");
         ScorerUtils.init("feeds_score_config_20240807.conf");
+        ScorerUtils.init("feeds_score_config_20240826.conf");
         ScorerUtils.init4Recall("feeds_recall_config_region_v1.conf");
         ScorerUtils.init4Recall("feeds_recall_config_region_v2.conf");
         ScorerUtils.init4Recall("feeds_recall_config_region_v3.conf");

+ 159 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/XGBoostScorer.java

@@ -0,0 +1,159 @@
+package com.tzld.piaoquan.recommend.server.service.score;
+
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.score.model.XGBoostModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class XGBoostScorer extends BaseXGBoostModelScorer {
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(XGBoostScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+
+
+    public XGBoostScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+        throw new NoSuchMethodError();
+    }
+
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                  final Map<String, String> userFeatureMap,
+                                  final List<RankItem> rankItems) {
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        XGBoostModel model = (XGBoostModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(
+                sceneFeatureMap, userFeatureMap, rankItems
+        );
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        XGBoostModel model = (XGBoostModel) this.getModel();
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMap, sceneFeatureMap, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, String> userFeatureMap,
+                                  final Map<String, String> sceneFeatureMap,
+                                  final XGBoostModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMap, sceneFeatureMap);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMap.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+    }
+
+    public double calcScore(final XGBoostModel model,
+                            final RankItem item,
+                            final Map<String, String> userFeatureMap,
+                            final Map<String, String> sceneFeatureMap) {
+
+
+        Map<String, String> featureMap = new HashMap<>();
+        if (MapUtils.isNotEmpty(item.getFeatureMap())) {
+            featureMap.putAll(item.getFeatureMap());
+        }
+        if (MapUtils.isNotEmpty(userFeatureMap)) {
+            featureMap.putAll(userFeatureMap);
+        }
+        if (MapUtils.isNotEmpty(sceneFeatureMap)) {
+            featureMap.putAll(sceneFeatureMap);
+        }
+
+        double pro = 0.0;
+        if (MapUtils.isNotEmpty(featureMap)) {
+            try {
+                pro = model.score(featureMap);
+                // LOGGER.info("fea : {}, score:{}", JSONUtils.toJson(featureMap), pro);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", item.getVideoId(), ExceptionUtils.getFullStackTrace(e));
+            }
+        }
+        item.setScoreRov(pro);
+        item.getScoresMap().put("RovFMScore", pro);
+        item.setAllFeatureMap(featureMap);
+        return pro;
+    }
+}

+ 4 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/model/Model.java

@@ -1,11 +1,15 @@
 package com.tzld.piaoquan.recommend.server.service.score.model;
 
 
+import java.io.InputStream;
 import java.io.InputStreamReader;
 
 abstract public class Model {
     public abstract int getModelSize();
 
     public abstract boolean loadFromStream(InputStreamReader in) throws Exception;
+    public boolean loadFromStream(InputStream is) throws Exception {
+        return loadFromStream(new InputStreamReader(is));
+    }
 }
 

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/model/ModelManager.java

@@ -205,7 +205,7 @@ public class ModelManager {
                         loadTask.lastModifyTime, timeStamp);
 
                 Model model = loadTask.modelClass.newInstance();
-                if (model.loadFromStream(new InputStreamReader(ossObj.getObjectContent()))) {
+                if (model.loadFromStream(ossObj.getObjectContent())) {
                     loadTask.model = model;
                     loadTask.lastModifyTime = timeStamp;
                 }

+ 92 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/model/XGBoostModel.java

@@ -0,0 +1,92 @@
+package com.tzld.piaoquan.recommend.server.service.score.model;
+
+
+import com.tzld.piaoquan.recommend.server.util.CompressUtil;
+import com.tzld.piaoquan.recommend.server.util.PropertiesUtil;
+import ml.dmlc.xgboost4j.scala.DMatrix;
+import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel;
+import org.apache.commons.lang.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+
+public class XGBoostModel extends Model {
+    private static final Logger LOGGER = LoggerFactory.getLogger(XGBoostModel.class);
+    private XGBoostClassificationModel model;
+
+    private String[] features = {
+            "cpa",
+            "b2_1h_ctr",
+            "b2_1h_ctcvr",
+            "b2_1h_cvr",
+            "b2_1h_conver",
+            "b2_1h_click",
+            "b2_1h_conver*log(view)",
+            "b2_1h_conver*ctcvr",
+            "b2_2h_ctr",
+            "b2_2h_ctcvr",
+            "b2_2h_cvr",
+            "b2_2h_conver",
+            "b2_2h_click",
+            "b2_2h_conver*log(view)",
+            "b2_2h_conver*ctcvr",
+            "b2_3h_ctr",
+            "b2_3h_ctcvr",
+            "b2_3h_cvr",
+            "b2_3h_conver",
+            "b2_3h_click",
+            "b2_3h_conver*log(view)",
+            "b2_3h_conver*ctcvr",
+            "b2_6h_ctr",
+            "b2_6h_ctcvr"
+    };
+
+    @Override
+    public int getModelSize() {
+        if (this.model == null)
+            return 0;
+        return 1;
+    }
+
+    @Override
+    public boolean loadFromStream(InputStreamReader in) throws Exception {
+        return false;
+    }
+
+    public void cleanModel() {
+        this.model = null;
+    }
+
+    public Float score(Map<String, String> featureMap) {
+
+        try {
+            float[] values = new float[features.length];
+            for (int i = 0; i < features.length; i++) {
+                float v = NumberUtils.toFloat(featureMap.getOrDefault(features[i], "0.0"), 0.0f);
+                values[i] = v;
+            }
+            DMatrix dm = new DMatrix(values, 1, features.length, 0.0f);
+            float[][] result = model._booster().predict(dm, false, 100);
+            return result[0][0];
+        } catch (Exception e) {
+            return 0f;
+        }
+    }
+
+    @Override
+    public boolean loadFromStream(InputStream in) throws Exception {
+        String modelDir = PropertiesUtil.getString("model.xgboost.path");
+        CompressUtil.decompressGzFile(in, modelDir);
+        String absolutePath =new File(modelDir).getAbsolutePath();
+        XGBoostClassificationModel model2 = XGBoostClassificationModel.load("file://" + absolutePath);
+        model2.setMissing(0.0f);
+        this.model = model2;
+        return true;
+    }
+
+}

+ 123 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/CompressUtil.java

@@ -0,0 +1,123 @@
+package com.tzld.piaoquan.recommend.server.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * @author dyp
+ */
+@Slf4j
+public class CompressUtil {
+    public static void compressDirectoryToGzip(String sourceDirPath, String outputFilePath) {
+        // 创建.gz文件的输出流
+        try (OutputStream out = new FileOutputStream(outputFilePath);
+             GzipCompressorOutputStream gzipOut = new GzipCompressorOutputStream(out);
+             TarArchiveOutputStream taos = new TarArchiveOutputStream(gzipOut)) {
+
+            taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+
+            // 遍历目录
+            Files.walk(Paths.get(sourceDirPath))
+                    .filter(Files::isRegularFile)
+                    .forEach(filePath -> {
+                        try {
+                            // 为每个文件创建TarEntry
+                            TarArchiveEntry entry = new TarArchiveEntry(filePath.toFile(), filePath.toString().substring(sourceDirPath.length() + 1));
+                            taos.putArchiveEntry(entry);
+
+                            // 读取文件内容并写入TarArchiveOutputStream
+                            try (InputStream is = Files.newInputStream(filePath)) {
+                                byte[] buffer = new byte[1024];
+                                int len;
+                                while ((len = is.read(buffer)) > 0) {
+                                    taos.write(buffer, 0, len);
+                                }
+                            }
+                            // 关闭entry
+                            taos.closeArchiveEntry();
+                        } catch (IOException e) {
+                            log.error("", e);
+                        }
+                    });
+        } catch (Exception e) {
+            log.error("", e);
+        }
+    }
+
+    public static void decompressGzFile(String gzipFilePath, String destDirPath) {
+        try (InputStream gzipIn = new FileInputStream(gzipFilePath);
+             GzipCompressorInputStream gzIn = new GzipCompressorInputStream(gzipIn);
+             TarArchiveInputStream tais = new TarArchiveInputStream(gzIn)) {
+
+            TarArchiveEntry entry;
+            Files.createDirectories(Paths.get(destDirPath));
+            while ((entry = tais.getNextTarEntry()) != null) {
+                if (entry.isDirectory()) {
+                    // 如果是目录,创建目录
+                    Files.createDirectories(Paths.get(destDirPath, entry.getName()));
+                } else {
+                    // 如果是文件,创建文件并写入内容
+                    File outputFile = new File(destDirPath, entry.getName());
+                    if (!outputFile.exists()) {
+                        File parent = outputFile.getParentFile();
+                        if (!parent.exists()) {
+                            parent.mkdirs();
+                        }
+                        outputFile.createNewFile();
+                    }
+                    try (OutputStream out = new FileOutputStream(outputFile)) {
+                        byte[] buffer = new byte[1024];
+                        int len;
+                        while ((len = tais.read(buffer)) > 0) {
+                            out.write(buffer, 0, len);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("", e);
+        }
+    }
+
+    public static void decompressGzFile(InputStream gzipIn, String destDirPath) {
+        try (GzipCompressorInputStream gzIn = new GzipCompressorInputStream(gzipIn);
+             TarArchiveInputStream tais = new TarArchiveInputStream(gzIn)) {
+
+            TarArchiveEntry entry;
+            Files.createDirectories(Paths.get(destDirPath));
+            while ((entry = tais.getNextTarEntry()) != null) {
+                if (entry.isDirectory()) {
+                    // 如果是目录,创建目录
+                    Files.createDirectories(Paths.get(destDirPath, entry.getName()));
+                } else {
+                    // 如果是文件,创建文件并写入内容
+                    File outputFile = new File(destDirPath, entry.getName());
+                    if (!outputFile.exists()) {
+                        File parent = outputFile.getParentFile();
+                        if (!parent.exists()) {
+                            parent.mkdirs();
+                        }
+                        outputFile.createNewFile();
+                    }
+                    try (OutputStream out = new FileOutputStream(outputFile)) {
+                        byte[] buffer = new byte[1024];
+                        int len;
+                        while ((len = tais.read(buffer)) > 0) {
+                            out.write(buffer, 0, len);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("", e);
+        }
+    }
+}

+ 22 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/PropertiesUtil.java

@@ -0,0 +1,22 @@
+package com.tzld.piaoquan.recommend.server.util;
+
+import org.springframework.context.EnvironmentAware;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+@Component
+public class PropertiesUtil implements EnvironmentAware {
+
+
+    private static Environment environment;
+
+
+    @Override
+    public void setEnvironment(Environment environment) {
+        this.environment = environment;
+    }
+
+    public static String getString(String name) {
+        return environment.getProperty(name);
+    }
+}

+ 4 - 1
recommend-server-service/src/main/resources/application.yml

@@ -58,4 +58,7 @@ feign:
     config:
       default:
         connectTimeout: 2000
-        readTimeout: 10000
+        readTimeout: 10000
+model:
+  xgboost:
+    path: xgboost

+ 8 - 0
recommend-server-service/src/main/resources/feeds_score_config_20240826.conf

@@ -0,0 +1,8 @@
+scorer-config = {
+  lr-rov-score-config = {
+    scorer-name = "com.tzld.piaoquan.recommend.server.service.score.XGBoostScorer"
+    scorer-priority = 99
+    model-path = "zhangbo/model_xgb_1000.tar.gz"
+  }
+
+}