Преглед изворни кода

Merge branch 'feature_filter' of algorithm/recommend-server into master

dingyunpeng пре 1 година
родитељ
комит
69c2595e87

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/ViewedService.java

@@ -65,6 +65,7 @@ public class ViewedService {
             param.put("videoIds", videoIds);
             param.put("cityCode", cityCode);
             param.put("hotSenceType", hotSceneType);
+            param.put("abExpCodes", abExpCodes);
             List<Integer> recommendStatus = new ArrayList<>();
             recommendStatus.add(-6);
             param.put("recommendStatus", recommendStatus);

+ 99 - 13
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/strategy/ViewedStrategy.java

@@ -1,13 +1,17 @@
 package com.tzld.piaoquan.recommend.server.service.filter.strategy;
 
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
 import com.tzld.piaoquan.recommend.server.service.filter.FilterStrategy;
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.query.Criteria;
@@ -19,6 +23,7 @@ import org.springframework.data.redis.core.SetOperations;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -37,9 +42,26 @@ public class ViewedStrategy implements FilterStrategy {
     private MongoTemplate mongoTemplate;
 
     private String keyFormat = "user:exclude:videoidset:%s";
+    private String newKeyFormat = "user:exclude:videoidset:new:%s";
+
+    @ApolloJsonValue("${view.exp.config:{}}")
+    private Map<String, Integer> viewExpConfig;
+    @Value("${video.filter.cache.expire.new:600}")
+    private Long videoFilterCacheNewExpire;
 
     @Override
     public List<Long> filter(FilterParam param) {
+        boolean hit = MapUtils.isNotEmpty(viewExpConfig)
+                && CollectionUtils.isNotEmpty(param.getAbExpCodes())
+                && CollectionUtils.containsAny(viewExpConfig.keySet(), param.getAbExpCodes());
+        if (hit) {
+            return filterNew(param);
+        } else {
+            return filterOld(param);
+        }
+    }
+
+    private List<Long> filterOld(FilterParam param) {
         String user = StringUtils.isNotBlank(param.getUid()) ? param.getUid() : param.getMid();
         if (StringUtils.isBlank(user)) {
             return param.getVideoIds();
@@ -50,8 +72,10 @@ public class ViewedStrategy implements FilterStrategy {
             // 从mongo取曝光数据
             Criteria criteria = new Criteria();
             criteria.and("uid").is(user);
+            // criteria.and("create_time").gte();
             Query query = new Query();
             query.addCriteria(criteria);
+            query.limit(10000);
             List<VideoView> list = mongoTemplate.find(query, VideoView.class);
             //TODO 为什么限制最多10000条?是不是限制近几天更合适?
             if (CollectionUtils.isNotEmpty(list)) {
@@ -59,21 +83,83 @@ public class ViewedStrategy implements FilterStrategy {
                 for (int i = list.size() - 1; i >= 0 && limit-- > 0; i--) {
                     viewedVideoIds.add(String.valueOf(list.get(i).getVideoId()));
                 }
-
-                // 异步写Redis
-                ThreadPoolFactory.defaultPool().execute(() -> {
-                    redisTemplate.executePipelined(new SessionCallback<String>() {
-                        @Override
-                        public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException {
-                            SetOperations<String, String> operations =
-                                    (SetOperations<String, String>) redisOperations.opsForSet();
-                            operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()]));
-                            redisTemplate.expire(key, 360 * 3600, TimeUnit.SECONDS);
-                            return null;
-                        }
-                    });
+            } else {
+                // 避免穿透
+                viewedVideoIds.add("-1");
+            }
+            // 异步写Redis
+            ThreadPoolFactory.defaultPool().execute(() -> {
+                redisTemplate.executePipelined(new SessionCallback<String>() {
+                    @Override
+                    public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException {
+                        SetOperations<String, String> operations =
+                                (SetOperations<String, String>) redisOperations.opsForSet();
+                        operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()]));
+                        redisTemplate.expire(key, 360 * 3600, TimeUnit.SECONDS);
+                        return null;
+                    }
                 });
+            });
+        }
+
+        if (CollectionUtils.isEmpty(viewedVideoIds)) {
+            return param.getVideoIds();
+        }
+        return param.getVideoIds().stream()
+                .filter(vid -> !viewedVideoIds.contains(String.valueOf(vid)))
+                .collect(Collectors.toList());
+    }
+
+    private List<Long> filterNew(FilterParam param) {
+        String user = StringUtils.isNotBlank(param.getUid()) ? param.getUid() : param.getMid();
+        if (StringUtils.isBlank(user)) {
+            return param.getVideoIds();
+        }
+        String key = String.format(newKeyFormat, user);
+        Set<String> viewedVideoIds = redisTemplate.opsForSet().members(key);
+        if (CollectionUtils.isEmpty(viewedVideoIds)) {
+            // 从mongo取曝光数据
+            Criteria criteria = new Criteria();
+            criteria.and("uid").is(user);
+            long videoViewedFilterTime = 30 * 24 * 3600;
+            if (MapUtils.isNotEmpty(viewExpConfig)) {
+                for (Map.Entry<String, Integer> entry : viewExpConfig.entrySet()) {
+                    if (CommonCollectionUtils.contains(param.getAbExpCodes(), entry.getKey())) {
+                        videoViewedFilterTime = entry.getValue() != null
+                                ? entry.getValue()
+                                : videoViewedFilterTime;
+                        break;
+                    }
+                }
+            }
+            criteria.and("create_time").gte(System.currentTimeMillis() - videoViewedFilterTime * 1000);
+            Query query = new Query();
+            query.addCriteria(criteria);
+            query.limit(10000);
+            List<VideoView> list = mongoTemplate.find(query, VideoView.class);
+            //TODO 为什么限制最多10000条?是不是限制近几天更合适?
+            if (CollectionUtils.isNotEmpty(list)) {
+                int limit = 10000;
+                for (int i = list.size() - 1; i >= 0 && limit-- > 0; i--) {
+                    viewedVideoIds.add(String.valueOf(list.get(i).getVideoId()));
+                }
+            } else {
+                // 避免穿透
+                viewedVideoIds.add("-1");
             }
+            // 异步写Redis
+            ThreadPoolFactory.defaultPool().execute(() -> {
+                redisTemplate.executePipelined(new SessionCallback<String>() {
+                    @Override
+                    public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException {
+                        SetOperations<String, String> operations =
+                                (SetOperations<String, String>) redisOperations.opsForSet();
+                        operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()]));
+                        redisTemplate.expire(key, videoFilterCacheNewExpire, TimeUnit.SECONDS);
+                        return null;
+                    }
+                });
+            });
         }
 
         if (CollectionUtils.isEmpty(viewedVideoIds)) {