Переглянути джерело

Merge remote-tracking branch 'origin/vlog_merge_refactor_smz' into vlog_merge_refactor_smz

sunxy 1 рік тому
батько
коміт
4c1c2a5219
25 змінених файлів з 1232 додано та 317 видалено
  1. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/RankItem.java
  2. 3 2
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/candidiate/Entry.java
  3. 0 13
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/common/ArticleInfo.java
  4. 0 3
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/common/User.java
  5. 43 3
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/MergeUtils.java
  6. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/SimpleMergeQueue.java
  7. 4 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/StrategyQueue.java
  8. 28 32
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java
  9. 0 32
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/InMemoryItem.java
  10. 23 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/Index.java
  11. 0 3
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/QueueProvider.java
  12. 27 64
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/RedisBackedQueue.java
  13. 12 13
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/RedisBackedQueueWithoutMetaCacheLoader.java
  14. 0 37
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/utils/IndexUtils.java
  15. 17 49
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/utils/RedisSmartClient.java
  16. 19 17
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/FlowPoolRecommendPipeline.java
  17. 91 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/TopRecommendPipeline.java
  18. 58 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Global24hHotCandidate.java
  19. 66 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Region1hHotCandidate.java
  20. 21 6
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Region24hHotCandidate.java
  21. 0 33
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/recaller/HistoryLongPeriodFilter.java
  22. 331 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogShareLRScorer.java
  23. 331 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogShareLRScorer4Ros.java
  24. 144 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogThompsonScorer.java
  25. 12 6
      recommend-server-service/src/main/resources/merge_config.conf

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/RankItem.java

