丁云鹏 8 tháng trước cách đây
mục cha
commit
8b69b08747

+ 39 - 0
ad-engine-commons/pom.xml

@@ -89,6 +89,45 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- Explicitly declared Scala runtime for the Scala 2.12 artifacts in this module. -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.15</version>
+        </dependency>
+        <!-- XGBoost4J Spark bindings; its transitive scala-library is excluded so that it does
+             not compete with the explicitly declared scala-library version above. -->
+        <dependency>
+            <groupId>ml.dmlc</groupId>
+            <artifactId>xgboost4j-spark_2.12</artifactId>
+            <version>1.7.6</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>scala-library</artifactId>
+                    <groupId>org.scala-lang</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Spark MLlib (Scala 2.12 build). -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.12</artifactId>
+            <version>3.3.1</version>
+            <!-- NOTE(review): the exclusions block below is commented out, so none of the listed
+                 artifacts are currently excluded from this dependency. Confirm whether the
+                 exclusions should be re-enabled or the dead block removed. -->
+<!--            <exclusions>-->
+<!--                <exclusion>-->
+<!--                    <artifactId>scala-library</artifactId>-->
+<!--                    <groupId>org.scala-lang</groupId>-->
+<!--                </exclusion>-->
+<!--                <exclusion>-->
+<!--                    <artifactId>hadoop-mapreduce-client-core</artifactId>-->
+<!--                    <groupId>org.apache.hadoop</groupId>-->
+<!--                </exclusion>-->
+<!--                <exclusion>-->
+<!--                    <artifactId>commons-compiler</artifactId>-->
+<!--                    <groupId>org.codehaus.janino</groupId>-->
+<!--                </exclusion>-->
+<!--                <exclusion>-->
+<!--                    <artifactId>janino</artifactId>-->
+<!--                    <groupId>org.codehaus.janino</groupId>-->
+<!--                </exclusion>-->
+<!--            </exclusions>-->
+        </dependency>
     </dependencies>
 
 </project>

+ 1 - 1
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/AbstractScorer.java

@@ -2,8 +2,8 @@ package com.tzld.piaoquan.ad.engine.commons.score;
 
 
 import com.tzld.piaoquan.ad.engine.commons.score.model.Model;
-import com.tzld.piaoquan.ad.engine.commons.score.model.ModelManager;
 
+import com.tzld.piaoquan.ad.engine.commons.score.model.ModelManager;
 import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdRankItem;
 import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
 import org.apache.commons.lang3.StringUtils;

+ 1 - 1
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/model/Model.java

@@ -3,7 +3,7 @@ package com.tzld.piaoquan.ad.engine.commons.score.model;
 
 import java.io.InputStreamReader;
 
