Browse Source

ocpm bid engine score v1

sunmingze 1 year ago
parent
commit
8a0edcf34b
16 changed files with 805 additions and 14 deletions
  1. 1 1
      ad-engine-commons/pom.xml
  2. 19 0
      ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/BaseMergeScorer.java
  3. 20 0
      ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/BaseThompsonSamplingScorer.java
  4. 5 0
      ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/ScorerUtils.java
  5. 96 0
      ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/model/ThompsonSamplingModel.java
  6. 1 1
      ad-engine-server/pom.xml
  7. 22 1
      ad-engine-server/src/main/resources/feeds_score_config_baseline.conf
  8. 14 0
      ad-engine-server/src/main/resources/feeds_score_config_thompson.conf
  9. 3 1
      ad-engine-service/pom.xml
  10. 1 10
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogAdCtrLRScorer.java
  11. 181 0
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogAdCvrLRScorer.java
  12. 157 0
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogAdThompsonScorer.java
  13. 80 0
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogCtrCalibretionScorer.java
  14. 81 0
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogCvrCalibretionScorer.java
  15. 66 0
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogMergeEcpmScorer.java
  16. 58 0
      ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/impl/RankServiceThompsonImpl.java

+ 1 - 1
ad-engine-commons/pom.xml

@@ -10,7 +10,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>ad-engine-commons</artifactId>
-    <version>1.0.2</version>
+    <version>1.0.3</version>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>1.8</maven.compiler.source>

+ 19 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/BaseMergeScorer.java

@@ -0,0 +1,19 @@
+package com.tzld.piaoquan.ad.engine.commons.score;
+
+import com.tzld.piaoquan.ad.engine.commons.score.model.LRModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseMergeScorer extends AbstractScorer {
+
+    private static Logger LOGGER = LoggerFactory.getLogger(BaseMergeScorer.class);
+
+    public BaseMergeScorer(ScorerConfigInfo scorerConfigInfo) {
+        super(scorerConfigInfo);
+    }
+
+    @Override
+    public void loadModel() {
+    }
+}

+ 20 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/BaseThompsonSamplingScorer.java

@@ -0,0 +1,20 @@
+package com.tzld.piaoquan.ad.engine.commons.score;
+
+import com.tzld.piaoquan.ad.engine.commons.score.model.ThompsonSamplingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BaseThompsonSamplingScorer extends AbstractScorer {
+
+    private static Logger LOGGER = LoggerFactory.getLogger(BaseThompsonSamplingScorer.class);
+
+    public BaseThompsonSamplingScorer(ScorerConfigInfo scorerConfigInfo) {
+        super(scorerConfigInfo);
+    }
+
+    @Override
+    public void loadModel() {
+        doLoadModel(ThompsonSamplingModel.class);
+    }
+}

+ 5 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/ScorerUtils.java

