| 
					
				 | 
			
			
				@@ -1,13 +1,17 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 package com.tzld.piaoquan.recommend.server.service.filter.strategy; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.service.filter.FilterParam; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.service.filter.FilterStrategy; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.collections4.CollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.collections4.MapUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.lang3.StringUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.beans.factory.annotation.Autowired; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.beans.factory.annotation.Qualifier; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Value; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.dao.DataAccessException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.data.mongodb.core.MongoTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.data.mongodb.core.query.Criteria; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -19,6 +23,7 @@ import org.springframework.data.redis.core.SetOperations; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.stereotype.Component; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.List; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.Set; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.stream.Collectors; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -37,9 +42,26 @@ public class ViewedStrategy implements FilterStrategy { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private MongoTemplate mongoTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private String keyFormat = "user:exclude:videoidset:%s"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private String newKeyFormat = "user:exclude:videoidset:new:%s"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @ApolloJsonValue("${view.exp.config:{}}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private Map<String, Integer> viewExpConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${video.filter.cache.expire.new:600}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private Long videoFilterCacheNewExpire; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     public List<Long> filter(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        boolean hit = MapUtils.isNotEmpty(viewExpConfig) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                && CollectionUtils.isNotEmpty(param.getAbExpCodes()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                && CollectionUtils.containsAny(viewExpConfig.keySet(), param.getAbExpCodes()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (hit) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return filterNew(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return filterOld(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<Long> filterOld(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         String user = StringUtils.isNotBlank(param.getUid()) ? param.getUid() : param.getMid(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (StringUtils.isBlank(user)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             return param.getVideoIds(); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -50,8 +72,10 @@ public class ViewedStrategy implements FilterStrategy { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             // 从mongo取曝光数据 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             Criteria criteria = new Criteria(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             criteria.and("uid").is(user); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // criteria.and("create_time").gte(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             Query query = new Query(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             query.addCriteria(criteria); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            query.limit(10000); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             List<VideoView> list = mongoTemplate.find(query, VideoView.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             //TODO 为什么限制最多10000条?是不是限制近几天更合适? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             if (CollectionUtils.isNotEmpty(list)) { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -59,21 +83,83 @@ public class ViewedStrategy implements FilterStrategy { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 for (int i = list.size() - 1; i >= 0 && limit-- > 0; i--) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     viewedVideoIds.add(String.valueOf(list.get(i).getVideoId())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                // 异步写Redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                ThreadPoolFactory.defaultPool().execute(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    redisTemplate.executePipelined(new SessionCallback<String>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            SetOperations<String, String> operations = 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                    (SetOperations<String, String>) redisOperations.opsForSet(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()])); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            redisTemplate.expire(key, 360 * 3600, TimeUnit.SECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                // 避免穿透 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                viewedVideoIds.add("-1"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // 异步写Redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ThreadPoolFactory.defaultPool().execute(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                redisTemplate.executePipelined(new SessionCallback<String>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        SetOperations<String, String> operations = 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                (SetOperations<String, String>) redisOperations.opsForSet(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()])); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        redisTemplate.expire(key, 360 * 3600, TimeUnit.SECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (CollectionUtils.isEmpty(viewedVideoIds)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return param.getVideoIds(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return param.getVideoIds().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .filter(vid -> !viewedVideoIds.contains(String.valueOf(vid))) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<Long> filterNew(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        String user = StringUtils.isNotBlank(param.getUid()) ? param.getUid() : param.getMid(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (StringUtils.isBlank(user)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return param.getVideoIds(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        String key = String.format(newKeyFormat, user); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Set<String> viewedVideoIds = redisTemplate.opsForSet().members(key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (CollectionUtils.isEmpty(viewedVideoIds)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // 从mongo取曝光数据 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Criteria criteria = new Criteria(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            criteria.and("uid").is(user); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            long videoViewedFilterTime = 30 * 24 * 3600; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (MapUtils.isNotEmpty(viewExpConfig)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                for (Map.Entry<String, Integer> entry : viewExpConfig.entrySet()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if (CommonCollectionUtils.contains(param.getAbExpCodes(), entry.getKey())) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        videoViewedFilterTime = entry.getValue() != null 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                ? entry.getValue() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                : videoViewedFilterTime; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            criteria.and("create_time").gte(System.currentTimeMillis() - videoViewedFilterTime * 1000); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Query query = new Query(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            query.addCriteria(criteria); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            query.limit(10000); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            List<VideoView> list = mongoTemplate.find(query, VideoView.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            //TODO 为什么限制最多10000条?是不是限制近几天更合适? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (CollectionUtils.isNotEmpty(list)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                int limit = 10000; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                for (int i = list.size() - 1; i >= 0 && limit-- > 0; i--) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    viewedVideoIds.add(String.valueOf(list.get(i).getVideoId())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                // 避免穿透 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                viewedVideoIds.add("-1"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // 异步写Redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ThreadPoolFactory.defaultPool().execute(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                redisTemplate.executePipelined(new SessionCallback<String>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        SetOperations<String, String> operations = 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                (SetOperations<String, String>) redisOperations.opsForSet(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        operations.add(key, viewedVideoIds.toArray(new String[viewedVideoIds.size()])); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        redisTemplate.expire(key, videoFilterCacheNewExpire, TimeUnit.SECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (CollectionUtils.isEmpty(viewedVideoIds)) { 
			 |