sunmingze 1 ano atrás
pai
commit
d4f1fc5386

+ 81 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/FlowPoolRecommendPipeline.java

@@ -0,0 +1,81 @@
+package com.tzld.piaoquan.recommend.server.implement;
+
+
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.merger.MergeUtils;
+import com.tzld.piaoquan.recommend.server.framework.merger.SimilarityUtils;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueue;
+import com.tzld.piaoquan.recommend.server.framework.recaller.BaseRecaller;
+import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueue;
+import com.tzld.piaoquan.recommend.server.framework.score.ScorerPipeline;
+import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorPipeline;
+import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorUtils;
+import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Recommendation pipeline for the flow pool: attention extraction, candidate
+ * generation, recall, scoring, queue merging and diversity re-ranking.
+ */
+public class FlowPoolRecommendPipeline {
+
+    /** Merge configuration name handed to {@code MergeUtils.createTopQueue}. */
+    public static final String MERGE_CONF = "flow_merge_config.conf";
+
+    public static final String PREFIX = "";
+
+    /**
+     * Runs the whole pipeline for one request and returns the merged result list.
+     *
+     * @param requestData  incoming recommend request
+     * @param requestIndex index of the request, forwarded to recall, scoring and merge
+     * @param userInfo     user the recommendation is computed for
+     * @return the items held by the top queue after merging and the diversity re-rank
+     */
+    private List<RankItem> feedByRec(final RecommendRequest requestData,
+                                     final int requestIndex,
+                                     final User userInfo) {
+        final int recallNum = 200;
+
+        // Step 1: attention extraction
+        UserAttentionExtractorPipeline attentionExtractorPipeline =
+                UserAttentionExtractorUtils.getAtttentionPipeline(UserAttentionExtractorUtils.BASE_CONF);
+        attentionExtractorPipeline.extractAttention(requestData, userInfo);
+
+        // Step 2: create the top queue
+        StrategyQueue topQueue = MergeUtils.createTopQueue(MERGE_CONF, "top-queue");
+
+        // Step 3: candidate generation
+        Map<String, Candidate> candidates = new HashMap<String, Candidate>();
+        topQueue.candidate(candidates, recallNum, userInfo, requestData, 0, 0);
+
+        // Step 4: recall and basic scoring
+        RedisSmartClient client = new RedisSmartClient();
+        RedisBackedQueue queueProvider = new RedisBackedQueue(client, 1000L);
+        BaseRecaller recaller = new BaseRecaller(queueProvider);
+        List<RankItem> items = recaller.recalling(requestData, userInfo, requestIndex,
+                new ArrayList<Candidate>(candidates.values()));
+
+        // Step 5: advanced scoring
+        // NOTE(review): getScorerPipeline is not declared in this class as shown here — confirm where it is defined.
+        ScorerPipeline scorerPipeline = getScorerPipeline(requestData);
+        items = scorerPipeline.scoring(requestData, userInfo, requestIndex, items);
+
+        // Step 6: merge the scored items through the strategy queues
+        MergeUtils.distributeItemsToMultiQueues(topQueue, items);
+        topQueue.merge(recallNum * 3, userInfo, requestData, requestIndex, 0);
+
+        // Step 7: diversity re-rank
+        // Assumes diversityRerank reorders mergeItems in place, since its result is not captured — TODO confirm.
+        List<RankItem> mergeItems = topQueue.getItems();
+        MergeUtils.diversityRerank(mergeItems, SimilarityUtils.getIsSameUserTagOrCategoryFunc(), recallNum, 6, 2);
+
+        // Step 8: global rank & subList
+        // TODO: Global Rank
+
+        // Hand back the merged list so the work of steps 6 and 7 actually reaches the caller.
+        return mergeItems;
+    }
+
+
+}

+ 58 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Global24hHotCandidate.java