@@ -23,9 +23,14 @@ public final class ScorerUtils {
 
     public static String BASE_CONF = "feeds_score_config_baseline.conf";
 
+    public static String THOMPSON_CONF = "feeds_score_config_baseline.conf";
+
+
     public static void warmUp() {
         log.info("scorer warm up ");
         ScorerUtils.init(BASE_CONF);
+        ScorerUtils.init(THOMPSON_CONF);
+
     }
 
     private ScorerUtils() {

+ 96 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/score/model/ThompsonSamplingModel.java

@@ -0,0 +1,96 @@
+package com.tzld.piaoquan.ad.engine.commons.score.model;
+
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdActionFeature;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdRankItem;
+import com.tzld.piaoquan.recommend.feature.model.sample.BaseFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.GroupedFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import it.unimi.dsi.fastutil.longs.Long2FloatMap;
+import it.unimi.dsi.fastutil.longs.Long2FloatOpenHashMap;
+import org.apache.commons.math3.distribution.BetaDistribution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class ThompsonSamplingModel extends Model {
+    protected static final int MODEL_FIRST_LOAD_COUNT = 1 << 25;  // 32M
+    private static final Logger LOGGER = LoggerFactory.getLogger(ThompsonSamplingModel.class);
+
+    // key = adid, value = <exp, click, conversation>
+    private Map<Long, AdActionFeature> thompsonSamplingModel;
+
+    public ThompsonSamplingModel() {
+        //配置不同环境的hdfs conf
+        this.thompsonSamplingModel = new HashMap<>();
+    }
+
+    public Map<Long, AdActionFeature> getLrModel() {
+        return this.thompsonSamplingModel;
+    }
+
+
+    public void putFeature(Map<Long, AdActionFeature> model, Long key, Double expose, Double click, Double conversation) {
+        AdActionFeature adActionFeature = new AdActionFeature();
+        adActionFeature.setOriginAdView(expose);
+        adActionFeature.setOriginAdClick(click);
+        adActionFeature.setOriginAdConversion(conversation);
+        model.put(key, adActionFeature);
+    }
+
+
+    @Override
+    public boolean loadFromStream(InputStreamReader in) throws IOException {
+        Map<Long, AdActionFeature> initModel = new HashMap<>();
+        BufferedReader input = new BufferedReader(in);
+        String line = null;
+        int cnt = 0;
+        while ((line = input.readLine()) != null) {
+            String[] items = line.split("\t");
+            if (items.length < 3) {
+                continue;
+            }
+            putFeature(initModel, new BigInteger(items[0].trim()).longValue(), Double.valueOf(items[1].trim()).doubleValue(), Double.valueOf(items[2].trim()).doubleValue(), Double.valueOf(items[3].trim()).doubleValue());
+        }
+
+        this.thompsonSamplingModel = initModel;
+        LOGGER.info("[MODELLOAD] model load over and size " + cnt);
+        input.close();
+        in.close();
+        return true;
+    }
+
+
+    @Override
+    public int getModelSize() {
+        if (this.thompsonSamplingModel == null)
+            return 0;
+        int sum = this.thompsonSamplingModel.size();
+        return sum;
+    }
+
+    public double score(AdRankItem adRankItem, String ctrOrCVR) {
+        double score = 0.0f;
+        AdActionFeature adActionFeature = this.thompsonSamplingModel.get(adRankItem.getAdId());
+        if(ctrOrCVR.equals("ctr"))
+            score = this.betaSampler(adActionFeature.getAdClick(), adActionFeature.getAdView());
+        if(ctrOrCVR.equals("cvr"))
+            score = this.betaSampler(adActionFeature.getAdConversion(), adActionFeature.getAdView());
+        return score;
+    }
+
+    public double betaSampler(double alpha, double beta) {
+        BetaDistribution betaSample = new BetaDistribution(alpha, beta);
+        return betaSample.sample();
+    }
+
+
+}

+ 1 - 1
ad-engine-server/pom.xml

@@ -15,7 +15,7 @@
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>ad-engine-service</artifactId>
-            <version>1.0.0</version>
+            <version>1.0.3</version>
         </dependency>
 
     </dependencies>

+ 22 - 1
ad-engine-server/src/main/resources/feeds_score_config_baseline.conf

@@ -1,7 +1,28 @@
 scorer-config = {
-  related-score-config = {
+  lr-ctr-score-config = {
     scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogAdCtrLRScorer"
     scorer-priority = 99
     model-path = "ad_ctr_model/model_ad_ctr.txt"
   }
+  lr-cvr-score-config = {
+      scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogAdCvrLRScorer"
+      scorer-priority = 99
+      model-path = "ad_cvr_model/model_ad_cvr.txt"
+  }
+  lr-ctr-calibretion-config = {
+      scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogCtrCalibretionScorer"
+      scorer-priority = 99
+      model-path = "ad_ctr_calibretion/model_ctr_calibretion.txt"
+  }
+
+  lr-cvr-calibretion-config = {
+      scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogCvrCalibretionScorer"
+      scorer-priority = 99
+      model-path = "ad_cvr_calibretion/model_cvr_calibretion.txt"
+  }
+  lr-ecpm-merge-config = {
+      scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogMergeEcpmScorer"
+      scorer-priority = 99
+  }
+
 }

+ 14 - 0
ad-engine-server/src/main/resources/feeds_score_config_thompson.conf

@@ -0,0 +1,14 @@
+scorer-config = {
+  lr-ctr-score-config = {
+    scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogAdThompsonScorer"
+    scorer-priority = 99
+    model-path = "ad_thompson_model/model_ad_thompson.txt"
+  }
+
+  lr-ecpm-merge-config = {
+        scorer-name = "com.tzld.piaoquan.ad.engine.service.score.VlogMergeEcpmScorer"
+        scorer-priority = 99
+  }
+
+
+}

+ 3 - 1
ad-engine-service/pom.xml

@@ -10,6 +10,8 @@
     </parent>
 
     <artifactId>ad-engine-service</artifactId>
+    <version>1.0.3</version>
+
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -21,7 +23,7 @@
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>ad-engine-commons</artifactId>
-            <version>1.0.2</version>
+            <version>1.0.3</version>
         </dependency>
 
         <dependency>

+ 1 - 10
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogAdCtrLRScorer.java

@@ -82,9 +82,6 @@ public class VlogAdCtrLRScorer extends BaseLRModelScorer {
             }
         }
 
-        Collections.sort(items);
-
-
         LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
         LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
                 System.currentTimeMillis() - startTime);
@@ -122,14 +119,8 @@ public class VlogAdCtrLRScorer extends BaseLRModelScorer {
                 LOGGER.error("score error for doc={} exception={}", new Object[]{
                         item.getAdId(), ExceptionUtils.getFullStackTrace(e)});
             }
-            // 增加实时特征后打开在线存储日志逻辑
-            //
-            // CtrSamples.Builder samples =  com.tzld.piaoquan.recommend.server.gen.recommend.CtrSamples.newBuilder();
-            // samples.setLr_samples(lrSamples);
-            // item.setSamples(samples);
-            //
         }
