| 
					
				 | 
			
			
				@@ -195,34 +195,30 @@ public class BaseRecaller<Video> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         final FilterPipeline<Video> recallFilter = new FilterPipeline<>(this.filterConfig, requestData, user); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        final List<Callable<List<RankItem>>> callables = new ArrayList<Callable<List<RankItem>>>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        int expectedRecallSum = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        final List<Callable<List<RankItem>>> callables = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         for (final Map.Entry<Candidate, Queue<Video>> entry : candidateQueueMap.entrySet()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            callables.add(new Callable<List<RankItem>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                public List<RankItem> call() throws Exception { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    List<RankItem> candidateHits = new ArrayList<RankItem>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    final Candidate candidate = entry.getKey(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        // 1. filter  TODO 待后续增加自定义filter 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        Iterable<Entry<Video>> entries = FluentIterable.from(entry.getValue()).filter(new Predicate<Entry<Video>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            public boolean apply(@Nullable Entry<Video> videoEntry) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                return recallFilter.predicate(candidate, videoEntry.item); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        }).limit(candidate.getCandidateNum()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            callables.add(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                List<RankItem> candidateHits = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                final Candidate candidate = entry.getKey(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // 1. filter  TODO 待后续增加自定义filter 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Iterable<Entry<Video>> entries = FluentIterable.from(entry.getValue()).filter(new Predicate<Entry<Video>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        public boolean apply(@Nullable Entry<Video> videoEntry) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            return recallFilter.predicate(candidate, videoEntry.item); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    }).limit(candidate.getCandidateNum()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        // 2. toHits 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        candidateHits.addAll(toHits(entries, candidate)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // 2. toHits 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    candidateHits.addAll(toHits(entries, candidate)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        // debug log for tracing 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        LOGGER.debug("recalled candidate [{}], queue length [{}], expected [{}], hit [{}]", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                new Object[]{candidate.getCandidateKey(), entry.getValue().size(), candidate.getCandidateNum(), candidateHits.size()}); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    } catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        LOGGER.error("recall filter queue occur error, queue [{}], error: [{}]", candidate.toString(), ExceptionUtils.getFullStackTrace(e)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    return candidateHits; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // debug log for tracing 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    LOGGER.debug("recalled candidate [{}], queue length [{}], expected [{}], hit [{}]", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            new Object[]{candidate.getCandidateKey(), entry.getValue().size(), candidate.getCandidateNum(), candidateHits.size()}); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    LOGGER.error("recall filter queue occur error, queue [{}], error: [{}]", candidate.toString(), ExceptionUtils.getFullStackTrace(e)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return candidateHits; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 |