瀏覽代碼

MOD:优化过滤器

sunxy 1 年之前
父節點
當前提交
8e9c604b9b

+ 8 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java

@@ -32,8 +32,13 @@ public class BaseRecaller<Video> {
     private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 200; // ms
 
     private static final String FILTER_CONF = "filter_config.conf"; // ms
-    private static final ExecutorService filterExecutorService = Executors.newFixedThreadPool(128);
-    private static final ExecutorService fetchQueueExecutorService = Executors.newFixedThreadPool(128);
+    private static final ExecutorService filterExecutorService = new ThreadPoolExecutor(128, 128,
+            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000), r -> new Thread(r, "filterExecutorService"),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    private static final ExecutorService fetchQueueExecutorService = new ThreadPoolExecutor(128, 128,
+            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000), r -> new Thread(r, "fetchQueueExecutorService"),
+            new ThreadPoolExecutor.CallerRunsPolicy());
     private final QueueProvider<Video> queueProvider;
 
     private static final FilterConfig filterConfig;
@@ -146,11 +151,10 @@ public class BaseRecaller<Video> {
      *
      * @param requestData
      * @param user
-     * @param requestIndex
      * @param recallCandidates
      * @return
      */
-    public List<RankItem> recalling(final RecommendRequest requestData, final User user, int requestIndex, List<Candidate> recallCandidates) {
+    public List<RankItem> recalling(final RecommendRequest requestData, final User user, List<Candidate> recallCandidates) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         stopwatch.reset().start();
         // load from redis

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/TopRecommendPipeline.java

@@ -136,7 +136,7 @@ public class TopRecommendPipeline {
         // Step 4: Recalling & Basic Scoring
         stopwatch.reset().start();
         BaseRecaller recaller = new BaseRecaller(queueProvider);
-        List<RankItem> items = recaller.recalling(requestData, userInfo, requestIndex, new ArrayList<>(candidates.values()));
+        List<RankItem> items = recaller.recalling(requestData, userInfo, new ArrayList<>(candidates.values()));
         if (logPrint) {
             log.info("traceId = {}, cost = {}, items = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(items));

+ 26 - 32
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/recall/ViewedHistoryFilter.java

@@ -1,8 +1,6 @@
 package com.tzld.piaoquan.recommend.server.implement.recall;
 
 
-import com.alibaba.fastjson.JSONObject;
-import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.framework.recaller.AbstractFilter;
@@ -14,17 +12,15 @@ import com.tzld.piaoquan.recommend.server.service.SpringContextHolder;
 import com.tzld.piaoquan.recommend.server.service.filter.strategy.VideoView;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.springframework.dao.DataAccessException;
+import org.apache.commons.lang3.time.StopWatch;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.redis.core.RedisOperations;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.SessionCallback;
-import org.springframework.data.redis.core.SetOperations;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -59,8 +55,6 @@ public class ViewedHistoryFilter extends AbstractFilter<Video> {
         }
         LOGGER.info("ViewedHistoryFilter doFilter start traceId:{}, user:{}, apptype:{}", requestContext.getRequestId(),
                 user, requestContext.getAppType());
-        LOGGER.info("ViewedHistoryFilter doFilter traceId:{}, historySet:{}", requestContext.getRequestId(),
-                JSONObject.toJSONString(historySet));
         videoList.removeIf(video -> this.historySet.contains(String.valueOf(video.getVideoId())));
         LOGGER.info("ViewedHistoryFilter doFilter end traceId:{}, list:{}", requestContext.getRequestId(),
                 videoList.stream().map(Video::getVideoId).map(Object::toString).collect(Collectors.joining(",")));
@@ -69,6 +63,8 @@ public class ViewedHistoryFilter extends AbstractFilter<Video> {
 
     public Set<String> loadUserHistory(User user) {
         String userid = StringUtils.isNotBlank(user.getUid()) ? user.getUid() : user.getMid();
+        LOGGER.info("ViewedHistoryFilter loadUserHistory start traceId:{}, userid:{}, apptype:{}", requestContext.getRequestId(),
+                userid, requestContext.getAppType());
         if (StringUtils.isBlank(userid)) {
             return new HashSet<>();
         }
@@ -76,34 +72,32 @@ public class ViewedHistoryFilter extends AbstractFilter<Video> {
         String key = String.format(keyFormat, userid);
         Set<String> viewedVideoIds = redisTemplate.opsForSet().members(key);
         if (CollectionUtils.isEmpty(viewedVideoIds)) {
-            // 从mongo取曝光数据
-            Criteria criteria = new Criteria();
-            criteria.and("uid").is(user);
+            viewedVideoIds = new HashSet<>();
+            StopWatch stopWatch = new StopWatch();
+            stopWatch.start();
+            // 从mongodb中取曝光记录
+            Criteria criteriatime = new Criteria();
+            criteriatime.and("uid").is(userid);
             Query query = new Query();
-            query.addCriteria(criteria);
+            query.addCriteria(criteriatime);
+            //查新库
             List<VideoView> list = mongoTemplate.find(query, VideoView.class);
-
-            //TODO 为什么限制最多10000条?是不是限制近几天更合适?
+            //限制最多10000条
+            if (Objects.nonNull(list) && list.size() > 10000) {
+                list = list.subList(list.size() - 10000, list.size());
+            }
             if (CollectionUtils.isNotEmpty(list)) {
-                int limit = 10000;
-                for (int i = list.size() - 1; i >= 0 && limit-- > 0; i--) {
-                    viewedVideoIds.add(String.valueOf(list.get(i).getVideoId()));
-                }
-
-                // 异步写Redis
-                ThreadPoolFactory.defaultPool().execute(() -> {
-                    redisTemplate.executePipelined(new SessionCallback<String>() {
-                        @Override
-                        public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException {
-                            SetOperations<String, String> operations =
-                                    (SetOperations<String, String>) redisOperations.opsForSet();
-                            operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()]));
-                            redisTemplate.expire(key, 360 * 3600, TimeUnit.SECONDS);
-                            return null;
-                        }
-                    });
-                });
+                viewedVideoIds.addAll(list.stream().map(VideoView::getVideoId).map(String::valueOf).collect(Collectors.toSet()));
+            } else {
+                // 避免缓存击穿
+                viewedVideoIds.add("-1");
             }
+            stopWatch.stop();
+            LOGGER.info("ViewedHistoryFilter loadUserHistory end traceId:{}, userid:{}, apptype:{}, viewedVideoIds:{}, time:{}",
+                    requestContext.getRequestId(), userid, requestContext.getAppType(), viewedVideoIds, stopWatch.getTime());
+
+            redisTemplate.opsForSet().add(key, viewedVideoIds.toArray(new String[0]));
+            redisTemplate.expire(key, 1, TimeUnit.DAYS);
         }
 
         return viewedVideoIds;