| 
					
				 | 
			
			
				@@ -0,0 +1,165 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+package com.tzld.piaoquan.recommend.server.service.filter.strategy; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.alibaba.fastjson.JSON; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.google.common.cache.CacheBuilder; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.google.common.cache.CacheLoader; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.google.common.cache.LoadingCache; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.repository.WxVideoStatus; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.repository.WxVideoStatusRepository; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRel; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRelRepository; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.service.filter.FilterStrategy; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.collections4.CollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.lang3.RandomUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.lang3.StringUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Autowired; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Qualifier; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.dao.DataAccessException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.RedisOperations; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.RedisTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.SessionCallback; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.ValueOperations; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.stereotype.Component; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import javax.annotation.PostConstruct; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.stream.Collectors; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@Component 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@Slf4j 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+public class GeneralSpiderStrategy implements FilterStrategy { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Qualifier("redisTemplate") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private RedisTemplate<String, String> redisTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private WxVideoStatusRepository wxVideoStatusRepository; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * 存储视频的uid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * key: video:uid:{videoId} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * value: uid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private final String keyFormat = "video:uid:%s"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * 非ugc用户 大数据同步 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public static final String GENERAL_SPIDER_USER_KEY = "general:spider:user"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private LoadingCache<String, Set<String>> generalSpiderUserCache = CacheBuilder.newBuilder() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .maximumSize(10000) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .refreshAfterWrite(600, TimeUnit.SECONDS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .expireAfterWrite(600, TimeUnit.SECONDS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .expireAfterAccess(600, TimeUnit.SECONDS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .build(new CacheLoader<String, Set<String>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                public Set<String> load(String key) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Set<String> result = redisTemplate.opsForSet().members(key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if (CollectionUtils.isEmpty(result)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        return Collections.emptySet(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private WxVideoTagRelRepository repository; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @ApolloJsonValue("${general.spider.exclude.tagids:[]}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<Long> generalSpiderExcludeTagIds; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private String generalSpiderExcludeVideoCacheKey = "general.spider.exclude.video"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    // 内存持久保存不淘汰 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private LoadingCache<String, Set<Long>> generalSpiderExcludeVideoCache = CacheBuilder.newBuilder() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .maximumSize(10) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .refreshAfterWrite(60, TimeUnit.SECONDS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .expireAfterWrite(60, TimeUnit.SECONDS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .expireAfterAccess(60, TimeUnit.SECONDS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .build(new CacheLoader<String, Set<Long>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                public Set<Long> load(String key) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    List<WxVideoTagRel> rels = repository.findAllByTagIdIn(generalSpiderExcludeTagIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return CommonCollectionUtils.toSet(rels, WxVideoTagRel::getVideoId); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @PostConstruct 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public void init() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Set<Long> data = generalSpiderExcludeVideoCache.getUnchecked(generalSpiderExcludeVideoCacheKey); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log.info("generalSpiderExcludeVideoCache size {}", data.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public List<Long> filter(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (param == null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return Collections.emptyList(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (StringUtils.isBlank(param.getMid()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                || CollectionUtils.isEmpty(param.getVideoIds())) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return param.getVideoIds(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // vid -> uid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<String> keys = param.getVideoIds().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .map(id -> String.format(keyFormat, id)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<String> uids = redisTemplate.opsForValue().multiGet(keys); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<Long> cacheMissVideoIds = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Map<Long, String> vid2UidMap = new HashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for (int i = 0; i < param.getVideoIds().size(); i++) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String value = uids.get(i); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (StringUtils.isBlank(value)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                cacheMissVideoIds.add(param.getVideoIds().get(i)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                vid2UidMap.put(param.getVideoIds().get(i), value); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (CollectionUtils.isNotEmpty(cacheMissVideoIds)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            List<WxVideoStatus> status = wxVideoStatusRepository.findAllByVideoIdIn(cacheMissVideoIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (CollectionUtils.isNotEmpty(status)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                for (WxVideoStatus v : status) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    vid2UidMap.put(v.getVideoId(), String.valueOf(v.getVideoUid())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // TODO 异步更新缓存 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    ThreadPoolFactory.defaultPool().execute(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        redisTemplate.executePipelined(new SessionCallback<String>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                ValueOperations<String, String> operations = 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        (ValueOperations<String, String>) redisOperations.opsForValue(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                status.forEach(v -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    operations.set(String.format(keyFormat, v.getVideoId()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            String.valueOf(v.getVideoUid()), RandomUtils.nextInt(30, 60), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            TimeUnit.SECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        //需要过滤的视频uid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Set<String> generalSpiderUserIds = generalSpiderUserCache.getUnchecked(GENERAL_SPIDER_USER_KEY); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        //不过滤的视频 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Set<Long> data = generalSpiderExcludeVideoCache.getUnchecked(generalSpiderExcludeVideoCacheKey); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<Long> videoIds = param.getVideoIds().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .filter(l -> data.contains(l) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        || (vid2UidMap.containsKey(l) && !generalSpiderUserIds.contains(vid2UidMap.get(l)))) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return videoIds; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 |