sunmingze 1 year ago
parent
commit
0df44cb86e

+ 0 - 137
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/candidiate/IndexDescription.java

@@ -1,137 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.candidiate;
-
-
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * 描述检索的key
- */
-public class IndexDescription {
-    public static Logger LOGGER = LoggerFactory.getLogger(IndexDescription.class);
-    public String name;
-
-    public int recallWeight;
-
-    /**
-     * redis,knn
-     */
-    public String db;
-    public String itemType;
-    public String indexName;
-    public String keyType;
-    public String key;
-    public List<Pair<String, String>> matches;
-    public String ordering;
-
-    /**
-     * 处理类似item_cf:when=day:score=click 的情况,提取出
-     * item_cf, when->day
-     */
-    public void processIndexName() {
-        if (indexName.contains(":")) {
-            String[] indics = indexName.split(":");
-            indexName = indics[0];
-            if (indics.length > 1) {
-                matches = Lists.newArrayList();
-                for (int i = 1; i < indics.length; ++i) {
-                    String[] pair = indics[i].split("=");
-                    if (pair.length == 2) {
-                        matches.add(Pair.of(pair[0], pair[1]));
-                    } else {
-                        LOGGER.error("index name [{}] match failed", indics[i]);
-                    }
-                }
-            }
-        }
-    }
-
-
-    /**
-     * 根据当前indexdesc的字段信息,生成candidate,用于后续召回
-     * @param queue
-     * @param indexDescription
-     * @return
-     */
-    public static Candidate convertToCandidate(String queue, IndexDescription indexDescription) {
-        Candidate candidate = new Candidate();
-        indexDescription.processIndexName();
-        QueueName queueName;
-
-        //knn类召回,不需要type
-        if ("knn".equalsIgnoreCase(indexDescription.keyType)) {
-            queueName = new QueueName(indexDescription.itemType, indexDescription.ordering);
-        } else {
-            queueName = new QueueName(indexDescription.itemType, indexDescription.ordering)
-                    .addMatch("type", indexDescription.indexName);
-        }
-
-
-        if (indexDescription.matches != null && !indexDescription.matches.isEmpty()) {
-            for (Map.Entry<String, String> entry : indexDescription.matches) {
-                queueName.addMatch(entry.getKey(), entry.getValue());
-            }
-        }
-
-        //添加类似tag->范冰冰 这类索引
-        if (StringUtils.isNotBlank(indexDescription.keyType)) {
-            queueName.addMatch(indexDescription.keyType, indexDescription.key);
-        }
-
-        candidate.setCandidateKey(queueName.toString());
-        candidate.setCandidateNum(indexDescription.recallWeight);
-        candidate.setMergeQueueNum(indexDescription.recallWeight);
-        candidate.setCandidateQueueName(queueName);
-        candidate.setMergeQueueName(queue);
-
-        return candidate;
-    }
-
-    public static final String RECALL_WEIGHT = "recall-weight";
-    public static final String INDEX_DESC = "index-desc";
-    public static final String DB = "db"; // source
-    public static final String ITEM_TYPE = "item-type";
-    public static final String NAME = "name";
-    public static final String KEY_TYPE = "key-type";
-    public static final String KEY = "key";
-    public static final String ORDERING = "ordering";
-    public static final String MATCHES = "matches";
-
-    public static IndexDescription parseIndexDescription(String name, Config conf) {
-        if (!conf.hasPath(RECALL_WEIGHT) || !conf.hasPath(INDEX_DESC)) {
-            LOGGER.error("name=" + name + ",必须输入recall-weight和indexdesc");
-            return null;
-        }
-
-        Config desc = conf.getConfig(INDEX_DESC);
-        if (!desc.hasPath(DB) || !desc.hasPath(ITEM_TYPE) || !desc.hasPath(NAME) || !desc.hasPath(ORDERING)) {
-            LOGGER.error("name=" + name + ",必须输入recall-weight和indexdesc");
-            return null;
-        }
-
-        IndexDescription indexDescription = new IndexDescription();
-        indexDescription.name = name;
-        indexDescription.recallWeight = conf.getInt(RECALL_WEIGHT);
-        indexDescription.db = desc.getString(DB);
-        indexDescription.itemType = desc.getString(ITEM_TYPE);
-        indexDescription.indexName = desc.getString(NAME);
-        indexDescription.ordering = desc.getString(ORDERING);
-
-        if (desc.hasPath(KEY_TYPE)) {
-            indexDescription.keyType = desc.getString(KEY_TYPE);
-        }
-        if (desc.hasPath(KEY)) {
-            indexDescription.key = desc.getString(KEY);
-        }
-
-        return indexDescription;
-    }
-}