-        item.setScore(pro);
+        item.setCtr(pro);
         return pro;
     }
 

+ 181 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogAdCvrLRScorer.java

@@ -0,0 +1,181 @@
+package com.tzld.piaoquan.ad.engine.service.score;
+
+
+import com.tzld.piaoquan.ad.engine.commons.score.BaseLRModelScorer;
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerConfigInfo;
+import com.tzld.piaoquan.ad.engine.commons.score.model.LRModel;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.ad.feature.VlogAdCtrLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+
+//@Service
+public class VlogAdCvrLRScorer extends BaseLRModelScorer {
+
+    private final static int CORE_POOL_SIZE = 64;
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogAdCvrLRScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+    private static final double defaultUserCtrGroupNumber = 10.0;
+    private static final int enterFeedsScoreRatio = 10;
+    private static final int enterFeedsScoreNum = 20;
+
+
+    public VlogAdCvrLRScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    @Override
+    public List<AdRankItem> scoring(final ScoreParam param,
+                                    final UserAdFeature userFeature,
+                                    final List<AdRankItem> rankItems) {
+
+        if (userFeature == null || CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<AdRankItem> result = rankItems;
+        result = rankByJava(rankItems, param.getRequestContext(), userFeature);
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<AdRankItem> rankByJava(final List<AdRankItem> items,
+                                        final AdRequestContext requestContext,
+                                        final UserAdFeature user) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // userAdBytes
+        UserAdBytesFeature userInfoBytes = null;
+        userInfoBytes = new UserAdBytesFeature(user);
+
+        // 所有都参与打分,按照cvr排序
+        multipleScore(items, userInfoBytes, requestContext, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("after enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i).getScore());
+            }
+        }
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict cvr
+     */
+    public double calcScore(final LRModel lrModel,
+                            final AdRankItem item,
+                            final UserAdBytesFeature userInfoBytes,
+                            final AdRequestContext requestContext) {
+
+        LRSamples lrSamples = null;
+        VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
+
+        try {
+            AdItemBytesFeature newsInfoBytes = new AdItemBytesFeature(item.getItemFeature());
+            lrSamples = bytesFeatureExtractor.single(userInfoBytes, newsInfoBytes,
+                    new AdRequestContextBytesFeature(requestContext));
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{new String(userInfoBytes.getMid()), item.getAdId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getAdId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+        }
+        item.setCvr(pro);
+        return pro;
+    }
+
+
+    /**
+     * 并行打分
+     *
+     * @param items
+     * @param userInfoBytes
+     * @param requestContext
+     * @param model
+     */
+    private void multipleScore(final List<AdRankItem> items,
+                                  final UserAdBytesFeature userInfoBytes,
+                                  final AdRequestContext requestContext,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userInfoBytes, requestContext);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).adId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", requestContext.getApptype(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{requestContext.getApptype(), items.size(), cancel});
+    }
+}

+ 157 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogAdThompsonScorer.java

@@ -0,0 +1,157 @@
+package com.tzld.piaoquan.ad.engine.service.score;
+
+
+import com.tzld.piaoquan.ad.engine.commons.score.BaseThompsonSamplingScorer;
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerConfigInfo;
+import com.tzld.piaoquan.ad.engine.commons.score.model.LRModel;
+import com.tzld.piaoquan.ad.engine.commons.score.model.ThompsonSamplingModel;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.ad.feature.VlogAdCtrLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+
+//@Service
+public class VlogAdThompsonScorer extends BaseThompsonSamplingScorer {
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogAdThompsonScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+
+    public VlogAdThompsonScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+    @Override
+    public List<AdRankItem> scoring(final ScoreParam param,
+                                    final UserAdFeature userFeature,
+                                    final List<AdRankItem> rankItems) {
+
+        if (userFeature == null || CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        ThompsonSamplingModel model = (ThompsonSamplingModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<AdRankItem> result = rankItems;
+        result = rankByJava(rankItems, param.getRequestContext(), userFeature);
+
+        LOGGER.debug("thompson sampling ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<AdRankItem> rankByJava(final List<AdRankItem> items,
+                                        final AdRequestContext requestContext,
+                                        final UserAdFeature user) {
+        long startTime = System.currentTimeMillis();
+        ThompsonSamplingModel model = (ThompsonSamplingModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("after enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i).getScore());
+            }
+        }
+
+        LOGGER.debug("thompson ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[thompson ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ecpm
+     */
+    public double calcScore(final ThompsonSamplingModel model,
+                            final AdRankItem item) {
+
+
+        double pctr = 0.0;
+        double pcvr = 0.0;
+        double score = 0.0;
+        double ecpm = 0.0;
+
+        try {
+            pctr = model.score(item, "ctr");
+            pcvr = model.score(item, "cvr");
+            ecpm = item.getCpa() * item.getBid1() * item.getBid2() * pctr * pcvr;
+        } catch (Exception e) {
+            LOGGER.error("score error for doc={} exception={}", new Object[]{
+                    item.getAdId(), ExceptionUtils.getFullStackTrace(e)});
+        }
+        item.setCtr(pctr);
+        item.setCvr(pcvr);
+        item.setEcpm1(ecpm);
+        item.setScore(ecpm);
+        return score;
+    }
+
+
+    /**
+     * 并行打分 ecpm
+     * @param items
+     * @param model
+     */
+    private void multipleCtrScore(final List<AdRankItem> items,
+                                  final ThompsonSamplingModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //设置为原始值为0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex));
+                    } catch (Exception e) {
+                        LOGGER.error("thompson exception: [{}] [{}]", items.get(fIndex).adId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("ecpm Score {}, Total: {}, Cancel: {}", new Object[]{ items.size(), cancel});
+    }
+}

+ 80 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogCtrCalibretionScorer.java

@@ -0,0 +1,80 @@
+package com.tzld.piaoquan.ad.engine.service.score;
+
+
+import com.tzld.piaoquan.ad.engine.commons.score.BaseLRModelScorer;
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerConfigInfo;
+import com.tzld.piaoquan.ad.engine.commons.score.model.LRModel;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+
+//@Service
+public class VlogCtrCalibretionScorer extends BaseLRModelScorer {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogCtrCalibretionScorer.class);
+
+
+    public VlogCtrCalibretionScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    @Override
+    public List<AdRankItem> scoring(final ScoreParam param,
+                                    final UserAdFeature userFeature,
+                                    final List<AdRankItem> rankItems) {
+
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+        List<AdRankItem> result = ctrCalibretion(rankItems);
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+
+    public List<AdRankItem> ctrCalibretion(List<AdRankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        if (model == null) {
+            LOGGER.error("not found model for ctr calibration [{}]");
+            return items;
+        }
+        int model_size = model.getModelSize();
+        for (AdRankItem item : items) {
+            double oldScore = item.getCtr();
+            double newScore = 0.0;
+            if (oldScore > 1.0 || oldScore < 0) {
+                item.setCtr(0.0);
+                continue;
+            }
+
+            try {
+                long key = (long) Math.floor(oldScore * model_size);
+                newScore = model.getWeight(model.getLrModel(), key);
+                if (newScore == 0) {
+                    LOGGER.error("ctr ctrCalibretion ctr Score: {} error", oldScore);
+                } else {
+                    item.setCtr(newScore);
+                }
+            } catch (Exception e) {
+                LOGGER.error("ctr ctrCalibretion ctr Score: {} couldn`t get key", oldScore);
+                item.setCtr(0.0);
+            }
+            LOGGER.debug("ctr ctrCalibretion ranker , score: {}->{}", new Object[]{oldScore, newScore});
+        }
+        Collections.sort(items);
+        return items;
+    }
+
+
+}

+ 81 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogCvrCalibretionScorer.java

@@ -0,0 +1,81 @@
+package com.tzld.piaoquan.ad.engine.service.score;
+
+
+import com.tzld.piaoquan.ad.engine.commons.score.BaseLRModelScorer;
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerConfigInfo;
+import com.tzld.piaoquan.ad.engine.commons.score.model.LRModel;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdRankItem;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+
+//@Service
+public class VlogCvrCalibretionScorer extends BaseLRModelScorer {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogCvrCalibretionScorer.class);
+
+
+    public VlogCvrCalibretionScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    @Override
+    public List<AdRankItem> scoring(final ScoreParam param,
+                                    final UserAdFeature userFeature,
+                                    final List<AdRankItem> rankItems) {
+
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+        List<AdRankItem> result = cvrCalibretion(rankItems);
+
+        LOGGER.debug("cvr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+
+    public List<AdRankItem> cvrCalibretion(List<AdRankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        if (model == null) {
+            LOGGER.error("not found model for cvr calibration [{}]");
+            return items;
+        }
+        int model_size = model.getModelSize();
+        for (AdRankItem item : items) {
+            double oldScore = item.getCvr();
+            double newScore = 0.0;
+            if (oldScore > 1.0 || oldScore < 0) {
+                item.setCvr(0.0);
+                continue;
+            }
+
+            try {
+                long key = (long) Math.floor(oldScore * model_size);
+                newScore = model.getWeight(model.getLrModel(), key);
+                if (newScore == 0) {
+                    LOGGER.error("cvr cvrCalibretion cvr Score: {} error", oldScore);
+                } else {
+                    item.setCvr(newScore);
+                }
+            } catch (Exception e) {
+                LOGGER.error("cvr cvrCalibretion ctr Score: {} couldn`t get key", oldScore);
+                item.setCvr(0.0);
+            }
+            LOGGER.debug("cvr cvrCalibretion ranker , score: {}->{}", new Object[]{oldScore, newScore});
+        }
+        Collections.sort(items);
+        return items;
+    }
+
+
+}

+ 66 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/VlogMergeEcpmScorer.java

@@ -0,0 +1,66 @@
+package com.tzld.piaoquan.ad.engine.service.score;
+
+
+import com.tzld.piaoquan.ad.engine.commons.score.BaseLRModelScorer;
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerConfigInfo;
+import com.tzld.piaoquan.ad.engine.commons.score.model.LRModel;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdRankItem;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+
+//@Service
+public class VlogMergeEcpmScorer extends BaseLRModelScorer {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogMergeEcpmScorer.class);
+
+
+    public VlogMergeEcpmScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    @Override
+    public List<AdRankItem> scoring(final ScoreParam param,
+                                    final UserAdFeature userFeature,
+                                    final List<AdRankItem> rankItems) {
+
+
+        long startTime = System.currentTimeMillis();
+        List<AdRankItem> result = mergetEcpm(rankItems);
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+
+    public List<AdRankItem> mergetEcpm(List<AdRankItem> items) {
+        long startTime = System.currentTimeMillis();
+
+        for (AdRankItem item : items) {
+            try {
+                double cpa = item.getCpa();
+                double bid1 = item.getBid1();
+                double bid2 = item.getBid2();
+                double pctr = item.getCtr();
+                double pcvr = item.getCvr();
+                double ecpm = cpa * bid1 * bid2 * pcvr * pctr * 1000;
+                item.setEcpm1(ecpm);
+                item.setScore(ecpm);
+            } catch (Exception e) {
+                LOGGER.error("merge Ecpm Score: {} error", "");
+                item.setCtr(0.0);
+            }
+            LOGGER.debug("merge Ecpm ranker , score: {}->{}", "");
+        }
+        Collections.sort(items);
+        return items;
+    }
+
+}

+ 58 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/impl/RankServiceThompsonImpl.java

@@ -0,0 +1,58 @@
+package com.tzld.piaoquan.ad.engine.service.score.impl;
+
+import com.tzld.piaoquan.ad.engine.commons.score.ScoreParam;
+import com.tzld.piaoquan.ad.engine.commons.score.ScorerUtils;
+import com.tzld.piaoquan.ad.engine.commons.util.CommonCollectionUtils;
+import com.tzld.piaoquan.ad.engine.service.score.FeatureRemoteService;
+import com.tzld.piaoquan.ad.engine.service.score.RankService;
+import com.tzld.piaoquan.ad.engine.service.score.convert.RequestConvert;
+import com.tzld.piaoquan.ad.engine.service.score.param.RankRecommendRequestParam;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdItemFeature;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdRankItem;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Service
+public class RankServiceThompsonImpl implements RankService {
+
+    private final static Logger log = LoggerFactory.getLogger(RankServiceThompsonImpl.class);
+
+    @Autowired
+    FeatureRemoteService featureRemoteService;
+
+    public AdRankItem adItemRank(RankRecommendRequestParam request) {
+
+        ScoreParam param = RequestConvert.requestConvert(request);
+        UserAdFeature userAdFeature = new UserAdFeature();
+        List<AdRankItem> rankItems = featureRemoteService.getAllAdFeatureList(
+                CommonCollectionUtils.toList(request.getAdIdList(), id -> id.toString())
+        );
+
+        //兜底方案
+        if (rankItems == null || rankItems.size() == 0) {
+            AdItemFeature feature = new AdItemFeature();
+            feature.setAdId(request.getAdIdList().get(0).toString());
+            AdRankItem adRankItem = new AdRankItem();
+            adRankItem.setAdId(request.getAdIdList().get(0));
+            adRankItem.setItemFeature(feature);
+            rankItems = new ArrayList<>();
+            rankItems.add(adRankItem);
+        }
+        List<AdRankItem> rankResult = ScorerUtils
+                .getScorerPipeline(ScorerUtils.THOMPSON_CONF)
+                .scoring(param, userAdFeature, rankItems);
+
+        if (!CollectionUtils.isEmpty(rankResult)) {
+            return rankResult.get(0);
+        } else {
+            return null;
+        }
+    }
+}