-abstract public class Model {
+public abstract class Model {
     public abstract int getModelSize();
 
     public abstract boolean loadFromStream(InputStreamReader in) throws Exception;

+ 45 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/model/XGBoostModel.java

@@ -0,0 +1,45 @@
+package com.tzld.piaoquan.ad.engine.commons.score.model;
+
+
+import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+
+/**
+ * {@link Model} implementation that holds an xgboost4j-spark
+ * {@link XGBoostClassificationModel}.
+ *
+ * <p>NOTE(review): this class looks like a work-in-progress stub: {@link #score(Map)}
+ * returns a constant and {@link #loadFromStream(InputStreamReader)} does not read its
+ * stream argument. Confirm the intended behaviour before relying on it.
+ */
+public class XGBoostModel extends Model {
+    private static final Logger LOGGER = LoggerFactory.getLogger(XGBoostModel.class);
+
+    /** The loaded model, or {@code null} when nothing is loaded or it was cleaned. */
+    private XGBoostClassificationModel model;
+
+    /**
+     * Reports whether a model is currently held.
+     *
+     * @return {@code 0} when no model is held, otherwise {@code 1}
+     */
+    @Override
+    public int getModelSize() {
+        return this.model == null ? 0 : 1;
+    }
+
+    /** Drops the reference to the currently held model. */
+    public void cleanModel() {
+        this.model = null;
+    }
+
+    /**
+     * Scores a feature map.
+     *
+     * @param featureMap feature name to feature value; currently not read
+     * @return always {@code 0f}
+     */
+    public Float score(Map<String, String> featureMap) {
+        return 0f;
+    }
+
+    /**
+     * Loads the classification model and stores it in this instance.
+     *
+     * <p>The model is loaded from {@code "file://"} followed by a directory that is
+     * currently the empty string; the missing-value marker is set to {@code 0.0f} and the
+     * features column to {@code "features"}.
+     *
+     * @param in not read by the current implementation
+     * @return always {@code true}
+     * @throws IOException declared by the signature
+     */
+    @Override
+    public boolean loadFromStream(InputStreamReader in) throws IOException {
+        // NOTE(review): the directory is hard-coded to "" — presumably a placeholder; confirm.
+        final String modelDir = "";
+        final String location = "file://" + modelDir;
+
+        XGBoostClassificationModel loaded = XGBoostClassificationModel.load(location);
+        loaded.setMissing(0.0f).setFeaturesCol("features");
+
+        this.model = loaded;
+        return true;
+    }
+
+}

+ 157 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/XGBoostScorer.java

@@ -0,0 +1,157 @@
+package com.tzld.piaoquan.ad.engine.service.score;
+
+
+import com.tzld.piaoquan.ad.engine.commons.score.BaseFMModelScorer;
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerConfigInfo;
+import com.tzld.piaoquan.ad.engine.commons.score.model.FMModel;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdRankItem;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class XGBoostScorer extends BaseFMModelScorer {
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogRovFMScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+
+
+    public XGBoostScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+    @Override
+    public List<AdRankItem> scoring(final ScoreParam param,
+                                    final UserAdFeature userAdFeature,
+                                    final List<AdRankItem> rankItems) {
+        throw new NoSuchMethodError();
+    }
+
+    public List<AdRankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                    final Map<String, String> userFeatureMap,
+                                    final List<AdRankItem> rankItems) {
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        FMModel model = (FMModel) this.getModel();
+
+        List<AdRankItem> result = rankByJava(sceneFeatureMap, userFeatureMap, rankItems);
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<AdRankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<AdRankItem> items) {
+        long startTime = System.currentTimeMillis();
+        FMModel model = (FMModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMap, sceneFeatureMap, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<AdRankItem> items,
+                                  final Map<String, String> userFeatureMap,
+                                  final Map<String, String> sceneFeatureMap,
+                                  final FMModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMap, sceneFeatureMap);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex), ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        // 等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMap.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+    }
+
+    public double calcScore(final FMModel model,
+                            final AdRankItem item,
+                            final Map<String, String> userFeatureMap,
+                            final Map<String, String> sceneFeatureMap) {
+
+
+        Map<String, String> featureMap = new HashMap<>();
+        if (MapUtils.isNotEmpty(item.getFeatureMap())) {
+            featureMap.putAll(item.getFeatureMap());
+        }
+        if (MapUtils.isNotEmpty(userFeatureMap)) {
+            featureMap.putAll(userFeatureMap);
+        }
+        if (MapUtils.isNotEmpty(sceneFeatureMap)) {
+            featureMap.putAll(sceneFeatureMap);
+        }
+
+        double pro = 0.0;
+        if (MapUtils.isNotEmpty(featureMap)) {
+            try {
+                pro = model.score(featureMap);
+                // LOGGER.info("fea : {}, score:{}", JSONUtils.toJson(featureMap), pro);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", item.getVideoId(), ExceptionUtils.getFullStackTrace(e));
+            }
+        }
+        item.setLrScore(pro);
+        item.getScoreMap().put("ctcvrScore", pro);
+        return pro;
+    }
+}