| 
					
				 | 
			
			
				@@ -1,41 +1,51 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 package com.tzld.piaoquan.recommend.server.service.filter; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.google.common.base.Stopwatch; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.thoughtworks.xstream.mapper.Mapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.service.PreViewedService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.service.ViewedService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.service.filter.strategy.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.util.JSONUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.collections4.CollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.lang3.StringUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.beans.factory.annotation.Autowired; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Value; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.ArrayList; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.List; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.Set; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.CountDownLatch; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.ExecutorService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.Future; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.stream.Collectors; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * @author dyp 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 @Slf4j 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-public abstract class AbstractFilterService implements FilterService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public Integer forceTruncation; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+public abstract class AbstractFilterService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private PreViewedService preViewedService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private ViewedService viewedService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private ExecutorService pool = ThreadPoolFactory.filterPool(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${newFilterGlobalSwitch:false}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private boolean newFilterGlobalSwitch; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${newFilterAbExpCode:}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private String newFilterAbExpCode; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    protected List<Long> viewFilter(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<Long> viewFilterOld(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // log.info("filterParam={}", JSONUtils.toJson(param)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // 风险过滤 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         List<Long> videoIds = filterWithRiskVideo(param.getRiskFilterFlag(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 param.getAppType(), param.getRegionCode(), param.getAppRegionFiltered(), param.getVideosWithRisk(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                param.getVideoIds()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                param.getVideoIds(), param.getForceTruncation()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // log.info("filterByRiskVideos videoIds={}", JSONUtils.toJson(videoIds)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         videoIds = filterByPreViewed(param.getAppType(), param.getMid(), videoIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // log.info("filterByPreViewed videoIds={}", JSONUtils.toJson(videoIds)); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -73,22 +83,23 @@ public abstract class AbstractFilterService implements FilterService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                            String regionCode, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                            Map<Integer, List<String>> rules, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                            List<Long> videosWithRisk, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                           List<Long> videoIds){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (!riskFlag){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return this.truncation(videoIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                           List<Long> videoIds, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                           int forceTruncation) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (!riskFlag) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return this.truncation(videoIds, forceTruncation); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // 1 判断是否过滤,不展示的app+区域列表。 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         boolean filterFlag; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (rules.containsKey(appType)){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (rules.containsKey(appType)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             filterFlag = false; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (rules.get(appType).contains(regionCode)){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (rules.get(appType).contains(regionCode)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 filterFlag = true; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             filterFlag = true; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (!filterFlag){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return this.truncation(videoIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (!filterFlag) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return this.truncation(videoIds, forceTruncation); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // 2 开始过滤。 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         List<Long> videoIdNew = new ArrayList<>(); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -97,13 +108,101 @@ public abstract class AbstractFilterService implements FilterService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 videoIdNew.add(videoId); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return this.truncation(videoIdNew); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return this.truncation(videoIdNew, forceTruncation); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private List<Long> truncation(List<Long> videoIds){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (this.forceTruncation == null){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<Long> truncation(List<Long> videoIds, int forceTruncation) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (forceTruncation == 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             return videoIds; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }else{ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return videoIds.subList(0, Math.min(this.forceTruncation, videoIds.size())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return videoIds.subList(0, Math.min(forceTruncation, videoIds.size())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    protected List<Long> viewFilter(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        boolean hit = newFilterGlobalSwitch 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                || CommonCollectionUtils.contains(param.getAbExpCodes(), newFilterAbExpCode); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (hit) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return viewFilterNew(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return viewFilterOld(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<Long> viewFilterNew(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // hardcode : 风险过滤会做截断,所以先同步调用 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<Long> riskVideoIds = filterWithRiskVideo(param.getRiskFilterFlag(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                param.getAppType(), param.getRegionCode(), param.getAppRegionFiltered(), param.getVideosWithRisk(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                param.getVideoIds(), param.getForceTruncation()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        param.setVideoIds(riskVideoIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<FilterStrategy> strategies = getStrategies(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log.info("filter strategies {}", JSONUtils.toJson(CommonCollectionUtils.toList(strategies, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                o -> o.getClass().getSimpleName()))); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        CountDownLatch cdl = new CountDownLatch(strategies.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<Future<List<Long>>> futures = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for (final FilterStrategy strategy : strategies) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Future<List<Long>> future = pool.submit(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Stopwatch stopwatch = Stopwatch.createStarted(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                List<Long> result = strategy.filter(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                cdl.countDown(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                log.info("{} param {} result {} cost {} ms", strategy.getClass().getSimpleName(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        JSONUtils.toJson(param.getVideoIds()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        JSONUtils.toJson(result), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        stopwatch.elapsed().toMillis()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            futures.add(future); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            cdl.await(1000, TimeUnit.MILLISECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } catch (InterruptedException e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log.error("filter error", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<List<Long>> videoIds = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for (Future<List<Long>> f : futures) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                videoIds.add(f.get()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                log.error("future get error ", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (CollectionUtils.isEmpty(videoIds)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return Collections.emptyList(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<Long> result = videoIds.get(0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (CollectionUtils.isEmpty(result)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return Collections.emptyList(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for (int i = 1; i < videoIds.size(); ++i) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            result.retainAll(videoIds.get(i)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log.info("filter result {}", JSONUtils.toJson(result)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private List<FilterStrategy> getStrategies(FilterParam param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<FilterStrategy> strategies = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        strategies.add(ServiceBeanFactory.getBean(PreViewedStrategy.class)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        strategies.add(ServiceBeanFactory.getBean(ViewedStrategy.class)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        strategies.add(ServiceBeanFactory.getBean(RecommendStatusStrategy.class)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        switch (param.getAppType()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            case 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            case 4: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                strategies.add(ServiceBeanFactory.getBean(AllowListStrategy.class)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            case 13: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            default: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                strategies.add(ServiceBeanFactory.getBean(AllowListStrategy.class)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                strategies.add(ServiceBeanFactory.getBean(TagStrategy.class)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return strategies; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 |