+ 6 - 23
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/candidiate/QueueName.java

@@ -4,6 +4,7 @@ package com.tzld.piaoquan.recommend.server.framework.candidiate;
 import com.google.common.base.Function;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Joiner;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.FluentIterable;
+import lombok.Data;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Pair;
 
 
 import java.io.Serializable;
 import java.io.Serializable;
@@ -13,6 +14,7 @@ import java.util.List;
 /**
 /**
  * Queue names.召回队列命名处理
  * Queue names.召回队列命名处理
  */
  */
+@Data
 public class QueueName implements Serializable, Comparable<QueueName> {
 public class QueueName implements Serializable, Comparable<QueueName> {
     private static final Function<Pair<String, String>, String> MATCH_PATTERN =
     private static final Function<Pair<String, String>, String> MATCH_PATTERN =
             new Function<Pair<String, String>, String>() {
             new Function<Pair<String, String>, String>() {
@@ -29,6 +31,10 @@ public class QueueName implements Serializable, Comparable<QueueName> {
 
 
     private String metaChannel;  // meta 渠道
     private String metaChannel;  // meta 渠道
 
 
+    public long getTTL(){
+        return this.ttl;
+    }
+
     public QueueName(String itemType, String ordering) {
     public QueueName(String itemType, String ordering) {
         this(itemType, ordering, DEFAULT_LOCAL_CACHE_TTL);
         this(itemType, ordering, DEFAULT_LOCAL_CACHE_TTL);
     }
     }
@@ -64,26 +70,6 @@ public class QueueName implements Serializable, Comparable<QueueName> {
         return Pair.of("", "");
         return Pair.of("", "");
     }
     }
 
 
-    public String getMetaChannel() {
-        return metaChannel;
-    }
-
-    public void setMetaChannel(String metaChannel) {
-        this.metaChannel = metaChannel;
-    }
-
-    public String getItemType() {
-        return itemType;
-    }
-
-    public String getOrdering() {
-        return ordering;
-    }
-
-    public long getTtl() {
-        return ttl;
-    }
-
     public QueueName addMatch(String key, String value) {
     public QueueName addMatch(String key, String value) {
         if (value == null || value.equals("")) {
         if (value == null || value.equals("")) {
             value = "_";
             value = "_";
@@ -102,9 +88,6 @@ public class QueueName implements Serializable, Comparable<QueueName> {
         return FluentIterable.from(matches).transform(MATCH_PATTERN);
         return FluentIterable.from(matches).transform(MATCH_PATTERN);
     }
     }
 
 
-    public long getTTL() {
-        return this.ttl;
-    }
 
 
     @Override
     @Override
     public String toString() {
     public String toString() {

+ 8 - 34
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/MergeUtils.java

@@ -71,38 +71,6 @@ public class MergeUtils {
         return strategyQueue;
         return strategyQueue;
     }
     }
 
 
-    /**
-     * 分发items到策略树中
-     * 分发的过程中不要破坏相对顺序
-     *
-     * @param strategyQueue
-     * @param items
-     */
-    public static void distributeItemsToMultiQueues(StrategyQueue strategyQueue, List<RankItem> items) {
-        List<StrategyQueue> strategyQueueList = strategyQueue.getAllQueues();
-
-        Multimap<String, RankItem> mergeQueuesItems = ArrayListMultimap.create();
-        for (RankItem item : items) {
-            for (CandidateInfo candidateInfo : item.getCandidateInfoList()) {
-                String mergeQueue = candidateInfo.getCandidate().getMergeQueueName();
-                RankItem currentItem = item.deepcopy();
-
-                // set candidate info
-                currentItem.setQueue(mergeQueue);
-                currentItem.setCandidateInfo(candidateInfo);
-
-                mergeQueuesItems.put(mergeQueue, currentItem);
-            }
-        }
-
-        for (StrategyQueue queue : strategyQueueList) {
-            String mergeQueueName = queue.getStrategyQueueInfo().getQueueName();
-            if (mergeQueuesItems.containsKey(mergeQueueName)) {
-                List<RankItem> currMergeQueueItems = new ArrayList<RankItem>(mergeQueuesItems.get(mergeQueueName));
-                queue.setItems(currMergeQueueItems);
-            }
-        }
-    }
 
 
     /**
     /**
      * 基于 score 字段融合: score 较大的 Item 优先
      * 基于 score 字段融合: score 较大的 Item 优先
@@ -112,13 +80,19 @@ public class MergeUtils {
      * @param freeRecNum
      * @param freeRecNum
      * @param expId
      * @param expId
      */
      */
-    public static void simpleMergeByScore(List<RankItem> resultRankerItems, final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap, int freeRecNum, int expId) {
+    public static void simpleMergeByScore(List<RankItem> resultRankerItems,
+                                          final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap,
+                                          int freeRecNum,
+                                          int expId) {
+        // 先定义按score排序优选的独立额
         PriorityQueue<Pair<String, Integer>> mergePriorityQueue = new PriorityQueue<Pair<String, Integer>>(freeRecNum, new Comparator<Pair<String, Integer>>() {
         PriorityQueue<Pair<String, Integer>> mergePriorityQueue = new PriorityQueue<Pair<String, Integer>>(freeRecNum, new Comparator<Pair<String, Integer>>() {
             @Override
             @Override
             public int compare(Pair<String, Integer> o1, Pair<String, Integer> o2) {
             public int compare(Pair<String, Integer> o1, Pair<String, Integer> o2) {
                 return rankerItemsListMap.get(o1.getLeft()).getRight().get(o1.getRight()).compareTo(rankerItemsListMap.get(o2.getLeft()).getRight().get(o2.getRight()));
                 return rankerItemsListMap.get(o1.getLeft()).getRight().get(o1.getRight()).compareTo(rankerItemsListMap.get(o2.getLeft()).getRight().get(o2.getRight()));
             }
             }
         });
         });
+
+        // 大于最小mergenum 同时小于maxMergenum,在队列中add
         for (Pair<MergeRule, List<RankItem>> entry : rankerItemsListMap.values()) {
         for (Pair<MergeRule, List<RankItem>> entry : rankerItemsListMap.values()) {
             if (entry.getLeft().isDisabled(expId) || entry.getRight() == null || entry.getRight().isEmpty()) {
             if (entry.getLeft().isDisabled(expId) || entry.getRight() == null || entry.getRight().isEmpty()) {
                 continue;
                 continue;
@@ -130,7 +104,7 @@ public class MergeUtils {
                 mergePriorityQueue.add(Pair.of(myRule.queueName, myRule.minMergeNum));
                 mergePriorityQueue.add(Pair.of(myRule.queueName, myRule.minMergeNum));
             }
             }
         }
         }
-
+        //
         while (freeRecNum > 0 && !mergePriorityQueue.isEmpty()) {
         while (freeRecNum > 0 && !mergePriorityQueue.isEmpty()) {
             Pair<String, Integer> item = mergePriorityQueue.poll();
             Pair<String, Integer> item = mergePriorityQueue.poll();
             String myName = item.getLeft();
             String myName = item.getLeft();

+ 1 - 17
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/StrategyQueue.java

@@ -65,30 +65,14 @@ public abstract class StrategyQueue {
             }
             }
         }
         }
 
 
-//        getAllQueues();
     }
     }
 
 
-    public List<StrategyQueue> getAllQueues() {
-        if (allQueues == null) {
-            allQueues = new LinkedList<StrategyQueue>();
-
-            if (!strategyQueueInfo.isLeaf()) {
-                for (Map.Entry<String, StrategyQueue> child : children.entrySet()) {
-                    allQueues.addAll(child.getValue().getAllQueues());
-                }
-            }
-
-            allQueues.add(this);
-        }
-        return allQueues;
-    }
 
 
     private void putChild(String queueName, StrategyQueue child) {
     private void putChild(String queueName, StrategyQueue child) {
         children.put(queueName, child);
         children.put(queueName, child);
     }
     }
 
 
 
 
-
     public void clearItems() {
     public void clearItems() {
         items.clear();
         items.clear();
     }
     }
@@ -99,7 +83,7 @@ public abstract class StrategyQueue {
 
 
     public abstract int doMerge(final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap, final int recNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId);
     public abstract int doMerge(final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap, final int recNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId);
 
 
-    // 定义截断函数
+    // 定义索引Key 生成方式
     public abstract int candidate(Map<String, Candidate> candidateMap, final int recallNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId);
     public abstract int candidate(Map<String, Candidate> candidateMap, final int recallNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId);
 
 
     public final int merge(final int recNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId) {
     public final int merge(final int recNum, final User user, final RecommendRequest requestData, final int requestIndex, final int expId) {

+ 2 - 49
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java

@@ -69,55 +69,8 @@ public class BaseRecaller<InMemoryItem> {
 
 
     }
     }
 
 
-    public ItemProvider<InMemoryItem> getItemProvider() {
-        return itemProvider;
-    }
-
-    public QueueProvider<InMemoryItem> getQueueProvider() {
-        return queueProvider;
-    }
-
-    public Map<Candidate, Queue<InMemoryItem>> getQueue(String queueStr) throws Exception {
-        Candidate candidate = new Candidate();
-        candidate.setCandidateKey(queueStr);
-        return loadQueues(Arrays.asList(candidate));
-    }
-
-    public Optional<InMemoryItem> getItem(String itemId) throws Exception {
-        return itemProvider.get(itemId);
-    }
-
-    public long getItemDBSize() {
-        return this.itemProvider.dbSize();
-    }
-
-    public long getIndexDBSize() {
-        return this.queueProvider.dbSize();
-    }
-
-    public long getQueueTTL(String queueNameStr, Map<String, Long> cacheRules) {
-        QueueName name = QueueName.fromString(queueNameStr);
-
-        for (String match : name.getMatches()) {
-            if (cacheRules.containsKey(match)) {
-                return cacheRules.get(match);
-            }
-        }
-        return name.getTTL();
-    }
-
-    private Map<String, Double> listToMap(List<String> list) {
-        Map<String, Double> map = new HashMap<String, Double>();
-        if (list != null) {
-            for (String elem : list) {
-                map.put(elem, 1.0);
-            }
-        }
-        return map;
-    }
-
     /**
     /**
-     * 把queue中的entry 放入RankItem中
+     * 把queue中的entry, 补充candidiate信息,放入RankItem中
      *
      *
      * @param entries
      * @param entries
      * @param candidate
      * @param candidate
@@ -206,7 +159,7 @@ public class BaseRecaller<InMemoryItem> {
     public List<RankItem> recalling(final RecommendRequest requestData, final User user, int requestIndex, List<Candidate> recallCandidates) {
     public List<RankItem> recalling(final RecommendRequest requestData, final User user, int requestIndex, List<Candidate> recallCandidates) {
 
 
         long startTime = System.currentTimeMillis();
         long startTime = System.currentTimeMillis();
-        final RecallFilter<InMemoryItem> recallFilter = new RecallFilter<InMemoryItem>(this.filterConfig, requestData, user, requestIndex);
+        final RecallFilterPipeline<InMemoryItem> recallFilter = new RecallFilterPipeline<InMemoryItem>(this.filterConfig, requestData, user, requestIndex);
 
 
         // load queue
         // load queue
         long queueLoadStartTime = System.currentTimeMillis();
         long queueLoadStartTime = System.currentTimeMillis();

+ 6 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/RecallFilter.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/RecallFilterPipeline.java

@@ -12,9 +12,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 
 
 
 
-public class RecallFilter<T> {
+public class RecallFilterPipeline<T> {
 
 
-    private static Logger LOGGER = LoggerFactory.getLogger(RecallFilter.class);
+    private static Logger LOGGER = LoggerFactory.getLogger(RecallFilterPipeline.class);
     public int filterNum;
     public int filterNum;
     private FilterConfig config;
     private FilterConfig config;
     private List<AbstractFilter<T>> filters;
     private List<AbstractFilter<T>> filters;
@@ -22,10 +22,10 @@ public class RecallFilter<T> {
     private User user;
     private User user;
     private int requestIndex;
     private int requestIndex;
 
 
-    public RecallFilter(FilterConfig config,
-                        RecommendRequest requestContext,
-                        User user,
-                        int requestIndex) {
+    public RecallFilterPipeline(FilterConfig config,
+                                RecommendRequest requestContext,
+                                User user,
+                                int requestIndex) {
 
 
         this.config = config;
         this.config = config;
         this.requestContext = requestContext;
         this.requestContext = requestContext;