Browse Source

v0.2 20240201

sunmingze 1 year ago
parent
commit
cf4bcc8cd4

+ 8 - 0
recommend-server-service/pom.xml

@@ -211,6 +211,14 @@
             <version>18.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+
     </dependencies>
 
 

+ 1 - 7
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/VlogRecommendPipeline.java

@@ -4,9 +4,6 @@ package com.tzld.piaoquan.recommend.server.framework;
 import com.tzld.piaoquan.recommend.server.framework.merger.MergeUtils;
 import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueue;
 import com.tzld.piaoquan.recommend.server.framework.recaller.BaseRecaller;
-import com.tzld.piaoquan.recommend.server.framework.recaller.FilterConfig;
-import com.tzld.piaoquan.recommend.server.framework.recaller.provider.InMemoryItem;
-import com.tzld.piaoquan.recommend.server.framework.recaller.provider.ItemProvider;
 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueueWithoutMeta;
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
@@ -51,13 +48,10 @@ public class VlogRecommendPipeline {
 
         // Step 4: Recalling & Basic Scoring
         RedisSmartClient client = new RedisSmartClient();
-        ItemProvider<InMemoryItem> itemProvider = new ItemProvider<>(client, PREFIX);
         RedisBackedQueueWithoutMeta queueProvider = new RedisBackedQueueWithoutMeta(client, 1000L);
 
-        FilterConfig filterConfig = new FilterConfig();
-        filterConfig.load(FILTER_CONF);
 
-        BaseRecaller recaller = new BaseRecaller(itemProvider, queueProvider, filterConfig);
+        BaseRecaller recaller = new BaseRecaller(queueProvider);
         List<RankItem> items = recaller.recalling(requestData, userInfo, requestIndex, new ArrayList<Candidate>(candidates.values()));
 
 

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/candidiate/IndexCandidateQueue.java

@@ -31,7 +31,7 @@ public abstract class IndexCandidateQueue extends StrategyQueue {
     @Override
     public int doMerge(final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap, int recNum, User user, RecommendRequest requestData, int requestIndex, int expId) {
         // TODO: sublist by every queue
-        return MergeUtils.simpleMergeWithProtection(items, rankerItemsListMap, recNum, user, requestData, requestIndex, expId);
+        return MergeUtils.simpleMergeWithProtection(items, rankerItemsListMap, recNum, expId);
     }
 
     @Override

+ 3 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/MergeUtils.java

@@ -130,8 +130,6 @@ public class MergeUtils {
      *
      * @param rankerItemsListMap
      * @param recNum
-     * @param user
-     * @param requestData
      * @param requestIndex
      * @param expId
      * @return
@@ -139,9 +137,6 @@ public class MergeUtils {
     public static int simpleMergeWithProtection(List<RankItem> resultRankerItems,
                                                 final Map<String, Pair<MergeRule, List<RankItem>>> rankerItemsListMap,
                                                 int recNum,
-                                                User user,
-                                                RecommendRequest requestData,
-                                                int requestIndex,
                                                 int expId) {
         if (resultRankerItems == null) {
             resultRankerItems = new LinkedList<RankItem>();
@@ -160,7 +155,9 @@ public class MergeUtils {
             freeRecNum -= myMinMergeNum;
 
             int prevSize = resultRankerItems.size();
+            // 先保证最小长度添加
             resultRankerItems.addAll(entry.getRight().subList(0, myMinMergeNum));
+
             int afterSize = resultRankerItems.size();
             for (int ix = prevSize; ix < afterSize; ++ix) {
                 resultRankerItems.get(ix).addMergeDecisionLabel("min_protect:" + myRule.queueName);
@@ -168,6 +165,7 @@ public class MergeUtils {
         }
 
         if (freeRecNum > 0) {
+            //保证最小长度后,其他按score融合
             simpleMergeByScore(resultRankerItems, rankerItemsListMap, freeRecNum, expId);
         }
 

+ 0 - 33
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/AbstractFilter.java

@@ -1,33 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.recaller;
-
-import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
-import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class AbstractFilter<T> {
-    protected final static Logger LOGGER = LoggerFactory.getLogger(AbstractFilter.class);
-    protected final FilterConfigInfo filterConfigInfo;
-    protected final RecommendRequest requestContext;
-    protected final User user;
-    protected final int requestIndex;
-
-    public AbstractFilter(FilterConfigInfo filterConfigInfo,
-                          RecommendRequest requestContext,
-                          User user,
-                          Integer requestIndex) {
-
-        this.filterConfigInfo = filterConfigInfo;
-        this.requestContext = requestContext;
-        this.user = user;
-        this.requestIndex = requestIndex;
-    }
-
-    public FilterConfigInfo getFilterConfigInfo() {
-        return filterConfigInfo;
-    }
-
-    public abstract boolean predicate(Candidate candidate, T t);
-}

+ 24 - 52
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java

@@ -2,8 +2,6 @@ package com.tzld.piaoquan.recommend.server.framework.recaller;
 
 
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -12,7 +10,6 @@ import com.google.common.collect.Maps;
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.*;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.recaller.provider.ItemProvider;
 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvider;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 import org.apache.commons.collections4.CollectionUtils;
@@ -21,7 +18,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,21 +36,16 @@ public class BaseRecaller<InMemoryItem> {
     private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 200; // ms
     private static final ExecutorService filterExecutorService = Executors.newFixedThreadPool(128);
     private static final ExecutorService fetchQueueExecutorService = Executors.newFixedThreadPool(128);
-
-    protected final ItemProvider<InMemoryItem> itemProvider;
     private final QueueProvider<InMemoryItem> queueProvider;
-    private final FilterConfig filterConfig ;
     private final long QUEUE_LOAD_TIMEOUT;
 
-    public BaseRecaller(ItemProvider<InMemoryItem> itemProvider, QueueProvider<InMemoryItem> queueProvider, FilterConfig filterConfig) {
-        this(itemProvider, queueProvider, filterConfig, DEFAULT_QUEUE_LOAD_TIMEOUT);
+    public BaseRecaller(QueueProvider<InMemoryItem> queueProvider) {
+        this(queueProvider, DEFAULT_QUEUE_LOAD_TIMEOUT);
     }
 
-    public BaseRecaller(ItemProvider<InMemoryItem> itemProvider, QueueProvider<InMemoryItem> queueProvider,
-                        FilterConfig filterConfig, long queueLoadTimeout) {
-        this.itemProvider = itemProvider;
+    public BaseRecaller(QueueProvider<InMemoryItem> queueProvider,
+                        long queueLoadTimeout) {
         this.queueProvider = queueProvider;
-        this.filterConfig = filterConfig;
         this.QUEUE_LOAD_TIMEOUT = queueLoadTimeout;
     }
 
@@ -63,10 +54,8 @@ public class BaseRecaller<InMemoryItem> {
     }
 
 
-
-    public boolean isValidItem(InMemoryItem item){
-        return item!= null;
-
+    public boolean isValidItem(InMemoryItem item) {
+        return item != null;
     }
 
     /**
@@ -74,10 +63,9 @@ public class BaseRecaller<InMemoryItem> {
      *
      * @param entries
      * @param candidate
-     * @param user
      * @return
      */
-    private List<RankItem> toHits(final Iterable<Entry<InMemoryItem>> entries, final Candidate candidate, final User user) {
+    private List<RankItem> toHits(final Iterable<Entry<InMemoryItem>> entries, final Candidate candidate) {
 
         List<RankItem> result = new ArrayList<RankItem>();
         for (Entry entry : entries) {
@@ -98,9 +86,7 @@ public class BaseRecaller<InMemoryItem> {
 
     // 读取redis中的数据放入queue中
     public Map<Candidate, Queue<InMemoryItem>> loadQueues(List<Candidate> candidates) {
-
         // update queueName
-        // final Map<String, Long> cacheRules = getCacheRulesConfig();
         Iterable<Candidate> updateCandidates = FluentIterable.from(candidates).transform(new Function<Candidate, Candidate>() {
             @Override
             public Candidate apply(Candidate candidate) {
@@ -146,9 +132,6 @@ public class BaseRecaller<InMemoryItem> {
 
     /**
      * recall
-     * 1. construct recall filter
-     * 2. Redis并行召回
-     * 3. do filter
      *
      * @param requestData
      * @param user
@@ -159,7 +142,6 @@ public class BaseRecaller<InMemoryItem> {
     public List<RankItem> recalling(final RecommendRequest requestData, final User user, int requestIndex, List<Candidate> recallCandidates) {
 
         long startTime = System.currentTimeMillis();
-        final RecallFilterPipeline<InMemoryItem> recallFilter = new RecallFilterPipeline<InMemoryItem>(this.filterConfig, requestData, user, requestIndex);
 
         // load queue
         long queueLoadStartTime = System.currentTimeMillis();
@@ -204,11 +186,15 @@ public class BaseRecaller<InMemoryItem> {
             }
         }
 
-        // do filter
-        // 执行 recall filter配置文件中的方法
-        long filterStartTime = System.currentTimeMillis();
 
-        List<Map.Entry<Candidate, Queue<InMemoryItem>>> batch = new ArrayList<Map.Entry<Candidate, Queue<InMemoryItem>>>();
+        List<RankItem> result = convertToRankItem(candidateQueueMap);
+
+        return result;
+    }
+
+
+    private List<RankItem> convertToRankItem(Map<Candidate, Queue<InMemoryItem>> candidateQueueMap) {
+
         final List<Callable<List<RankItem>>> callables = new ArrayList<Callable<List<RankItem>>>();
         int expectedRecallSum = 0;
         for (final Map.Entry<Candidate, Queue<InMemoryItem>> entry : candidateQueueMap.entrySet()) {
@@ -219,16 +205,10 @@ public class BaseRecaller<InMemoryItem> {
                     final Candidate candidate = entry.getKey();
                     try {
                         // 1. filter
-                        Iterable<Entry<InMemoryItem>> entries = FluentIterable.from(entry.getValue()).filter(new Predicate<Entry<InMemoryItem>>() {
-                            @Override
-                            public boolean apply(Entry<InMemoryItem> entry) {
-                                return isValidItem(entry.item) &&
-                                        recallFilter.predicate(candidate, entry.item);
-                            }
-                        }).limit(candidate.getCandidateNum());
+                        Iterable<Entry<InMemoryItem>> entries = FluentIterable.from(entry.getValue()).limit(candidate.getCandidateNum());
 
                         // 2. toHits
-                        candidateHits.addAll(toHits(entries, candidate, user));
+                        candidateHits.addAll(toHits(entries, candidate));
 
                         // debug log for tracing
                         LOGGER.debug("recalled candidate [{}], queue length [{}], expected [{}], hit [{}]",
@@ -236,7 +216,6 @@ public class BaseRecaller<InMemoryItem> {
                     } catch (Exception e) {
                         LOGGER.error("recall filter queue occur error, queue [{}], error: [{}]", candidate.toString(), ExceptionUtils.getFullStackTrace(e));
                     }
-
                     return candidateHits;
                 }
             });
@@ -245,7 +224,6 @@ public class BaseRecaller<InMemoryItem> {
         Map<String, RankItem> hits = new HashMap<String, RankItem>();
         try {
             List<Future<List<RankItem>>> futures = filterExecutorService.invokeAll(callables, DEFAULT_PARALLEL_FILTER_TIMEOUT, TimeUnit.MILLISECONDS);
-
             for (Future<List<RankItem>> future : futures) {
                 try {
                     if (future.isDone() && !future.isCancelled() && future.get() != null) {
@@ -259,34 +237,28 @@ public class BaseRecaller<InMemoryItem> {
                                 hits.put(item.getId(), item);
                             }
                         }
+                        LOGGER.debug("collect items from filter task, userid: [{}], item size [{}]", "", part.size());
                     } else {
-                        LOGGER.error("parallel recall filter Canceled {} ", requestData.getRequestId());
+                        LOGGER.error("parallel recall filter Canceled {} ", "");
                     }
                 } catch (Exception e) {
                     LOGGER.error("parallel recall filter occur error, uid: [{}], Exception [{}]",
-                            requestData.getRequestId(), ExceptionUtils.getFullStackTrace(e));
+                            "", ExceptionUtils.getFullStackTrace(e));
                 }
             }
         } catch (Exception e) {
             LOGGER.error("parallel recall filter occur error, uid: [{}], Exception [{}]",
-                    requestData.getRequestId(), ExceptionUtils.getFullStackTrace(e));
+                    "", ExceptionUtils.getFullStackTrace(e));
         }
 
         List<RankItem> result = new ArrayList<RankItem>(hits.values());
+
         return result;
     }
 
-    private Map<Candidate, Queue<InMemoryItem>> obtainQueue(List<Candidate> refactorCandidates, RecommendRequest requestData, User user, boolean isFromRedis) {
-        if (isFromRedis) {
-            return loadQueues(refactorCandidates);
-        } else {
-            return fetchQueues(refactorCandidates, requestData, user);
-        }
 
+    private Map<Candidate, Queue<InMemoryItem>> obtainQueue(List<Candidate> refactorCandidates, RecommendRequest requestData, User user, boolean isFromRedis) {
+        return loadQueues(refactorCandidates);
     }
 
-    protected Map<Candidate, Queue<InMemoryItem>> fetchQueues(List<Candidate> candidates, RecommendRequest requestData, User user) {
-
-        return null;
-    }
 }

+ 0 - 99
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/FilterConfig.java

@@ -1,99 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.recaller;
-
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigObject;
-import com.typesafe.config.ConfigValue;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
-
- */
-
-public class FilterConfig {
-    private static Logger LOGGER = LoggerFactory.getLogger(FilterConfig.class);
-    private List<FilterConfigInfo> filterConfigInfoList = new ArrayList<FilterConfigInfo>();
-
-    public FilterConfig(Config config) {
-        this.load(config);
-    }
-
-
-    public FilterConfig(String configFile) {
-        this.load(configFile);
-    }
-
-    public FilterConfig() {
-    }
-
-    public boolean load(String configFile) {
-        Config filterConfig = ConfigFactory.parseResources(configFile);
-        return load(filterConfig);
-    }
-
-    public boolean load(Config config) {
-        Config recallConfig = config.getConfig("recall-config");
-
-        Config filterConf = recallConfig.getConfig("filter-config");
-        try {
-            loadFilters(filterConf);
-            int pos = 0;
-            for (FilterConfigInfo filterConfigInfo : filterConfigInfoList) {
-                LOGGER.info("filter at position [{}], priority [{}] filter name [{}] ",
-                        new Object[]{pos++, filterConfigInfo.getFilterPriority(), filterConfigInfo.getConfigName()});
-            }
-            LOGGER.debug("Load filter config success");
-        } catch (Exception e) {
-            LOGGER.error("Load filter config failed, [{}]", ExceptionUtils.getFullStackTrace(e));
-            return false;
-        }
-
-        return true;
-    }
-
-    public List<FilterConfigInfo> getFilterConfigInfoList() {
-        return filterConfigInfoList;
-    }
-
-    private void loadFilters(Config config) throws Exception {
-        ConfigObject confObj = config.root();
-        for (ConfigObject.Entry<String, ConfigValue> it : confObj.entrySet()) {
-            Config conf = ((ConfigObject) it.getValue()).toConfig();
-
-            // parse config
-            String configName = it.getKey();
-            String filterName = conf.getString("filter-name");
-            int filterPriority = 0;
-            if (conf.hasPath("filter-priority"))
-                filterPriority = conf.getInt("filter-priority");
-
-            List<Integer> disableExpIds = new ArrayList<Integer>();
-            if (conf.hasPath("disable-expids")) {
-                disableExpIds = conf.getIntList("disable-expids");
-            }
-            FilterConfigInfo filterConfigInfo = new FilterConfigInfo(configName,
-                    filterName, filterPriority, disableExpIds);
-            LOGGER.debug("parse filter config info [{}]", filterConfigInfo);
-
-            addConfigByPriority(filterConfigInfoList, filterConfigInfo);
-        }
-    }
-
-    private void addConfigByPriority(List<FilterConfigInfo> configInfoList, FilterConfigInfo addConfigInfo) {
-
-        int pos = 0;
-        for (; pos < configInfoList.size(); pos++) {
-            if (configInfoList.get(pos).getFilterPriority() <= addConfigInfo.getFilterPriority()) {
-                break;
-            }
-        }
-        configInfoList.add(pos, addConfigInfo);
-    }
-}

+ 0 - 59
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/FilterConfigInfo.java

@@ -1,59 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.recaller;
-
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-
-public class FilterConfigInfo {
-    private Set<Integer> disableExpIds;
-    private String filterName;
-    private Integer filterPriority;
-    private String configName;
-
-    public FilterConfigInfo(String configName,
-                            String filterName,
-                            Integer filterPriority,
-                            List<Integer> expIds) {
-        this.configName = configName;
-        this.filterName = filterName;
-        this.filterPriority = filterPriority;
-        this.disableExpIds = new HashSet<Integer>();
-        this.disableExpIds.addAll(expIds);
-    }
-
-    public Integer getFilterPriority() {
-        return filterPriority;
-    }
-
-    public String getFilterName() {
-        return filterName;
-    }
-
-    public Set<Integer> getDisableExpIds() {
-        return disableExpIds;
-    }
-
-    public String getConfigName() {
-        return configName;
-    }
-
-    @Override
-    public String toString() {
-        return configName + ":" + filterPriority + ":" + filterName + ":" +
-                Joiner.on(",").join(FluentIterable.from(disableExpIds).transform(new Function<Integer, String>() {
-                    @Nullable
-                    @Override
-                    public String apply(Integer integer) {
-                        return integer.toString();
-                    }
-                }));
-    }
-}

+ 0 - 68
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/RecallFilterPipeline.java

@@ -1,68 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.recaller;
-
-
-import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class RecallFilterPipeline<T> {
-
-    private static Logger LOGGER = LoggerFactory.getLogger(RecallFilterPipeline.class);
-    public int filterNum;
-    private FilterConfig config;
-    private List<AbstractFilter<T>> filters;
-    private RecommendRequest requestContext;
-    private User user;
-    private int requestIndex;
-
-    public RecallFilterPipeline(FilterConfig config,
-                                RecommendRequest requestContext,
-                                User user,
-                                int requestIndex) {
-
-        this.config = config;
-        this.requestContext = requestContext;
-        this.user = user;
-        this.requestIndex = requestIndex;
-        this.filters = new ArrayList<AbstractFilter<T>>();
-        this.constructFilters(config);
-    }
-
-    public List<AbstractFilter<T>> getFilters() {
-        return filters;
-    }
-
-    public void constructFilters(FilterConfig config) {
-        // add filter
-        if (config != null) {
-            for (FilterConfigInfo filterConfigInfo : config.getFilterConfigInfoList()) {
-                try {
-                    AbstractFilter<T> filter = (AbstractFilter) Class.forName(filterConfigInfo.getFilterName())
-                            .getConstructor(FilterConfigInfo.class, RecommendRequest.class, User.class, Integer.class)
-                            .newInstance(filterConfigInfo, this.requestContext, this.user, this.requestIndex);
-
-                    this.filters.add(filter);
-                } catch (Exception e) {
-                    LOGGER.error("Filter config info construct error, [{}] [{}]",
-                            filterConfigInfo, ExceptionUtils.getFullStackTrace(e));
-                }
-            }
-        }
-    }
-
-    public boolean predicate(Candidate candidate, T t) {
-        for (AbstractFilter filter : filters) {
-            if (!filter.predicate(candidate, t)) {
-                return false;
-            }
-        }
-        return true;
-    }
-}

+ 0 - 301
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/ItemProvider.java

@@ -1,301 +0,0 @@
-package com.tzld.piaoquan.recommend.server.framework.recaller.provider;
-
-
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import com.tzld.piaoquan.recommend.server.framework.utils.FixedThreadPoolHelper;
-
-import com.tzld.piaoquan.recommend.server.framework.utils.IndexUtils;
-import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.lang.math.RandomUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Generalized item access to Redis.
- */
-public class ItemProvider<InMemoryItem> {
-    private static final Logger logger = LoggerFactory.getLogger(ItemProvider.class);
-
-    private static final int REDIS_DEFAULT_TTL = 7 * 24 * 60 * 60;
-    private static final long CACHE_TIMEOUT_MS = 15 * 60 * 1000L;
-    private static final long CACHE_MAXIMUMSIZE = 80 * 10000; // default 80w
-
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-    private final ExecutorService itemRefreshExecutorService = new FixedThreadPoolHelper(4, "itemRefresh").getThreadPoolExecutor();
-    // 不能为 static
-    private final BlockingQueue<String> asyncRefreshQueue = new LinkedBlockingQueue<String>(100000);
-    private final RedisSmartClient client;
-    private final String prefix;
-    private final LoadingCache<String, CacheEntry<InMemoryItem>> cache;
-    private final long cacheMaximumSize;
-    private int redisTtl = REDIS_DEFAULT_TTL;
-    private long cacheTimeout = CACHE_TIMEOUT_MS;
-    private long refreshDynamicInterval = CACHE_TIMEOUT_MS;
-
-
-    /**
-     * 构造带有本地 cache 的 item provider, 使用redis作为底层存储,
-     * 本地cache基于 Guava Cache 构建, 配置失效时间与异步刷新时间;
-     *
-     * @param client         redis-cluster client
-     * @param prefix         业务标志
-     * @param cacheTimeOutMs 缓存失效时间
-     */
-    public ItemProvider(RedisSmartClient client,
-                        String prefix,
-                        long cacheTimeOutMs,
-                        long refreshDynamicInterval) {
-        this(client,
-                prefix,
-                cacheTimeOutMs,
-                refreshDynamicInterval,
-                CACHE_MAXIMUMSIZE,
-                REDIS_DEFAULT_TTL,
-                TimeUnit.SECONDS);
-    }
-
-    /**
-     * 构造带有本地 cache 的 item provider, 使用redis作为底层存储,
-     * 本地cache基于 Guava Cache 构建, 配置失效时间与异步刷新时间;
-     *
-     * @param client         redis-cluster client
-     * @param prefix         业务标志
-     * @param cacheTimeOutMs 缓存失效时间
-     * @param redisKeyTtl
-     * @param redisTtlUnit
-     */
-    public ItemProvider(RedisSmartClient client,
-                        String prefix,
-                        long cacheTimeOutMs,
-                        long refreshDynamicInterval,
-                        final long cacheMaximumSize,
-                        long redisKeyTtl,
-                        TimeUnit redisTtlUnit) {
-
-        this.cacheTimeout = cacheTimeOutMs;
-        this.refreshDynamicInterval = refreshDynamicInterval;
-        this.redisTtl = (int) redisTtlUnit.toSeconds(redisKeyTtl);
-        this.cacheMaximumSize = cacheMaximumSize;
-
-        this.client = client;
-        this.prefix = prefix;
-        this.cache = CacheBuilder.newBuilder()
-                .expireAfterWrite((long) (this.cacheTimeout * 1.4), TimeUnit.MILLISECONDS)
-                .maximumSize(cacheMaximumSize)
-                .build(new CacheLoader<String, CacheEntry<InMemoryItem>>() {
-                    @Override
-                    public CacheEntry<InMemoryItem> load(String key) throws Exception {
-                        if (key == null) {
-                            return null;
-                        } else {
-                            return loadItem(key);
-                        }
-                    }
-                });
-
-        // cache refresh
-        for (int i = 0; i < 4; i++) {
-            itemRefreshExecutorService.execute(new AsyncRefreshCacheRunnable());
-        }
-    }
-
-    public ItemProvider(RedisSmartClient client, String prefix) {
-        this(client, prefix, CACHE_TIMEOUT_MS, CACHE_TIMEOUT_MS);
-    }
-
-    public RedisSmartClient getClient() {
-        return client;
-    }
-
-
-    public InMemoryItem deserialize(byte[] bytes){
-        return null;
-    }
-
-
-    /**
-     * 初始时定义 expire time = System.currentTimeMillis()+cacheTimeout
-     * @param id
-     * @return
-     */
-    private CacheEntry<InMemoryItem> loadItem(String id) throws Exception {
-        long start = System.nanoTime();
-        byte[] bytes = client.get(IndexUtils.convertKey(prefix + id));
-        CacheEntry<InMemoryItem> result;
-        if (bytes == null) {
-            result = null;
-        } else {
-            result = makeCacheEntry(id, Optional.of(deserialize(bytes)));
-        }
-
-        return result;
-    }
-
-    public long dbSize() {
-        return this.cache.size();
-    }
-
-    private CacheEntry<InMemoryItem> makeCacheEntry(String itemId, Optional<InMemoryItem> t) {
-        long expireTime = System.currentTimeMillis() + cacheTimeout + RandomUtils.nextInt(10 * 60 * 1000);
-        long dynamicExpireTime = System.currentTimeMillis() + refreshDynamicInterval;
-        return new CacheEntry<InMemoryItem>(itemId, expireTime, dynamicExpireTime, t);
-    }
-
-
-    public CacheEntry<InMemoryItem> getCacheEntry(String id) throws ExecutionException {
-        return cache.get(id);
-
-    }
-
-    public Optional<InMemoryItem> getDirect(String id) throws Exception {
-        CacheEntry<InMemoryItem> cacheEntry = loadItem(id);
-        return cacheEntry.getItem();
-    }
-
-    public Optional<InMemoryItem> get(String id) throws ExecutionException {
-        CacheEntry<InMemoryItem> result = cache.get(id);
-
-        if (result == null || !result.getItem().isPresent()) {
-            cache.invalidate(id);
-            return null;
-        }
-
-        // minimum expire time, update key expire time
-        if (result.getExpireTime() <= 0) {
-            result = makeCacheEntry(id, result.getItem());
-            cache.put(id, result);
-        }
-
-        // check expire and refresh
-        long currentTime = System.currentTimeMillis();
-        if ((result.getDynamicFeatureExpireTime() > 0 && currentTime > result.getDynamicFeatureExpireTime()) ||
-                (result.getExpireTime() > 0 && currentTime > result.getExpireTime())) {
-            boolean ret = asyncRefreshQueue.offer(id);
-            if (logger.isDebugEnabled()) {
-                logger.debug("offer async queue itemid [{}], result [{}]", id, ret);
-            }
-
-            if (asyncRefreshQueue.size() > CACHE_MAXIMUMSIZE) {
-                logger.warn("item backgroud blocking queue length [{}], it is danger", asyncRefreshQueue.size());
-            }
-
-            if (!ret) {
-                logger.error("item backgroud offer blocking queue error, itemid [{}], with queue length [{}]", id, asyncRefreshQueue.size());
-            }
-        }
-
-        return result.getItem();
-    }
-
-    public String setLocalCache(String id, InMemoryItem t) throws ExecutionException {
-        long start = System.nanoTime();
-
-        // put to local cache
-        CacheEntry<InMemoryItem> result = makeCacheEntry(id, Optional.of(t));
-        cache.put(id, result);
-
-        return "";
-    }
-
-    public String set(String id, InMemoryItem t) throws Exception {
-        long start = System.nanoTime();
-
-        // set redis
-        String strKey = prefix + id;
-        String ret = client.setex(IndexUtils.convertKey(strKey), this.redisTtl, t.toString().getBytes());
-
-        // put to cache
-        CacheEntry<InMemoryItem> result = makeCacheEntry(id, Optional.of(t));
-        cache.put(id, result);
-
-        return ret;
-    }
-
-    public long remove(String id) throws Exception {
-        long start = System.nanoTime();
-
-        // del redis
-        long affectRow = client.del(IndexUtils.convertKey(prefix + id));
-
-        // del cache
-        cache.invalidate(id);
-        return affectRow;
-    }
-
-
-    /**
-     * 通过 blocking queue 异步刷新即将到期的item
-     */
-    public class AsyncRefreshCacheRunnable implements Runnable {
-
-        @Override
-        public void run() {
-            while (true) {
-                String itemId = "";
-                try {
-                    itemId = asyncRefreshQueue.take(); // take async item, blocked when no item
-
-                    final long currentTimeMillis = System.currentTimeMillis();
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("async refresh thread get itemid [{}]", itemId); // get itemid
-                    }
-
-                    CacheEntry<InMemoryItem> entry = cache.get(itemId);
-                    // 全量刷新
-                    if (currentTimeMillis > entry.getExpireTime()) {
-                        refreshStaticFeatures(entry);
-                    }
-                    // 刷新动态特征
-                    if (currentTimeMillis > entry.getDynamicFeatureExpireTime()) {
-                        refreshDynamicFeatures(entry);
-                    }
-
-                    long endTime = System.currentTimeMillis();
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("async refresh items spenttime [{}]", endTime - currentTimeMillis);
-                    }
-                } catch (Exception e) {
-                    logger.error("async refresh item error, itemId [{}], [{}]", itemId, ExceptionUtils.getFullStackTrace(e));
-                }
-            }
-        }
-
-        public void refreshStaticFeatures(CacheEntry<InMemoryItem> entry) {
-            try {
-                cache.refresh(entry.getItemId());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("async refresh item [{}]", entry.getItemId());
-                }
-            } catch (Exception e) {
-                logger.error("async refresh item error, itemId [{}], exception [{}]", entry.getItemId(), ExceptionUtils.getFullStackTrace(e));
-            }
-        }
-
-        public void refreshDynamicFeatures(CacheEntry<InMemoryItem> entry) {
-            try {
-                // reload dynamic items and update expireTime
-                entry.setItem(Optional.of(entry.getItem().get()));
-                entry.setDynamicFeatureExpireTime(System.currentTimeMillis() + refreshDynamicInterval);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("async refresh items dynamic features [{}]", entry.getItemId());
-                }
-            } catch (Exception e) {
-                logger.error("async refresh item dynamic features error, itemId [{}], exception [{}]",
-                        entry.getItemId(), ExceptionUtils.getFullStackTrace(e));
-            }
-        }
-    }
-}

+ 0 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/recaller/HistoryLongPeriodFilter.java

@@ -2,8 +2,6 @@ package com.tzld.piaoquan.recommend.server.implement.recaller;
 
 import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.recaller.AbstractFilter;
-import com.tzld.piaoquan.recommend.server.framework.recaller.FilterConfigInfo;
 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.InMemoryItem;
 import com.google.common.collect.Sets;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;