@@ -0,0 +1,58 @@
+package com.tzld.piaoquan.recommend.server.implement.candidate;
+
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.IndexCandidateQueue;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueConfig;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueInfo;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Candidate queue that registers the "global24h" index keys for the user's region,
+ * one key per ordering.
+ */
+public class Global24hHotCandidate extends IndexCandidateQueue {
+
+    public static final String ItemType = "video";
+
+    public Global24hHotCandidate(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
+        super(strategyQueueInfo, strategyQueueConfig);
+    }
+
+    /**
+     * Registers one candidate key per ordering and copies the collected candidates
+     * into {@code candidates}.
+     *
+     * @return the sum of the values returned by the per-key registrations
+     */
+    @Override
+    public int addCandidateKey(Map<String, Candidate> candidates, int recallNum, User user, RecommendRequest requestData, int requestIndex, int expId) {
+        final String region = user.getRegion();
+        final Map<String, Candidate> collected = new ConcurrentHashMap<String, Candidate>();
+
+        // Per the original notes the index key looks like
+        // video:queue:type=global24h:region=<region>:ordering=<ordering>
+        final String[] orderings = {"rov", "realplay", "str", "realplay_ratio"};
+
+        int total = 0;
+        for (String ordering : orderings) {
+            QueueName queueName = new QueueName(ItemType, ordering)
+                    .addMatch("type", "global24h")
+                    .addMatch("region", region);
+            total += addCandidateKey(collected, queueName, 1, getStrategyQueueInfo().getQueueName());
+        }
+
+        candidates.putAll(collected);
+        return total;
+    }
+}

+ 66 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Region1hHotCandidate.java

@@ -0,0 +1,66 @@
+package com.tzld.piaoquan.recommend.server.implement.candidate;
+
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.IndexCandidateQueue;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueConfig;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueInfo;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Candidate queue that registers the "region1h" index keys for the user's region,
+ * one key per ordering.
+ */
+public class Region1hHotCandidate extends IndexCandidateQueue {
+
+    public static final String ItemType = "video";
+
+    public Region1hHotCandidate(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
+        super(strategyQueueInfo, strategyQueueConfig);
+    }
+
+    /**
+     * Registers one candidate key per ordering and copies the collected candidates
+     * into {@code candidates}.
+     *
+     * @return the sum of the values returned by the per-key registrations
+     */
+    @Override
+    public int addCandidateKey(Map<String, Candidate> candidates, int recallNum, User user, RecommendRequest requestData, int requestIndex, int expId) {
+        final String region = user.getRegion();
+        final Map<String, Candidate> collected = new ConcurrentHashMap<String, Candidate>();
+
+        // Per the original notes the index key looks like
+        // video:queue:type=region1h:region=<region>:ordering=<ordering>
+        final String[] orderings = {"short-rov", "rov", "realplay", "str", "realplay_ratio"};
+
+        int total = 0;
+        for (String ordering : orderings) {
+            QueueName queueName = new QueueName(ItemType, ordering)
+                    .addMatch("type", "region1h")
+                    .addMatch("region", region);
+            total += addCandidateKey(collected, queueName, 1, getStrategyQueueInfo().getQueueName());
+        }
+
+        candidates.putAll(collected);
+        return total;
+    }
+}

+ 331 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogShareLRScorer.java

