| 
					
				 | 
			
			
				@@ -1,7 +1,6 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 package com.tzld.piaoquan.recommend.server.framework.recaller; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.google.common.base.Predicate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.google.common.collect.FluentIterable; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.google.common.collect.Lists; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.google.common.collect.Maps; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -10,9 +9,9 @@ import com.tzld.piaoquan.recommend.server.framework.candidiate.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.framework.common.User; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvider; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.collections4.CollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.lang.exception.ExceptionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.checkerframework.checker.nullness.qual.Nullable; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.slf4j.Logger; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.slf4j.LoggerFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -21,6 +20,7 @@ import java.util.HashMap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.List; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.concurrent.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.stream.Collectors; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 public class BaseRecaller<Video> { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -88,6 +88,8 @@ public class BaseRecaller<Video> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             candidateInfo.setCandidateQueueName(candidate.getCandidateKey()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             candidateInfo.setCandidate(candidate); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            item.setCandidateInfo(candidateInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            item.addToCandidateInfoList(candidateInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             result.add(item); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         return result; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -200,17 +202,24 @@ public class BaseRecaller<Video> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             callables.add(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 List<RankItem> candidateHits = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 final Candidate candidate = entry.getKey(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                List<Entry<Video>> entriesValue = CommonCollectionUtils.iterableToList(entry.getValue()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    final Map<Video, Entry<Video>> entryMap = new HashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    List<Video> entriesList = entriesValue.stream().map(e -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        entryMap.put(e.item, e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        return e.item; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    }).collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    List<Video> entriesAfterFilter = recallFilter.doFilter(candidate, entriesList); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     // 1. filter  TODO 待后续增加自定义filter 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    Iterable<Entry<Video>> entries = FluentIterable.from(entry.getValue()).filter(new Predicate<Entry<Video>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        public boolean apply(@Nullable Entry<Video> videoEntry) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            return recallFilter.predicate(candidate, videoEntry.item); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    }).limit(candidate.getCandidateNum()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    // 2. toHits 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    candidateHits.addAll(toHits(entries, candidate)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if (CollectionUtils.isNotEmpty(entriesAfterFilter)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        List<Entry<Video>> entries = entriesAfterFilter.stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                .limit(candidate.getCandidateNum()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                .map(e -> entryMap.get(e)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        // 2. toHits 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        candidateHits.addAll(toHits(entries, candidate)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     // debug log for tracing 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     LOGGER.debug("recalled candidate [{}], queue length [{}], expected [{}], hit [{}]", 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -222,7 +231,7 @@ public class BaseRecaller<Video> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<String, RankItem> hits = new HashMap<String, RankItem>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Map<String, RankItem> hits = new HashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             List<Future<List<RankItem>>> futures = filterExecutorService.invokeAll(callables, DEFAULT_PARALLEL_FILTER_TIMEOUT, TimeUnit.MILLISECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             for (Future<List<RankItem>> future : futures) { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -252,7 +261,7 @@ public class BaseRecaller<Video> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     "", ExceptionUtils.getFullStackTrace(e)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<RankItem> result = new ArrayList<RankItem>(hits.values()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        List<RankItem> result = new ArrayList<>(hits.values()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 |