@@ -23,7 +23,7 @@ public class RankItem implements Comparable<RankItem> {
     public Map<String, Double> scoresMap = new HashMap<>();
     public Map<String, String> itemBasicFeature = new HashMap<>();
     public Map<String, Map<String, Double>> itemRealTimeFeature = new HashMap<>();
-gi    public long videoId;
+    public long videoId;
     private double score; // 记录最终的score
     private Video video;
     private double scoreRos; // 记录ros的score

+ 3 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/candidiate/Entry.java

@@ -9,15 +9,16 @@ import java.util.Map;
  * @param <T>
  */
 public class Entry<T> {
+    public final String id;
     public final T item;
     public final Map<String, Double> scores;
     public final Map<String, String> explanations;
-    public final String id;
+
 
     public Entry(T item, String id) {
         this.item = item;
-        this.scores = new HashMap<String, Double>();
         this.id = id;
+        this.scores = new HashMap<String, Double>();
         this.explanations = new HashMap<String, String>();
     }
 

+ 0 - 13
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/common/ArticleInfo.java

@@ -1,13 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.common;
-
-
-import lombok.Data;
-
-@Data
-public class ArticleInfo {
-    private String id;
-    private String articleName;
-    private String title;
-    private String titleVector;
-
-}

+ 0 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/common/User.java

@@ -18,7 +18,6 @@ public class User {
     private String region;
     private String city;
 
-
     // user-content info 用户内容tag统计数据
     private UserAction last1monthVideoTags;
     private UserAction last7dayVideoTags;
@@ -30,7 +29,6 @@ public class User {
     private UserAction last1dayTitleTags;
     private UserAction lastSessionTitleTags;
 
-
     // 用户关注up主信息
     private UserAction last1monthPublishers;
 
@@ -41,7 +39,6 @@ public class User {
     private UserAction last1hourUserAction;
     private UserAction lastSessionUserAction;
 
-
     // user-group info
     private String userGroup;
 

+ 43 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/MergeUtils.java

@@ -32,8 +32,6 @@ public class MergeUtils {
 
     public static StrategyQueue createTopQueue(String mergeConfFile, String topQueueName) {
         StrategyQueueConfig strategyQueueConfig = new StrategyQueueConfig();
-        // Config mergeQueueConf = Configuration.getConfigByFileName(mergeConfFile, configDefault);
-
         Config mergeQueueConf = ConfigFactory.parseResources(mergeConfFile);
         if (strategyQueueConfig.load(mergeQueueConf)) {
             LOGGER.debug("Merger config init succ");
@@ -72,6 +70,49 @@ public class MergeUtils {
     }
 
 
+    /**
+     * 分发items到策略树中
+     * 分发的过程中不要破坏相对顺序
+     *
+     * @param strategyQueue
+     * @param items
+     */
+    public static void distributeItemsToMultiQueues(StrategyQueue strategyQueue, List<RankItem> items) {
+        List<StrategyQueue> strategyQueueList = strategyQueue.getAllQueues();
+
+        Multimap<String, RankItem> mergeQueuesItems = ArrayListMultimap.create();
+        for (RankItem item : items) {
+            for (CandidateInfo candidateInfo : item.getCandidateInfoList()) {
+                String mergeQueue = candidateInfo.getCandidate().getMergeQueueName();
+                RankItem currentItem = item.deepcopy();
+
+                // set candidate info
+                currentItem.setQueue(mergeQueue);
+                currentItem.setCandidateInfo(candidateInfo);
+
+                mergeQueuesItems.put(mergeQueue, currentItem);
+            }
+        }
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (Map.Entry<String, RankItem> entry : mergeQueuesItems.entries()) {
+                LOGGER.debug("after distribute item queue info: [{}] [{}] [{}]",
+                        new Object[]{entry.getKey(), entry.getValue().getId(), entry.getValue().getQueue()});
+            }
+        }
+
+        for (StrategyQueue queue : strategyQueueList) {
+            String mergeQueueName = queue.getStrategyQueueInfo().getQueueName();
+            if (mergeQueuesItems.containsKey(mergeQueueName)) {
+                List<RankItem> currMergeQueueItems = new ArrayList<RankItem>(mergeQueuesItems.get(mergeQueueName));
+                queue.setItems(currMergeQueueItems);
+            }
+        }
+    }
+
+
+
+
     /**
      * 基于 score 字段融合: score 较大的 Item 优先
      *
@@ -130,7 +171,6 @@ public class MergeUtils {
      *
      * @param rankerItemsListMap
      * @param recNum
-     * @param requestIndex
      * @param expId
      * @return
      */

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/SimpleMergeQueue.java

@@ -17,7 +17,7 @@ public class SimpleMergeQueue extends StrategyQueue {
 
     @Override
     public int doMerge(final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap, final int recNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId) {
-        return MergeUtils.simpleMergeWithProtection(items, rankerItemsListMap, recNum, user, requestData, requestIndex, expId);
+        return MergeUtils.simpleMergeWithProtection(items, rankerItemsListMap, recNum, expId);
     }
 
     @Override

+ 4 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/StrategyQueue.java

@@ -46,7 +46,9 @@ public abstract class StrategyQueue {
             StrategyQueue childQueue = null;
             String childQueueClass = childQueueInfo.getQueueClass();
             try {
-                childQueue = (StrategyQueue) Class.forName(childQueueClass).getConstructor(childQueueInfo.getClass(), strategyQueueConfig.getClass()).newInstance(childQueueInfo, strategyQueueConfig);
+                childQueue = (StrategyQueue) Class.forName(childQueueClass)
+                        .getConstructor(childQueueInfo.getClass(), strategyQueueConfig.getClass())
+                        .newInstance(childQueueInfo, strategyQueueConfig);
             } catch (InstantiationException e) {
                 LOGGER.error("construct StrategyQueue {}: [{}]", childName + " [" + childQueueClass + "]", e);
             } catch (IllegalAccessException e) {
@@ -107,6 +109,7 @@ public abstract class StrategyQueue {
                     continue;
                 }
                 int myRecNum = Math.min(recNum, rule.maxMergeNum);
+                //递归的执行子节点的merge逻辑
                 int actualRecNum = children.get(rule.queueName).merge(myRecNum, user, requestData, requestIndex, expId);
                 queueCount += 1;
                 itemCount += actualRecNum;

+ 28 - 32
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java

@@ -12,6 +12,7 @@ import com.tzld.piaoquan.recommend.server.framework.candidiate.*;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvider;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+import com.tzld.piaoquan.recommend.server.model.Video;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -29,32 +30,32 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 
-public class BaseRecaller<InMemoryItem> {
+public class BaseRecaller<Video> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BaseRecaller.class);
     private static final long DEFAULT_QUEUE_LOAD_TIMEOUT = 150; // ms
     private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 200; // ms
     private static final ExecutorService filterExecutorService = Executors.newFixedThreadPool(128);
     private static final ExecutorService fetchQueueExecutorService = Executors.newFixedThreadPool(128);
-    private final QueueProvider<InMemoryItem> queueProvider;
+    private final QueueProvider<Video> queueProvider;
     private final long QUEUE_LOAD_TIMEOUT;
 
-    public BaseRecaller(QueueProvider<InMemoryItem> queueProvider) {
+    public BaseRecaller(QueueProvider<Video> queueProvider) {
         this(queueProvider, DEFAULT_QUEUE_LOAD_TIMEOUT);
     }
 
-    public BaseRecaller(QueueProvider<InMemoryItem> queueProvider,
+    public BaseRecaller(QueueProvider<Video> queueProvider,
                         long queueLoadTimeout) {
         this.queueProvider = queueProvider;
         this.QUEUE_LOAD_TIMEOUT = queueLoadTimeout;
     }
 
-    public String extractItemId(Entry<InMemoryItem> entry) {
+    public String extractItemId(Entry<Video> entry) {
         return entry.id;
     }
 
 
-    public boolean isValidItem(InMemoryItem item) {
+    public boolean isValidItem(Video item) {
         return item != null;
     }
 
@@ -65,7 +66,7 @@ public class BaseRecaller<InMemoryItem> {
      * @param candidate
      * @return
      */
-    private List<RankItem> toHits(final Iterable<Entry<InMemoryItem>> entries, final Candidate candidate) {
+    private List<RankItem> toHits(final Iterable<Entry<Video>> entries, final Candidate candidate) {
 
         List<RankItem> result = new ArrayList<RankItem>();
         for (Entry entry : entries) {
@@ -85,7 +86,7 @@ public class BaseRecaller<InMemoryItem> {
 
 
     // 读取redis中的数据放入queue中
-    public Map<Candidate, Queue<InMemoryItem>> loadQueues(List<Candidate> candidates) {
+    public Map<Candidate, Queue<Video>> loadQueues(List<Candidate> candidates) {
         // update queueName
         Iterable<Candidate> updateCandidates = FluentIterable.from(candidates).transform(new Function<Candidate, Candidate>() {
             @Override
@@ -109,17 +110,17 @@ public class BaseRecaller<InMemoryItem> {
             }
         });
 
-        // parallel load queues
-        // redis 或者缓存获取index
-        Map<QueueName, Queue<InMemoryItem>> queues = Maps.newConcurrentMap();
+        // parallel load queues, redis 或者缓存获取index
+        Map<QueueName, Queue<Video>> queues = Maps.newConcurrentMap();
         try {
+            // 格式
             queues = queueProvider.loads(Lists.newArrayList(queueNames), QUEUE_LOAD_TIMEOUT, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             LOGGER.error("load queue occur error [{}]", ExceptionUtils.getFullStackTrace(e));
         }
 
         // parse candidate map
-        Map<Candidate, Queue<InMemoryItem>> candidateQueueMap = Maps.newConcurrentMap();
+        Map<Candidate, Queue<Video>> candidateQueueMap = Maps.newConcurrentMap();
         for (Candidate candidate : updateCandidates) {
             QueueName name = candidate.getCandidateQueueName();
             if (queues.containsKey(name) && queues.get(name) != null) {
@@ -141,23 +142,18 @@ public class BaseRecaller<InMemoryItem> {
      */
     public List<RankItem> recalling(final RecommendRequest requestData, final User user, int requestIndex, List<Candidate> recallCandidates) {
 
-        long startTime = System.currentTimeMillis();
-
-        // load queue
-        long queueLoadStartTime = System.currentTimeMillis();
-
         // load from redis
-        List<Callable<Map<Candidate, Queue<InMemoryItem>>>> fetchQueueCalls = Lists.newArrayList();
-        fetchQueueCalls.add(new Callable<Map<Candidate, Queue<InMemoryItem>>>() {
+        List<Callable<Map<Candidate, Queue<Video>>>> fetchQueueCalls = Lists.newArrayList();
+        fetchQueueCalls.add(new Callable<Map<Candidate, Queue<Video>>>() {
             @Override
-            public Map<Candidate, Queue<InMemoryItem>> call() throws Exception {
+            public Map<Candidate, Queue<Video>> call() throws Exception {
                 boolean isFromRedis = true;
                 return obtainQueue(recallCandidates, requestData, user, isFromRedis);
             }
         });
 
-
-        List<Future<Map<Candidate, Queue<InMemoryItem>>>> fetchQueueFutures = null;
+        // 多线程执行load
+        List<Future<Map<Candidate, Queue<Video>>>> fetchQueueFutures = null;
         try {
             fetchQueueFutures = fetchQueueExecutorService.invokeAll(fetchQueueCalls, DEFAULT_QUEUE_LOAD_TIMEOUT,
                     TimeUnit.MILLISECONDS);
@@ -167,11 +163,11 @@ public class BaseRecaller<InMemoryItem> {
             LOGGER.error("[fetch queue error] ex", ExceptionUtils.getStackTrace(e));
         }
 
-        Map<Candidate, Queue<InMemoryItem>> candidateQueueMap = Maps.newHashMap();
+        Map<Candidate, Queue<Video>> candidateQueueMap = Maps.newHashMap();
         if (CollectionUtils.isNotEmpty(fetchQueueFutures)) {
-            for (Future<Map<Candidate, Queue<InMemoryItem>>> future : fetchQueueFutures) {
+            for (Future<Map<Candidate, Queue<Video>>> future : fetchQueueFutures) {
                 if (future.isDone() && !future.isCancelled()) {
-                    Map<Candidate, Queue<InMemoryItem>> result = null;
+                    Map<Candidate, Queue<Video>> result = null;
                     try {
                         result = future.get();
                     } catch (InterruptedException e) {
@@ -186,26 +182,26 @@ public class BaseRecaller<InMemoryItem> {
             }
         }
 
-
         List<RankItem> result = convertToRankItem(candidateQueueMap);
-
         return result;
     }
 
 
-    private List<RankItem> convertToRankItem(Map<Candidate, Queue<InMemoryItem>> candidateQueueMap) {
+    // 转成RankItem
+    // 同时给Filter预留处理
+    private List<RankItem> convertToRankItem(Map<Candidate, Queue<Video>> candidateQueueMap) {
 
         final List<Callable<List<RankItem>>> callables = new ArrayList<Callable<List<RankItem>>>();
         int expectedRecallSum = 0;
-        for (final Map.Entry<Candidate, Queue<InMemoryItem>> entry : candidateQueueMap.entrySet()) {
+        for (final Map.Entry<Candidate, Queue<Video>> entry : candidateQueueMap.entrySet()) {
             callables.add(new Callable<List<RankItem>>() {
                 @Override
                 public List<RankItem> call() throws Exception {
                     List<RankItem> candidateHits = new ArrayList<RankItem>();
                     final Candidate candidate = entry.getKey();
                     try {
-                        // 1. filter
-                        Iterable<Entry<InMemoryItem>> entries = FluentIterable.from(entry.getValue()).limit(candidate.getCandidateNum());
+                        // 1. filter  TODO 待后续增加自定义filter
+                        Iterable<Entry<Video>> entries = FluentIterable.from(entry.getValue()).limit(candidate.getCandidateNum());
 
                         // 2. toHits
                         candidateHits.addAll(toHits(entries, candidate));
@@ -257,7 +253,7 @@ public class BaseRecaller<InMemoryItem> {
     }
 
 
-    private Map<Candidate, Queue<InMemoryItem>> obtainQueue(List<Candidate> refactorCandidates, RecommendRequest requestData, User user, boolean isFromRedis) {
+    private Map<Candidate, Queue<Video>> obtainQueue(List<Candidate> refactorCandidates, RecommendRequest requestData, User user, boolean isFromRedis) {
         return loadQueues(refactorCandidates);
     }
 

+ 0 - 32
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/InMemoryItem.java

@@ -1,32 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.recaller.provider;
-
-
-import com.google.common.base.Joiner;
-import com.tzld.piaoquan.recommend.server.framework.common.ArticleInfo;
-import java.util.Arrays;
-
-
-public class InMemoryItem {
-
-    public String id;
-    public ArticleInfo articleInfo;
-    public Object itemBytes;
-
-    public InMemoryItem(String id) {
-        this.id = id;
-    }
-
-    public InMemoryItem(String id, ArticleInfo articleInfo, Object itemBytes) {
-        this.id = id;
-        this.articleInfo = articleInfo;
-        this.itemBytes = itemBytes;
-    }
-
-    @Override
-    public String toString() {
-        return Joiner.on(":").join(Arrays.asList(
-                "id=" + this.id,
-                "articleinfo=" + this.articleInfo
-        ));
-    }
-}

+ 23 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/Index.java

@@ -1,7 +1,12 @@
 package com.tzld.piaoquan.recommend.server.framework.recaller.provider;
 
+import com.google.common.reflect.TypeToken;
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
 
+import java.util.Collections;
 import java.util.List;
 
 @Data
@@ -9,7 +14,24 @@ public class Index {
     private String indexName;
     private List<IndexEntry> indexEntryList;
 
-    public Index(byte[] index){
 
+    // TODO
+    // redis中一路召回队列key 的结果 转化到Index中
+    public Index(String index) {
+        if (StringUtils.isBlank(index)) {
+            return;
+        }
+        List<List<String>> videoScores = JSONUtils.fromJson(index, new TypeToken<List<List<String>>>() {
+        }, Collections.emptyList());
+
+        videoScores.stream().forEach(itemAndScore -> {
+                    IndexEntry indexEntry = new IndexEntry();
+                    if (itemAndScore.size() >= 2) {
+                        indexEntry.setItemId(itemAndScore.get(0));
+                        indexEntry.setScore(Double.valueOf(itemAndScore.get(1)));
+                    }
+                }
+        );
     }
+
 }

+ 0 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/QueueProvider.java

@@ -12,9 +12,6 @@ public interface QueueProvider<T> {
 
     Queue<T> load(QueueName name) throws Exception;
 
-    long dbSize();
-
-
     Index get(QueueName name) throws Exception;
 
     Map<QueueName, Queue<T>> loads(List<QueueName> names) throws Exception;

+ 27 - 64
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/RedisBackedQueueWithoutMeta.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/RedisBackedQueue.java

@@ -8,8 +8,8 @@ import com.google.common.collect.Iterables;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.Queue;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
 import com.tzld.piaoquan.recommend.server.framework.utils.FixedThreadPoolHelper;
-import com.tzld.piaoquan.recommend.server.framework.utils.IndexUtils;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
+import com.tzld.piaoquan.recommend.server.model.Video;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -17,16 +17,12 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 
 
-public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisBackedQueueWithoutMeta.class);
-
+public class RedisBackedQueue implements QueueProvider<Video> {
+    private static final Logger logger = LoggerFactory.getLogger(RedisBackedQueue.class);
     private static final int DEFAULT_TTL = 7 * 24 * 60 * 60;
     private static final long CACHE_MAXIMUMSIZE = 50 * 10000; // default 50w
     private static final long CACHE_TIMEOUT_MS = 5 * 60 * 1000L;
@@ -39,14 +35,15 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
     // 不能为static
     private final BlockingQueue<QueueName> asyncRefreshQueue = new LinkedBlockingQueue<QueueName>(50000);
     private final RedisSmartClient client;
-    private final LoadingCache<QueueName, Pair<Long, Queue<String>>> cache;
+    private final LoadingCache<QueueName, Pair<Long, Queue<Video>>> cache;
 
     /**
      * 实例化 RedisBackedQueue, 并提供基于Guava Cache的本地缓存管理工作
+     *
      * @param client         redis-cluster 连接池
      * @param cacheTimeOutMs 缓存有效期, 开启缓存的情况下,默认缓存5min,以写时间为基准
      */
-    public RedisBackedQueueWithoutMeta(RedisSmartClient client, final long cacheTimeOutMs) {
+    public RedisBackedQueue(RedisSmartClient client, final long cacheTimeOutMs) {
         this.client = client;
 
         cache = CacheBuilder.newBuilder()
@@ -60,7 +57,7 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
         }
     }
 
-    public RedisBackedQueueWithoutMeta(RedisSmartClient client) {
+    public RedisBackedQueue(RedisSmartClient client) {
         this(client, CACHE_TIMEOUT_MS);
     }
 
@@ -68,50 +65,30 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
         return client;
     }
 
+
+    // redis读取队列内内容
     public Index get(QueueName name) throws Exception {
-        byte[] bytes = client.get(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
-        Index index = new Index(bytes);
+        String valueJson = client.get(name.toString());
+        Index index = new Index(valueJson);
         return index;
     }
 
-    public Index getDirect(QueueName name) throws Exception {
-        return get(name);
-    }
-
-    public long dbSize() {
-        return this.cache.size();
-    }
-
-    public long updateIndex(QueueName name, Index index) throws Exception {
-        return updateIndex(name.toString(), index);
-    }
 
-    public long updateIndex(String name, Index index) throws Exception {
-        Preconditions.checkArgument(StringUtils.equals(name, index.getIndexName()), "queue name not equal indexName, invalid Param");
-        long start = System.nanoTime();
-
-        // TODO fix redis key value details
-        String result = client.setex(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1), this.DEFAULT_TTL, index.toString().getBytes());
-        logger.debug("updateIndexResult= {}, queue= {}, indexSize= {}", new Object[]{result, name, index.getIndexEntryList().size()});
-
-        return CollectionUtils.isNotEmpty(index.getIndexEntryList()) ? index.getIndexEntryList().size() : 0l;
-    }
 
     /**
      * 加载指定索引, 并指定索引本地缓存的TTL
-     *
      * @param name QueueName
      * @return
      * @throws ExecutionException
      */
-    public Queue<String> load(QueueName name) throws ExecutionException {
+    public Queue<Video> load(QueueName name) throws ExecutionException {
 
-        Pair<Long, Queue<String>> cachedQueue = cache.get(name);
+        Pair<Long, Queue<Video>> cachedQueue = cache.get(name);
         if (cachedQueue == null) {
             // 清理本地缓存中当前key的数据
             cache.invalidate(name);
             logger.debug("invalidate queue [{}]", name);
-            return new Queue<String>(name.toString());
+            return new Queue<Video>(name.toString());
         }
         // update key refresh time
         if (cachedQueue.getKey() == Long.MIN_VALUE) {
@@ -140,21 +117,20 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
         return cachedQueue.getValue();
     }
 
-    public Map<QueueName, Queue<String>> loads(List<QueueName> names) throws ExecutionException {
+    public Map<QueueName, Queue<Video>> loads(List<QueueName> names) throws ExecutionException {
         return this.loads(names, 200, TimeUnit.MILLISECONDS);
     }
 
-    public Map<QueueName, Queue<String>> loads(List<QueueName> names, long timeout, TimeUnit timeUnit) throws ExecutionException {
+    public Map<QueueName, Queue<Video>> loads(List<QueueName> names, long timeout, TimeUnit timeUnit) throws ExecutionException {
 
-        long startTime = System.currentTimeMillis();
         final Iterable<List<QueueName>> namesIterable = Iterables.partition(names, 1);
-        final List<Callable<Map<QueueName, Queue<String>>>> callables = new ArrayList<Callable<Map<QueueName, Queue<String>>>>();
+        final List<Callable<Map<QueueName, Queue<Video>>>> callables =
+                new ArrayList<Callable<Map<QueueName, Queue<Video>>>>();
         for (final List<QueueName> queueNames : namesIterable) {
-            callables.add(new Callable<Map<QueueName, Queue<String>>>() {
+            callables.add(new Callable<Map<QueueName, Queue<Video>>>() {
                 @Override
-                public Map<QueueName, Queue<String>> call() {
-
-                    Map<QueueName, Queue<String>> result = new HashMap<QueueName, Queue<String>>();
+                public Map<QueueName, Queue<Video>> call() {
+                    Map<QueueName, Queue<Video>> result = new HashMap<QueueName, Queue<Video>>();
                     for (final QueueName name : queueNames) {
                         try {
                             Queue queue = load(name);
@@ -168,13 +144,13 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
             });
         }
 
-        Map<QueueName, Queue<String>> loadQueue = new ConcurrentHashMap<QueueName, Queue<String>>();
+        Map<QueueName, Queue<Video>> loadQueue = new ConcurrentHashMap<QueueName, Queue<Video>>();
         try {
-            List<Future<Map<QueueName, Queue<String>>>> futures = executorService.invokeAll(callables);
-            for (Future<Map<QueueName, Queue<String>>> future : futures) {
+            List<Future<Map<QueueName, Queue<Video>>>> futures = executorService.invokeAll(callables);
+            for (Future<Map<QueueName, Queue<Video>>> future : futures) {
                 if (future.isDone() && !future.isCancelled()) {
                     try {
-                        Map<QueueName, Queue<String>> ret = future.get();
+                        Map<QueueName, Queue<Video>> ret = future.get();
                         loadQueue.putAll(ret);
                     } catch (ExecutionException ee) {
                         logger.error("Failed to execute load a queue partition for {}", ExceptionUtils.getFullStackTrace(ee));
@@ -184,22 +160,9 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
         } catch (InterruptedException ie) {
             logger.error("Interrupted when waiting for parallel queue load finish for {}", ExceptionUtils.getFullStackTrace(ie));
         }
-
-
         return loadQueue;
     }
 
-    public long len(QueueName name) throws Exception {
-        byte[] bytes = client.get(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
-        // 检索结果
-        Index index = new Index(bytes);
-
-        return CollectionUtils.isNotEmpty(index.getIndexEntryList()) ? index.getIndexEntryList().size() : 0L;
-    }
-
-    public void delete(QueueName name) throws Exception {
-        client.del(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
-    }
 
 
     // 使用Guava Cache 的异步刷新接口, 异步刷新到期的key
@@ -216,7 +179,7 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
                         logger.debug("async refresh thread get queuename [{}]", queueName); // get itemid
                     }
 
-                    Pair<Long, Queue<String>> entry = cache.get(queueName);
+                    Pair<Long, Queue<Video>> entry = cache.get(queueName);
                     // check refresh
                     if (startTime > entry.getKey()) {
                         cache.refresh(queueName);

+ 12 - 13
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/RedisBackedQueueWithoutMetaCacheLoader.java

@@ -7,8 +7,8 @@ import com.google.common.collect.Lists;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.Entry;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.Queue;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
-import com.tzld.piaoquan.recommend.server.framework.utils.IndexUtils;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
+import com.tzld.piaoquan.recommend.server.model.Video;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 
-public class RedisBackedQueueWithoutMetaCacheLoader extends CacheLoader<QueueName, Pair<Long, Queue<String>>> {
+public class RedisBackedQueueWithoutMetaCacheLoader extends CacheLoader<QueueName, Pair<Long, Queue<Video>>> {
 
     private static final Logger logger = LoggerFactory.getLogger(RedisBackedQueueWithoutMetaCacheLoader.class);
     private final RedisSmartClient client;
@@ -27,28 +27,27 @@ public class RedisBackedQueueWithoutMetaCacheLoader extends CacheLoader<QueueNam
     }
 
     @Override
-    public Pair<Long, Queue<String>> load(QueueName name) throws Exception {
-        long start = System.nanoTime();
-
-        byte[] bytes = client.get(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
-        Index index = new Index(bytes);
+    public Pair<Long, Queue<Video>> load(QueueName name) throws Exception {
+        String valueJson = client.get(name.toString());
+        Index index = new Index(valueJson);
 
         if (null == index || CollectionUtils.isEmpty(index.getIndexEntryList())) {
             logger.error("empty_index_fetch");
         }
 
-        Queue<String> queue = new Queue<String>(name.toString());
-
-        Iterable<Entry<String>> iterable = Iterables.transform(index.getIndexEntryList(), new Function<IndexEntry, Entry<String>>() {
+        //IndexEntry 转化到Entry中 video 格式
+        Queue<Video> queue = new Queue<Video>(name.toString());
+        Iterable<Entry<Video>> iterable = Iterables.transform(index.getIndexEntryList(), new Function<IndexEntry, Entry<Video>>() {
             @Nullable
             @Override
-            public Entry<String> apply(IndexEntry indexEntry) {
-                Entry<String> entry = new Entry<String>("", indexEntry.getItemId());
+            public Entry<Video> apply(IndexEntry indexEntry) {
+                Video recallData = new Video();
+                recallData.setVideoId(Long.valueOf(indexEntry.getItemId()));
+                Entry<Video> entry = new Entry<Video>(recallData, indexEntry.getItemId());
                 entry.addScore("ordering", indexEntry.getScore());
                 return entry;
             }
         });
-
         queue.addAll(Lists.newArrayList(iterable));
 
         long end = System.nanoTime();

+ 0 - 37
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/utils/IndexUtils.java

@@ -1,37 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.utils;
-
-
-import com.google.common.base.Charsets;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
-
-/**
- * 正排倒排的key相关的utils
- */
-public class IndexUtils {
-    /**
-     * 倒排索引专用的前缀, 用于区分同一个queue的多个存储版本, 读写要一致
-     **/
-    public static final String queuePrefixV1 = "a:";
-
-    public static byte[] convertKey(String itemId) {
-        return itemId.getBytes(Charsets.UTF_8);//BaseSafeEncoder.SAFE_CHARSET
-    }
-
-    /**
-     * 用于转换倒排索引queue, 写入和读取的前缀要一致
-     **/
-    public static byte[] convertKey(QueueName queueName, String queuePrefix) throws Exception {
-        return convertKey(queueName.toString(), queuePrefix);
-    }
-
-    /**
-     * 用于转换倒排索引queue, 写入和读取的前缀要一致
-     **/
-    public static byte[] convertKey(String queueName, String queuePrefix) throws Exception {
-        return convertKey(queuePrefix + queueName);
-    }
-
-
-
-
-}

+ 17 - 49
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/utils/RedisSmartClient.java

@@ -24,69 +24,37 @@ public class RedisSmartClient {
 
     public RedisSmartClient() {
     }
-
-
-
-
-    public String set(final byte[] key, final byte[] value) {
-        return "";
-
+    public void set(final String key, final String value) {
+        redisTemplate.opsForValue().set(key, value);
     }
 
-    public byte[] get(final byte[] key) {
-        return "".getBytes();
-    }
-
-
-    public String setex(final byte[] key, final int seconds, final byte[] value) {
-        return "";
-
-    }
-
-
-
-    public Long del(final byte[] key) {
-        return 1L;
-
-    }
-
-
-
-    public Long hset(final byte[] key, final byte[] field, final byte[] value) {
-        return 1L;
+    public String get(final String key) {
+        return redisTemplate.opsForValue().get(key);
     }
 
-    public Long hlen(final byte[] key) {
-        return 1L;
-
+    // hget
+    public String hget(final String key, final String field) {
+        return redisTemplate.opsForHash().get(key, field).toString();
     }
 
 
-
-
-
-    public String set(final String key, final String value) {
-        return "";
-
-
+    // getRandomValueFromSet
+    public List<String> getRandomList(final String key, long num) {
+        return redisTemplate.opsForSet().randomMembers(key, num);
     }
 
-    public String get(final String key) {
-        return "";
-
 
+    // hgetall
+    public Map<Object, Object> hgetall(String key) {
+        return redisTemplate.opsForHash().entries(key);
     }
 
-
-    public String hget(final String key, final String field) {
-        return "";
-
+    // TODO 异步写入redis 作为用户的分发历史
+    public <T> T write(String key, String value) {
+        T t = null;
+        return t;
     }
 
-    public String hmset(final String key, final Map<String, String> hash) {
-        return "";
-
-    }
 
 
 }

+ 19 - 17
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/VlogRecommendPipeline.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/FlowPoolRecommendPipeline.java

@@ -1,13 +1,15 @@
-package com.tzld.piaoquan.recommend.server.framework;
+package com.tzld.piaoquan.recommend.server.implement;
 
 
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.framework.merger.MergeUtils;
+import com.tzld.piaoquan.recommend.server.framework.merger.SimilarityUtils;
 import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueue;
 import com.tzld.piaoquan.recommend.server.framework.recaller.BaseRecaller;
-import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueueWithoutMeta;
-import com.tzld.piaoquan.recommend.server.common.base.RankItem;
-import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueue;
+import com.tzld.piaoquan.recommend.server.framework.score.ScorerPipeline;
 import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorPipeline;
 import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorUtils;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
@@ -19,15 +21,12 @@ import java.util.List;
 import java.util.Map;
 
 
+public class FlowPoolRecommendPipeline {
 
-public class VlogRecommendPipeline {
-
-    public static final String FILTER_CONF = "filter_config.conf";
-    public static final String MERGE_CONF = "merge_config.conf";
+    public static final String MERGE_CONF = "flow_merge_config.conf";
 
     public static final String PREFIX = "";
 
-
     private List<RankItem> feedByRec(final RecommendRequest requestData,
                                        final int requestIndex,
                                        final User userInfo) {
@@ -46,30 +45,33 @@ public class VlogRecommendPipeline {
         Map<String, Candidate> candidates = new HashMap<String, Candidate>();
         topQueue.candidate(candidates, recallNum, userInfo, requestData, 0, 0);
 
+
         // Step 4: Recalling & Basic Scoring
         RedisSmartClient client = new RedisSmartClient();
-        RedisBackedQueueWithoutMeta queueProvider = new RedisBackedQueueWithoutMeta(client, 1000L);
+        RedisBackedQueue queueProvider = new RedisBackedQueue(client, 1000L);
 
 
         BaseRecaller recaller = new BaseRecaller(queueProvider);
         List<RankItem> items = recaller.recalling(requestData, userInfo, requestIndex, new ArrayList<Candidate>(candidates.values()));
 
 
-        /*
         // Step 4: Advance Scoring
         timestamp = System.currentTimeMillis();
         ScorerPipeline scorerPipeline = getScorerPipeline(requestData);
         items = scorerPipeline.scoring(requestData, userInfo, requestIndex, items);
 
 
-        // Step 5: Rerank
-        timestamp = System.currentTimeMillis();
-        RankPipeline rankPipeline = getRankPipeline(requestData;
-        List<RankItem> resultItems = rankPipeline.doRank(requestData, userInfo, requestIndex, items);
+        // Step 5: Merger
+        MergeUtils.distributeItemsToMultiQueues(topQueue, items);
+        topQueue.merge(recallNum * 3, userInfo, requestData, requestIndex, 0);
+
+        // step 6:多样性融合
+        List<RankItem> mergeItems = topQueue.getItems();
+        MergeUtils.diversityRerank(mergeItems, SimilarityUtils.getIsSameUserTagOrCategoryFunc(), recallNum, 6, 2);
+
 
         // Step 6: Global Rank & subList
         // TODO: Global Rank
-        */
 
 
         return items;

+ 91 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/TopRecommendPipeline.java

@@ -0,0 +1,91 @@
+package com.tzld.piaoquan.recommend.server.implement;
+
+
+import com.tzld.piaoquan.recommend.server.framework.merger.MergeUtils;
+import com.tzld.piaoquan.recommend.server.framework.merger.SimilarityUtils;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueue;
+import com.tzld.piaoquan.recommend.server.framework.recaller.BaseRecaller;
+import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueue;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.score.ScorerPipeline;
+import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorPipeline;
+import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorUtils;
+import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.tzld.piaoquan.recommend.server.service.score.ScorerUtils.getScorerPipeline;
+
+
+public class TopRecommendPipeline {
+
+    public static final String FILTER_CONF = "filter_config.conf";
+    public static final String MERGE_CONF = "merge_config.conf";
+
+    public static final String PREFIX = "";
+
+
+    private List<RankItem> feedByRec(final RecommendRequest requestData,
+                                       final int requestIndex,
+                                       final User userInfo) {
+        int recallNum =  200;
+
+        // Step 1: Attention extraction
+        long timestamp = System.currentTimeMillis();
+        UserAttentionExtractorPipeline attentionExtractorPipeline = UserAttentionExtractorUtils.getAtttentionPipeline(UserAttentionExtractorUtils.BASE_CONF);
+        attentionExtractorPipeline.extractAttention(requestData, userInfo);
+
+        // Step 2: create top queue
+        StrategyQueue topQueue = MergeUtils.createTopQueue(MERGE_CONF, "top-queue");
+
+
+        // Step 3: Candidate
+        Map<String, Candidate> candidates = new HashMap<String, Candidate>();
+        topQueue.candidate(candidates, recallNum, userInfo, requestData, 0, 0);
+
+
+        // Step 4: Recalling & Basic Scoring
+        RedisSmartClient client = new RedisSmartClient();
+        RedisBackedQueue queueProvider = new RedisBackedQueue(client, 1000L);
+
+
+        BaseRecaller recaller = new BaseRecaller(queueProvider);
+        List<RankItem> items = recaller.recalling(requestData, userInfo, requestIndex, new ArrayList<Candidate>(candidates.values()));
+
+
+        // Step 4: Advance Scoring
+        timestamp = System.currentTimeMillis();
+        ScorerPipeline scorerPipeline = getScorerPipeline(requestData);
+        items = scorerPipeline.scoring(requestData, userInfo, requestIndex, items);
+
+
+        // Step 5: Merger
+        MergeUtils.distributeItemsToMultiQueues(topQueue, items);
+        topQueue.merge(recallNum * 3, userInfo, requestData, requestIndex, 0);
+
+        // step 6:多样性融合
+        List<RankItem> mergeItems = topQueue.getItems();
+        MergeUtils.diversityRerank(mergeItems, SimilarityUtils.getIsSameUserTagOrCategoryFunc(), recallNum, 6, 2);
+
+
+        //
+        // timestamp = System.currentTimeMillis();
+        // RankPipeline rankPipeline = getRankPipeline(requestData;
+        // List<RankItem> resultItems = rankPipeline.doRank(requestData, userInfo, requestIndex, items);
+
+
+        // Step 6: Global Rank & subList
+        // TODO: Global Rank
+
+
+        return items;
+    }
+
+
+}

+ 58 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Global24hHotCandidate.java

@@ -0,0 +1,58 @@
+package com.tzld.piaoquan.recommend.server.implement.candidate;
+
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.IndexCandidateQueue;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueConfig;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueInfo;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Global24hHotCandidate extends IndexCandidateQueue {
+
+    public static final String ItemType = "video";
+
+    public Global24hHotCandidate(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
+        super(strategyQueueInfo, strategyQueueConfig);
+    }
+
+    @Override
+    public int addCandidateKey(Map<String, Candidate> candidates, int recallNum, User user, RecommendRequest requestData, int requestIndex, int expId) {
+        int currRecall = 0;
+        String region = user.getRegion();
+        Map<String, Candidate> simplifiedCandidates = new ConcurrentHashMap<String, Candidate>();
+
+
+        // index key  video:queue:type=global24h:region=北京:ordering=rov
+        QueueName queueName24HGlobal = new QueueName(ItemType, "rov")
+                .addMatch("type", "global24h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HGlobal, 1, getStrategyQueueInfo().getQueueName());
+
+        // index key  video:queue:type=global24h:region=北京:ordering=realplay
+        queueName24HGlobal = new QueueName(ItemType, "realplay")
+                .addMatch("type", "global24h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HGlobal, 1, getStrategyQueueInfo().getQueueName());
+
+        // index key  video:queue:type=global24h:region=北京:ordering=str
+        queueName24HGlobal = new QueueName(ItemType, "str")
+                .addMatch("type", "global24h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HGlobal, 1, getStrategyQueueInfo().getQueueName());
+
+        // index key  video:queue:type=global24h:region=北京:ordering=realplay_rate
+        queueName24HGlobal = new QueueName(ItemType, "realplay_ratio")
+                .addMatch("type", "global24h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HGlobal, 1, getStrategyQueueInfo().getQueueName());
+
+
+        candidates.putAll(simplifiedCandidates);
+        return currRecall;
+
+    }
+}

+ 66 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Region1hHotCandidate.java

@@ -0,0 +1,66 @@
+package com.tzld.piaoquan.recommend.server.implement.candidate;
+
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.IndexCandidateQueue;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueConfig;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueInfo;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Region1hHotCandidate extends IndexCandidateQueue {
+
+    public static final String ItemType = "video";
+
+    public Region1hHotCandidate(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
+        super(strategyQueueInfo, strategyQueueConfig);
+    }
+
+    @Override
+    public int addCandidateKey(Map<String, Candidate> candidates, int recallNum, User user, RecommendRequest requestData, int requestIndex, int expId) {
+        int currRecall = 0;
+        String region = user.getRegion();
+        Map<String, Candidate> simplifiedCandidates = new ConcurrentHashMap<String, Candidate>();
+
+
+        // index key  video:queue:type=region1h:region=北京:ordering=rov
+        QueueName queueName1HRegion = new QueueName(ItemType, "short-rov")
+                .addMatch("type", "region1h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName1HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+
+        // index key  video:queue:type=region1h:region=北京:ordering=rov
+        queueName1HRegion = new QueueName(ItemType, "rov")
+                .addMatch("type", "region1h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName1HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+
+        // index key  video:queue:type=region1h:region=北京:ordering=realplay
+        queueName1HRegion = new QueueName(ItemType, "realplay")
+                .addMatch("type", "region1h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName1HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+
+        // index key  video:queue:type=region1h:region=北京:ordering=str
+        queueName1HRegion = new QueueName(ItemType, "str")
+                .addMatch("type", "region1h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName1HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+
+        // index key  video:queue:type=region1h:region=北京:ordering=realplay_rate
+        queueName1HRegion = new QueueName(ItemType, "realplay_ratio")
+                .addMatch("type", "region1h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName1HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+        candidates.putAll(simplifiedCandidates);
+        return currRecall;
+    }
+}

+ 21 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/SimpleHotCandidate.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/Region24hHotCandidate.java

@@ -11,11 +11,11 @@ import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class SimpleHotCandidate extends IndexCandidateQueue {
+public class Region24hHotCandidate extends IndexCandidateQueue {
 
-    public static final String ItemType = "vlog";
+    public static final String ItemType = "video";
 
-    public SimpleHotCandidate(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
+    public Region24hHotCandidate(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
         super(strategyQueueInfo, strategyQueueConfig);
     }
 
@@ -25,16 +25,31 @@ public class SimpleHotCandidate extends IndexCandidateQueue {
         String region = user.getRegion();
         Map<String, Candidate> simplifiedCandidates = new ConcurrentHashMap<String, Candidate>();
 
+
+        // index key  video:queue:type=region24h:region=北京:ordering=rov
         QueueName queueName24HRegion = new QueueName(ItemType, "rov")
                 .addMatch("type", "region24h")
                 .addMatch("region", region);
         currRecall += addCandidateKey(simplifiedCandidates, queueName24HRegion, 1, getStrategyQueueInfo().getQueueName());
 
-        QueueName queueName1HRegion = new QueueName(ItemType, "rov")
-                .addMatch("type", "region1h")
+        // index key  video:queue:type=region24h:region=北京:ordering=realplay
+        queueName24HRegion = new QueueName(ItemType, "realplay")
+                .addMatch("type", "region24h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+        // index key  video:queue:type=region24h:region=北京:ordering=str
+        queueName24HRegion = new QueueName(ItemType, "str")
+                .addMatch("type", "region24h")
                 .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HRegion, 1, getStrategyQueueInfo().getQueueName());
+
+        // index key  video:queue:type=region24h:region=北京:ordering=realplay_rate
+        queueName24HRegion = new QueueName(ItemType, "realplay_ratio")
+                .addMatch("type", "region24h")
+                .addMatch("region", region);
+        currRecall += addCandidateKey(simplifiedCandidates, queueName24HRegion, 1, getStrategyQueueInfo().getQueueName());
 
-        currRecall += addCandidateKey(simplifiedCandidates, queueName1HRegion, 1, getStrategyQueueInfo().getQueueName());
 
         candidates.putAll(simplifiedCandidates);
         return currRecall;

+ 0 - 33
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/recaller/HistoryLongPeriodFilter.java

@@ -1,33 +0,0 @@
-package com.tzld.piaoquan.recommend.server.implement.recaller;
-
-import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
-import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.recaller.provider.InMemoryItem;
-import com.google.common.collect.Sets;
-import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
-
-import java.util.Set;
-
-
-public class HistoryLongPeriodFilter extends AbstractFilter<InMemoryItem> {
-
-    protected Set<String> historySet = Sets.newHashSet();
-
-    public HistoryLongPeriodFilter(FilterConfigInfo filterConfigInfo,
-                         RecommendRequest recommendRequest,
-                         User user,
-                         Integer requestIndex) {
-        super(filterConfigInfo, recommendRequest, user, requestIndex);
-        // 用户近1个月播放行为历史历史
-        historySet = Sets.newHashSet(user.getLast1monthUserAction().getFeedsPlayed());
-        if (historySet == null || historySet.size() == 0) {
-            historySet = Sets.newHashSet();
-        }
-    }
-
-    public boolean predicate(Candidate candidate, InMemoryItem t) {
-        return !this.historySet.contains(t.id);
-    }
-
-
-}

+ 331 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogShareLRScorer.java

@@ -0,0 +1,331 @@
+package com.tzld.piaoquan.recommend.server.implement.score;
+
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.video.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.rank.strategy.OfflineVlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.service.score.BaseLRModelScorer;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerConfigInfo;
+import com.tzld.piaoquan.recommend.server.service.score.model.LRModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class VlogShareLRScorer extends BaseLRModelScorer {
+
+    private final static int CORE_POOL_SIZE = 64;
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogShareLRScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+    private static final double defaultUserCtrGroupNumber = 10.0;
+    private static final int enterFeedsScoreRatio = 10;
+    private static final int enterFeedsScoreNum = 20;
+
+
+    public VlogShareLRScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(rankItems, param.getRequestContext(),
+                userFeature == null ? UserFeature.defaultInstance(param.getMid()) : userFeature);
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final List<RankItem> items,
+                                      final RequestContext requestContext,
+                                      final UserFeature user) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // userBytes
+        UserBytesFeature userInfoBytes = null;
+        userInfoBytes = new UserBytesFeature(user);
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userInfoBytes, requestContext, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ctr
+     */
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final UserBytesFeature userInfoBytes,
+                            final RequestContext requestContext) {
+
+        LRSamples lrSamples = null;
+        VlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
+
+        try {
+            VideoBytesFeature newsInfoBytes = new VideoBytesFeature(item.getItemFeature() == null
+                    ? ItemFeature.defaultInstance(item.getVideoId() + "")
+                    : item.getItemFeature());
+            lrSamples = bytesFeatureExtractor.single(userInfoBytes, newsInfoBytes,
+                    new RequestContextBytesFeature(requestContext));
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{new String(userInfoBytes.getUid()), item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+            // 增加实时特征后打开在线存储日志逻辑
+            //
+            // CtrSamples.Builder samples =  com.tzld.piaoquan.recommend.server.gen.recommend.CtrSamples.newBuilder();
+            // samples.setLr_samples(lrSamples);
+            // item.setSamples(samples);
+            //
+        }
+        item.setScore(pro);
+        return pro;
+    }
+
+
+    /**
+     * 并行打分
+     *
+     * @param items
+     * @param userInfoBytes
+     * @param requestContext
+     * @param model
+     */
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final UserBytesFeature userInfoBytes,
+                                  final RequestContext requestContext,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userInfoBytes, requestContext);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", requestContext.getRequest_id(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{requestContext.getRequest_id(), items.size(), cancel});
+    }
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                           final Map<String, String> userFeatureMap,
+                                           final List<RankItem> rankItems){
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(
+                sceneFeatureMap, userFeatureMap, rankItems
+        );
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+        // userBytes
+        Map<String, byte[]> userFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: userFeatureMap.entrySet()){
+            userFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+        //sceneBytes
+        Map<String, byte[]> sceneFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: sceneFeatureMap.entrySet()){
+            sceneFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMapByte, sceneFeatureMapByte, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, byte[]> userFeatureMapByte,
+                                  final Map<String, byte[]> sceneFeatureMapByte,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            // items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMapByte, sceneFeatureMapByte);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMapByte.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{sceneFeatureMapByte.size(), items.size(), cancel});
+    }
+
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final Map<String, byte[]> userFeatureMapByte,
+                            final Map<String, byte[]> sceneFeatureMapByte) {
+
+        LRSamples lrSamples = null;
+        OfflineVlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new OfflineVlogShareLRFeatureExtractor();
+
+        try {
+
+            Map<String, byte[]> itemFeatureByte = new HashMap<>();
+            for (Map.Entry<String, String> entry: item.getFeatureMap().entrySet()){
+                itemFeatureByte.put(entry.getKey(), entry.getValue().getBytes());
+            }
+            lrSamples = bytesFeatureExtractor.single(userFeatureMapByte, itemFeatureByte, sceneFeatureMapByte);
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{"", item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+        }
+        item.setScoreStr(pro);
+        return pro;
+    }
+}

+ 331 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogShareLRScorer4Ros.java

@@ -0,0 +1,331 @@
+package com.tzld.piaoquan.recommend.server.implement.score;
+
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.video.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.rank.strategy.OfflineVlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.service.score.BaseLRModelScorer;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerConfigInfo;
+import com.tzld.piaoquan.recommend.server.service.score.model.LRModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class VlogShareLRScorer4Ros extends BaseLRModelScorer {
+
+    private final static int CORE_POOL_SIZE = 64;
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogShareLRScorer4Ros.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+    private static final double defaultUserCtrGroupNumber = 10.0;
+    private static final int enterFeedsScoreRatio = 10;
+    private static final int enterFeedsScoreNum = 20;
+
+
+    public VlogShareLRScorer4Ros(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(rankItems, param.getRequestContext(),
+                userFeature == null ? UserFeature.defaultInstance(param.getMid()) : userFeature);
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final List<RankItem> items,
+                                      final RequestContext requestContext,
+                                      final UserFeature user) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // userBytes
+        UserBytesFeature userInfoBytes = null;
+        userInfoBytes = new UserBytesFeature(user);
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userInfoBytes, requestContext, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ctr
+     */
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final UserBytesFeature userInfoBytes,
+                            final RequestContext requestContext) {
+
+        LRSamples lrSamples = null;
+        VlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
+
+        try {
+            VideoBytesFeature newsInfoBytes = new VideoBytesFeature(item.getItemFeature() == null
+                    ? ItemFeature.defaultInstance(item.getVideoId() + "")
+                    : item.getItemFeature());
+            lrSamples = bytesFeatureExtractor.single(userInfoBytes, newsInfoBytes,
+                    new RequestContextBytesFeature(requestContext));
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{new String(userInfoBytes.getUid()), item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+            // 增加实时特征后打开在线存储日志逻辑
+            //
+            // CtrSamples.Builder samples =  com.tzld.piaoquan.recommend.server.gen.recommend.CtrSamples.newBuilder();
+            // samples.setLr_samples(lrSamples);
+            // item.setSamples(samples);
+            //
+        }
+        item.setScore(pro);
+        return pro;
+    }
+
+
+    /**
+     * 并行打分
+     *
+     * @param items
+     * @param userInfoBytes
+     * @param requestContext
+     * @param model
+     */
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final UserBytesFeature userInfoBytes,
+                                  final RequestContext requestContext,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userInfoBytes, requestContext);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", requestContext.getRequest_id(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{requestContext.getRequest_id(), items.size(), cancel});
+    }
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                  final Map<String, String> userFeatureMap,
+                                  final List<RankItem> rankItems){
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(
+                sceneFeatureMap, userFeatureMap, rankItems
+        );
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+        // userBytes
+        Map<String, byte[]> userFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: userFeatureMap.entrySet()){
+            userFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+        //sceneBytes
+        Map<String, byte[]> sceneFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: sceneFeatureMap.entrySet()){
+            sceneFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMapByte, sceneFeatureMapByte, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, byte[]> userFeatureMapByte,
+                                  final Map<String, byte[]> sceneFeatureMapByte,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+//            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMapByte, sceneFeatureMapByte);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMapByte.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{sceneFeatureMapByte.size(), items.size(), cancel});
+    }
+
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final Map<String, byte[]> userFeatureMapByte,
+                            final Map<String, byte[]> sceneFeatureMapByte) {
+
+        LRSamples lrSamples = null;
+        OfflineVlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new OfflineVlogShareLRFeatureExtractor();
+
+        try {
+
+            Map<String, byte[]> itemFeatureByte = new HashMap<>();
+            for (Map.Entry<String, String> entry: item.getFeatureMap().entrySet()){
+                itemFeatureByte.put(entry.getKey(), entry.getValue().getBytes());
+            }
+            lrSamples = bytesFeatureExtractor.single(userFeatureMapByte, itemFeatureByte, sceneFeatureMapByte);
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{"", item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+        }
+        item.setScoreRos(pro);
+        return pro;
+    }
+}

+ 144 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/score/VlogThompsonScorer.java

@@ -0,0 +1,144 @@
+package com.tzld.piaoquan.recommend.server.implement.score;
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.RequestContext;
+import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.score.BaseThompsonSamplingScorer;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerConfigInfo;
+import com.tzld.piaoquan.recommend.server.service.score.model.ThompsonSamplingModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+
+//@Service
+public class VlogThompsonScorer extends BaseThompsonSamplingScorer {
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogThompsonScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+
+    public VlogThompsonScorer(ScorerConfigInfo configInfo) {
+        super(configInfo);
+    }
+
+    @Override
+    public List<RankItem> scoring(final ScoreParam param,
+                                  final UserFeature userFeature,
+                                  final List<RankItem> rankItems) {
+
+        if (userFeature == null || CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        ThompsonSamplingModel model = (ThompsonSamplingModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(rankItems, param.getRequestContext(), userFeature);
+
+        LOGGER.debug("thompson sampling ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final List<RankItem> items,
+                                      final RequestContext requestContext,
+                                      final UserFeature user) {
+        long startTime = System.currentTimeMillis();
+        ThompsonSamplingModel model = (ThompsonSamplingModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // 所有都参与打分,按照ROV Thompson排序
+        multipleCtrScore(items, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("after enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i).getScore());
+            }
+        }
+
+        LOGGER.debug("thompson ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[thompson ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+
+    /**
+     * 计算 predict ROV
+     */
+    public double calcScore(final ThompsonSamplingModel model,
+                            final RankItem item) {
+        double score = 0d;
+        try {
+            score = model.score(item);
+        } catch (Exception e) {
+            LOGGER.error("score error for doc={} exception={}", new Object[]{
+                    item.getVideo(), ExceptionUtils.getFullStackTrace(e)});
+        }
+        item.setScore(score);
+        return score;
+    }
+
+
+    /**
+     * 并行打分 Thompson ROV
+     *
+     * @param items
+     * @param model
+     */
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final ThompsonSamplingModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //设置为原始值为0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex));
+                    } catch (Exception e) {
+                        LOGGER.error("thompson exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("ROV-Thompson Score {}, Total: {}, Cancel: {}", new Object[]{items.size(), cancel});
+    }
+}

+ 12 - 6
recommend-server-service/src/main/resources/merge_config.conf

@@ -1,17 +1,23 @@
 queue-config = {
   top-queue = {
-    class = "com.tzld.piaoquan.recommend.server.framework.SimpleMergeQueue"
+    class = "com.tzld.piaoquan.recommend.server.framework.merger.SimpleMergeQueue"
     children = {
       hot-queue = {
-        class = "com.tzld.piaoquan.recommend.server.framework.SimpleMergeQueue"
+        class = "com.tzld.piaoquan.recommend.server.framework.merger.SimpleMergeQueue"
         children = {
-          hot-index = {
-            class = "com.tzld.piaoquan.recommend.server.implement.candidate.HotCandidateQueue"
+          global1h-index = {
+            class = "com.tzld.piaoquan.recommend.server.implement.candidate.Global24hHotCandidate"
+          }
+          region1h-index = {
+            class = "com.tzld.piaoquan.recommend.server.implement.candidate.Region1hHotCandidate"
+          }
+          region24h-index = {
+            class = "com.tzld.piaoquan.recommend.server.implement.candidate.Region24hHotCandidate"
           }
         }
       }
       explore-queue = {
-        class = "com.tzld.piaoquan.recommend.server.framework.SimpleMergeQueue"
+        class = "com.tzld.piaoquan.recommend.server.framework.merger.SimpleMergeQueue"
         children = {
           category-explore-index = {
             class = "com.tzld.piaoquan.recommend.server.implement.candidate.HotCandidateQueue"
@@ -19,7 +25,7 @@ queue-config = {
         }
       }
       exploit-queue = {
-        class = "com.tzld.piaoquan.recommend.server.framework.SimpleMergeQueue"
+        class = "com.tzld.piaoquan.recommend.server.framework.merger.SimpleMergeQueue"
         children = {
           user-group-index = {
             class = "com.tzld.piaoquan.recommend.server.implement.candidate.HotCandidateQueue"