@@ -0,0 +1,331 @@
+package com.tzld.piaoquan.recommend.server.implement.score;
+
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.video.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.rank.strategy.OfflineVlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.service.score.BaseLRModelScorer;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerConfigInfo;
+import com.tzld.piaoquan.recommend.server.service.score.model.LRModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class VlogShareLRScorer extends BaseLRModelScorer {
+
+    // Not referenced anywhere in this class.
+    private final static int CORE_POOL_SIZE = 64;
+
+    // Timeout, in milliseconds, passed to executorService.invokeAll for one scoring batch.
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogShareLRScorer.class);
+    // Fixed pool of 128 threads; static, so it is shared by every instance of this scorer.
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+    // The three constants below are not referenced anywhere in this class.
+    private static final double defaultUserCtrGroupNumber = 10.0;
+    private static final int enterFeedsScoreRatio = 10;
+    private static final int enterFeedsScoreNum = 20;
+
+
+    /** Creates the scorer; the configuration is forwarded unchanged to the base class. */
+    public VlogShareLRScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    /**
+     * Scores the given items with the LR model and returns them sorted.
+     *
+     * <p>A list that {@code CollectionUtils.isEmpty} reports as empty is returned untouched.
+     * When {@code userFeature} is null, a default instance built from {@code param.getMid()}
+     * is used instead.
+     *
+     * @param param       scoring parameters; supplies the request context and the mid
+     * @param userFeature user features, may be null
+     * @param rankItems   items to score
+     * @return the list produced by the ranking step
+     */
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        final long begin = System.currentTimeMillis();
+        final LRModel lrModel = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", lrModel.getModelSize());
+
+        final RequestContext context = param.getRequestContext();
+        final UserFeature effectiveUser;
+        if (userFeature == null) {
+            effectiveUser = UserFeature.defaultInstance(param.getMid());
+        } else {
+            effectiveUser = userFeature;
+        }
+        final List<RankItem> ranked = rankByJava(rankItems, context, effectiveUser);
+
+        final int rankedSize = (ranked == null) ? 0 : ranked.size();
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", rankedSize,
+                System.currentTimeMillis() - begin);
+
+        return ranked;
+    }
+
+    private List<RankItem> rankByJava(final List<RankItem> items,
+                                      final RequestContext requestContext,
+                                      final UserFeature user) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // userBytes
+        UserBytesFeature userInfoBytes = null;
+        userInfoBytes = new UserBytesFeature(user);
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userInfoBytes, requestContext, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ctr
+     */
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final UserBytesFeature userInfoBytes,
+                            final RequestContext requestContext) {
+
+        LRSamples lrSamples = null;
+        VlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
+
+        try {
+            VideoBytesFeature newsInfoBytes = new VideoBytesFeature(item.getItemFeature() == null
+                    ? ItemFeature.defaultInstance(item.getVideoId() + "")
+                    : item.getItemFeature());
+            lrSamples = bytesFeatureExtractor.single(userInfoBytes, newsInfoBytes,
+                    new RequestContextBytesFeature(requestContext));
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{new String(userInfoBytes.getUid()), item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+            // 增加实时特征后打开在线存储日志逻辑
+            //
+            // CtrSamples.Builder samples =  com.tzld.piaoquan.recommend.server.gen.recommend.CtrSamples.newBuilder();
+            // samples.setLr_samples(lrSamples);
+            // item.setSamples(samples);
+            //
+        }
+        item.setScore(pro);
+        return pro;
+    }
+
+
+    /**
+     * 并行打分
+     *
+     * @param items
+     * @param userInfoBytes
+     * @param requestContext
+     * @param model
+     */
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final UserBytesFeature userInfoBytes,
+                                  final RequestContext requestContext,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userInfoBytes, requestContext);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", requestContext.getRequest_id(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{requestContext.getRequest_id(), items.size(), cancel});
+    }
+    /**
+     * Scores the given items from string-valued scene and user feature maps and returns
+     * them sorted.
+     *
+     * <p>A list that {@code CollectionUtils.isEmpty} reports as empty is returned untouched.
+     *
+     * @param sceneFeatureMap scene features keyed by name
+     * @param userFeatureMap  user features keyed by name
+     * @param rankItems       items to score
+     * @return the list produced by the ranking step
+     */
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                  final Map<String, String> userFeatureMap,
+                                  final List<RankItem> rankItems) {
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        final long begin = System.currentTimeMillis();
+        final LRModel lrModel = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", lrModel.getModelSize());
+
+        final List<RankItem> ranked = rankByJava(sceneFeatureMap, userFeatureMap, rankItems);
+
+        final int rankedSize = (ranked == null) ? 0 : ranked.size();
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", rankedSize,
+                System.currentTimeMillis() - begin);
+
+        return ranked;
+    }
+
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+        // userBytes
+        Map<String, byte[]> userFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: userFeatureMap.entrySet()){
+            userFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+        //sceneBytes
+        Map<String, byte[]> sceneFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: sceneFeatureMap.entrySet()){
+            sceneFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMapByte, sceneFeatureMapByte, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, byte[]> userFeatureMapByte,
+                                  final Map<String, byte[]> sceneFeatureMapByte,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            // items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMapByte, sceneFeatureMapByte);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMapByte.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{sceneFeatureMapByte.size(), items.size(), cancel});
+    }
+
+    /**
+     * Predicts the score of a single item from byte-form feature maps and stores it on
+     * the item with {@code setScoreStr}.
+     *
+     * <p>Exceptions thrown while building the item features, extracting samples or
+     * evaluating the model are caught and logged; the item then receives 0.0.
+     *
+     * @param lrModel             model used to score the extracted samples
+     * @param item                item to score; its feature map is converted to bytes
+     * @param userFeatureMapByte  user features in byte form
+     * @param sceneFeatureMapByte scene features in byte form
+     * @return the score that was stored on the item
+     */
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final Map<String, byte[]> userFeatureMapByte,
+                            final Map<String, byte[]> sceneFeatureMapByte) {
+
+        final OfflineVlogShareLRFeatureExtractor extractor = new OfflineVlogShareLRFeatureExtractor();
+        LRSamples samples = null;
+
+        try {
+            final Map<String, byte[]> itemBytes = new HashMap<>();
+            for (Map.Entry<String, String> feature : item.getFeatureMap().entrySet()) {
+                final byte[] raw = feature.getValue().getBytes();
+                itemBytes.put(feature.getKey(), raw);
+            }
+            samples = extractor.single(userFeatureMapByte, itemBytes, sceneFeatureMapByte);
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{
+                    "",
+                    item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+        double predicted = 0.0;
+        if (samples != null && samples.getFeaturesList() != null) {
+            try {
+                predicted = lrModel.score(samples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+        }
+
+        item.setScoreStr(predicted);
+        return predicted;
+    }
+}

+ 331 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogShareLRScorer4Ros.java

@@ -0,0 +1,331 @@
+package com.tzld.piaoquan.recommend.server.implement.score;
+
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.video.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.rank.strategy.OfflineVlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.service.score.BaseLRModelScorer;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerConfigInfo;
+import com.tzld.piaoquan.recommend.server.service.score.model.LRModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class VlogShareLRScorer4Ros extends BaseLRModelScorer {
+
+    // Not referenced in the visible part of this class.
+    private final static int CORE_POOL_SIZE = 64;
+
+    // Timeout, in milliseconds, passed to executorService.invokeAll for one scoring batch.
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogShareLRScorer4Ros.class);
+    // Fixed pool of 128 threads; static, so it is shared by every instance of this scorer.
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+    // The three constants below are not referenced in the visible part of this class.
+    private static final double defaultUserCtrGroupNumber = 10.0;
+    private static final int enterFeedsScoreRatio = 10;
+    private static final int enterFeedsScoreNum = 20;
+
+
+    /** Creates the scorer; the configuration is forwarded unchanged to the base class. */
+    public VlogShareLRScorer4Ros(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    /**
+     * Scores the given items with the LR model and returns them sorted.
+     *
+     * <p>A list that {@code CollectionUtils.isEmpty} reports as empty is returned untouched.
+     * When {@code userFeature} is null, a default instance built from {@code param.getMid()}
+     * is used instead.
+     *
+     * @param param       scoring parameters; supplies the request context and the mid
+     * @param userFeature user features, may be null
+     * @param rankItems   items to score
+     * @return the list produced by the ranking step
+     */
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        final long begin = System.currentTimeMillis();
+        final LRModel lrModel = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", lrModel.getModelSize());
+
+        final RequestContext context = param.getRequestContext();
+        final UserFeature effectiveUser;
+        if (userFeature == null) {
+            effectiveUser = UserFeature.defaultInstance(param.getMid());
+        } else {
+            effectiveUser = userFeature;
+        }
+        final List<RankItem> ranked = rankByJava(rankItems, context, effectiveUser);
+
+        final int rankedSize = (ranked == null) ? 0 : ranked.size();
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", rankedSize,
+                System.currentTimeMillis() - begin);
+
+        return ranked;
+    }
+
+    private List<RankItem> rankByJava(final List<RankItem> items,
+                                      final RequestContext requestContext,
+                                      final UserFeature user) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // userBytes
+        UserBytesFeature userInfoBytes = null;
+        userInfoBytes = new UserBytesFeature(user);
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userInfoBytes, requestContext, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ctr
+     */
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final UserBytesFeature userInfoBytes,
+                            final RequestContext requestContext) {
+
+        LRSamples lrSamples = null;
+        VlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
+
+        try {
+            VideoBytesFeature newsInfoBytes = new VideoBytesFeature(item.getItemFeature() == null
+                    ? ItemFeature.defaultInstance(item.getVideoId() + "")
+                    : item.getItemFeature());
+            lrSamples = bytesFeatureExtractor.single(userInfoBytes, newsInfoBytes,
+                    new RequestContextBytesFeature(requestContext));
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{new String(userInfoBytes.getUid()), item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+            // 增加实时特征后打开在线存储日志逻辑
+            //
+            // CtrSamples.Builder samples =  com.tzld.piaoquan.recommend.server.gen.recommend.CtrSamples.newBuilder();
+            // samples.setLr_samples(lrSamples);
+            // item.setSamples(samples);
+            //
+        }
+        item.setScore(pro);
+        return pro;
+    }
+
+
+    /**
+     * 并行打分
+     *
+     * @param items
+     * @param userInfoBytes
+     * @param requestContext
+     * @param model
+     */
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final UserBytesFeature userInfoBytes,
+                                  final RequestContext requestContext,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userInfoBytes, requestContext);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", requestContext.getRequest_id(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{requestContext.getRequest_id(), items.size(), cancel});
+    }
+    /**
+     * Entry point for scoring: hands the candidates to {@code rankByJava} and logs the elapsed time.
+     *
+     * @param sceneFeatureMap scene features, passed through to {@code rankByJava}
+     * @param userFeatureMap  user features, passed through to {@code rankByJava}
+     * @param rankItems       candidates to score
+     * @return {@code rankItems} itself when it is null or empty, otherwise whatever
+     *         {@code rankByJava} returns
+     */
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                  final Map<String, String> userFeatureMap,
+                                  final List<RankItem> rankItems) {
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        final long begin = System.currentTimeMillis();
+        // The model is fetched here only to report its size at debug level.
+        final LRModel lrModel = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", lrModel.getModelSize());
+
+        final List<RankItem> ranked = rankByJava(sceneFeatureMap, userFeatureMap, rankItems);
+
+        int rankedCount = 0;
+        if (ranked != null) {
+            rankedCount = ranked.size();
+        }
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", rankedCount,
+                System.currentTimeMillis() - begin);
+
+        return ranked;
+    }
+
+    /**
+     * Converts the user and scene features to byte form, scores every item in parallel and
+     * sorts the list in place.
+     *
+     * @param sceneFeatureMap scene features as strings; {@code null} is treated as empty
+     * @param userFeatureMap  user features as strings; {@code null} is treated as empty
+     * @param items           candidates to score; sorted in place with {@link Collections#sort}
+     * @return the same {@code items} list, sorted by the natural ordering of {@code RankItem}
+     *         (its {@code compareTo} is defined outside this method)
+     */
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        Map<String, byte[]> userFeatureMapByte = toUtf8ByteMap(userFeatureMap);
+        Map<String, byte[]> sceneFeatureMapByte = toUtf8ByteMap(sceneFeatureMap);
+
+        // Every item takes part in scoring; ordering happens afterwards.
+        multipleCtrScore(items, userFeatureMapByte, sceneFeatureMapByte, model);
+
+        // debug log
+        // NOTE(review): the item is logged twice; the second argument was probably meant to be
+        // one of its score fields - confirm which one before changing it.
+        if (LOGGER.isDebugEnabled()) {
+            for (RankItem item : items) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", item, item);
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        // items has already been dereferenced above, so no null check is needed here.
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items.size(),
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    /**
+     * Encodes each feature value as UTF-8 bytes.
+     *
+     * <p>An explicit charset keeps the bytes independent of the JVM's platform default.
+     * A {@code null} map yields an empty result and entries with a {@code null} value are
+     * skipped, so one missing feature no longer aborts the request with a NullPointerException.
+     *
+     * @param source string-valued feature map, may be {@code null}
+     * @return a new mutable map from the same keys to the UTF-8 bytes of their values
+     */
+    private Map<String, byte[]> toUtf8ByteMap(final Map<String, String> source) {
+        Map<String, byte[]> encoded = new HashMap<>();
+        if (source == null) {
+            return encoded;
+        }
+        for (Map.Entry<String, String> entry : source.entrySet()) {
+            String value = entry.getValue();
+            if (value != null) {
+                encoded.put(entry.getKey(), value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+        }
+        return encoded;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, byte[]> userFeatureMapByte,
+                                  final Map<String, byte[]> sceneFeatureMapByte,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+//            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMapByte, sceneFeatureMapByte);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMapByte.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{sceneFeatureMapByte.size(), items.size(), cancel});
+    }
+
+    /**
+     * Scores a single item with the LR model and stores the result on the item through
+     * {@code setScoreRos}.
+     *
+     * <p>The item's string features are encoded as UTF-8 bytes (explicit charset, so the bytes
+     * do not depend on the JVM's platform default) and combined with the user and scene
+     * features by {@code OfflineVlogShareLRFeatureExtractor}. Any exception during feature
+     * extraction or model scoring is logged and the score falls back to {@code 0.0}.
+     *
+     * @param lrModel             model used to score the extracted samples
+     * @param item                item to score; its {@code scoreRos} is overwritten
+     * @param userFeatureMapByte  user features in byte form
+     * @param sceneFeatureMapByte scene features in byte form
+     * @return the computed score, or {@code 0.0} when extraction or scoring failed
+     */
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final Map<String, byte[]> userFeatureMapByte,
+                            final Map<String, byte[]> sceneFeatureMapByte) {
+
+        LRSamples lrSamples = null;
+        OfflineVlogShareLRFeatureExtractor bytesFeatureExtractor = new OfflineVlogShareLRFeatureExtractor();
+
+        try {
+            Map<String, byte[]> itemFeatureByte = new HashMap<>();
+            for (Map.Entry<String, String> entry : item.getFeatureMap().entrySet()) {
+                itemFeatureByte.put(entry.getKey(),
+                        entry.getValue().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+            lrSamples = bytesFeatureExtractor.single(userFeatureMapByte, itemFeatureByte, sceneFeatureMapByte);
+        } catch (Exception e) {
+            // Best effort: a missing feature map or null value ends up here and the item scores 0.
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{"", item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+        }
+        item.setScoreRos(pro);
+        return pro;
+    }
+}

+ 144 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogThompsonScorer.java

@@ -0,0 +1,144 @@
+package com.tzld.piaoquan.recommend.server.implement.score;
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.RequestContext;
+import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.score.BaseThompsonSamplingScorer;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerConfigInfo;
+import com.tzld.piaoquan.recommend.server.service.score.model.ThompsonSamplingModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+
+//@Service
+public class VlogThompsonScorer extends BaseThompsonSamplingScorer {
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogThompsonScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+
+    public VlogThompsonScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+
+        if (userFeature == null || CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        ThompsonSamplingModel model = (ThompsonSamplingModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(rankItems, param.getRequestContext(), userFeature);
+
+        LOGGER.debug("thompson sampling ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final List<RankItem> items,
+                                      final RequestContext requestContext,
+                                      final UserFeature user) {
+        long startTime = System.currentTimeMillis();
+        ThompsonSamplingModel model = (ThompsonSamplingModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // 所有都参与打分,按照ROV Thompson排序
+        multipleCtrScore(items, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("after enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i).getScore());
+            }
+        }
+
+        LOGGER.debug("thompson ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[thompson ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ROV
+     */
+    public double calcScore(final ThompsonSamplingModel model,
+                            final RankItem item) {
+        double score = 0d;
+        try {
+            score = model.score(item);
+        } catch (Exception e) {
+            LOGGER.error("score error for doc={} exception={}", new Object[]{
+                    item.getVideo(), ExceptionUtils.getFullStackTrace(e)});
+        }
+        item.setScore(score);
+        return score;
+    }
+
+
+    /**
+     * 并行打分 Thompson ROV
+     *
+     * @param items
+     * @param model
+     */
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final ThompsonSamplingModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //设置为原始值为0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex));
+                    } catch (Exception e) {
+                        LOGGER.error("thompson exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("ROV-Thompson Score {}, Total: {}, Cancel: {}", new Object[]{items.size(), cancel});
+    }
+}