|  | @@ -1,18 +1,14 @@
 | 
											
												
													
														|  |  package com.tzld.piaoquan.recommend.server.framework.recaller;
 |  |  package com.tzld.piaoquan.recommend.server.framework.recaller;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -import com.google.common.base.Function;
 |  | 
 | 
											
												
													
														|  |  import com.google.common.collect.FluentIterable;
 |  |  import com.google.common.collect.FluentIterable;
 | 
											
												
													
														|  |  import com.google.common.collect.Lists;
 |  |  import com.google.common.collect.Lists;
 | 
											
												
													
														|  |  import com.google.common.collect.Maps;
 |  |  import com.google.common.collect.Maps;
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  |  import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 |  |  import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 | 
											
												
													
														|  |  import com.tzld.piaoquan.recommend.server.framework.candidiate.*;
 |  |  import com.tzld.piaoquan.recommend.server.framework.candidiate.*;
 | 
											
												
													
														|  |  import com.tzld.piaoquan.recommend.server.framework.common.User;
 |  |  import com.tzld.piaoquan.recommend.server.framework.common.User;
 | 
											
												
													
														|  |  import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvider;
 |  |  import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvider;
 | 
											
												
													
														|  |  import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 |  |  import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 | 
											
												
													
														|  | -import com.tzld.piaoquan.recommend.server.model.Video;
 |  | 
 | 
											
												
													
														|  |  import org.apache.commons.collections4.CollectionUtils;
 |  |  import org.apache.commons.collections4.CollectionUtils;
 | 
											
												
													
														|  |  import org.apache.commons.lang.exception.ExceptionUtils;
 |  |  import org.apache.commons.lang.exception.ExceptionUtils;
 | 
											
												
													
														|  |  import org.slf4j.Logger;
 |  |  import org.slf4j.Logger;
 | 
											
										
											
												
													
														|  | @@ -22,12 +18,7 @@ import java.util.ArrayList;
 | 
											
												
													
														|  |  import java.util.HashMap;
 |  |  import java.util.HashMap;
 | 
											
												
													
														|  |  import java.util.List;
 |  |  import java.util.List;
 | 
											
												
													
														|  |  import java.util.Map;
 |  |  import java.util.Map;
 | 
											
												
													
														|  | -import java.util.concurrent.Callable;
 |  | 
 | 
											
												
													
														|  | -import java.util.concurrent.ExecutionException;
 |  | 
 | 
											
												
													
														|  | -import java.util.concurrent.ExecutorService;
 |  | 
 | 
											
												
													
														|  | -import java.util.concurrent.Executors;
 |  | 
 | 
											
												
													
														|  | -import java.util.concurrent.Future;
 |  | 
 | 
											
												
													
														|  | -import java.util.concurrent.TimeUnit;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import java.util.concurrent.*;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  public class BaseRecaller<Video> {
 |  |  public class BaseRecaller<Video> {
 | 
											
										
											
												
													
														|  | @@ -88,27 +79,19 @@ public class BaseRecaller<Video> {
 | 
											
												
													
														|  |      // 读取redis中的数据放入queue中
 |  |      // 读取redis中的数据放入queue中
 | 
											
												
													
														|  |      public Map<Candidate, Queue<Video>> loadQueues(List<Candidate> candidates) {
 |  |      public Map<Candidate, Queue<Video>> loadQueues(List<Candidate> candidates) {
 | 
											
												
													
														|  |          // update queueName
 |  |          // update queueName
 | 
											
												
													
														|  | -        Iterable<Candidate> updateCandidates = FluentIterable.from(candidates).transform(new Function<Candidate, Candidate>() {
 |  | 
 | 
											
												
													
														|  | -            @Override
 |  | 
 | 
											
												
													
														|  | -            public Candidate apply(Candidate candidate) {
 |  | 
 | 
											
												
													
														|  | -                try {
 |  | 
 | 
											
												
													
														|  | -                    long ttl = QueueName.DEFAULT_LOCAL_CACHE_TTL;
 |  | 
 | 
											
												
													
														|  | -                    candidate.setCandidateQueueName(QueueName.fromString(candidate.getCandidateKey(), ttl));
 |  | 
 | 
											
												
													
														|  | -                } catch (Exception e) {
 |  | 
 | 
											
												
													
														|  | -                    candidate.setCandidateQueueName(null);
 |  | 
 | 
											
												
													
														|  | -                    LOGGER.error("error parse QueueName [{}]", candidate.getCandidateKey());
 |  | 
 | 
											
												
													
														|  | -                }
 |  | 
 | 
											
												
													
														|  | -                return candidate;
 |  | 
 | 
											
												
													
														|  | 
 |  | +        Iterable<Candidate> updateCandidates = FluentIterable.from(candidates).transform(candidate -> {
 | 
											
												
													
														|  | 
 |  | +            try {
 | 
											
												
													
														|  | 
 |  | +                long ttl = QueueName.DEFAULT_LOCAL_CACHE_TTL;
 | 
											
												
													
														|  | 
 |  | +                candidate.setCandidateQueueName(QueueName.fromString(candidate.getCandidateKey(), ttl));
 | 
											
												
													
														|  | 
 |  | +            } catch (Exception e) {
 | 
											
												
													
														|  | 
 |  | +                candidate.setCandidateQueueName(null);
 | 
											
												
													
														|  | 
 |  | +                LOGGER.error("error parse QueueName [{}]", candidate.getCandidateKey());
 | 
											
												
													
														|  |              }
 |  |              }
 | 
											
												
													
														|  | 
 |  | +            return candidate;
 | 
											
												
													
														|  |          });
 |  |          });
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          // parse queues
 |  |          // parse queues
 | 
											
												
													
														|  | -        Iterable<QueueName> queueNames = FluentIterable.from(updateCandidates).transform(new Function<Candidate, QueueName>() {
 |  | 
 | 
											
												
													
														|  | -            @Override
 |  | 
 | 
											
												
													
														|  | -            public QueueName apply(Candidate candidate) {
 |  | 
 | 
											
												
													
														|  | -                return candidate.getCandidateQueueName();
 |  | 
 | 
											
												
													
														|  | -            }
 |  | 
 | 
											
												
													
														|  | -        });
 |  | 
 | 
											
												
													
														|  | 
 |  | +        Iterable<QueueName> queueNames = FluentIterable.from(updateCandidates).transform(candidate -> candidate.getCandidateQueueName());
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          // parallel load queues, redis 或者缓存获取index
 |  |          // parallel load queues, redis 或者缓存获取index
 | 
											
												
													
														|  |          Map<QueueName, Queue<Video>> queues = Maps.newConcurrentMap();
 |  |          Map<QueueName, Queue<Video>> queues = Maps.newConcurrentMap();
 | 
											
										
											
												
													
														|  | @@ -144,12 +127,9 @@ public class BaseRecaller<Video> {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          // load from redis
 |  |          // load from redis
 | 
											
												
													
														|  |          List<Callable<Map<Candidate, Queue<Video>>>> fetchQueueCalls = Lists.newArrayList();
 |  |          List<Callable<Map<Candidate, Queue<Video>>>> fetchQueueCalls = Lists.newArrayList();
 | 
											
												
													
														|  | -        fetchQueueCalls.add(new Callable<Map<Candidate, Queue<Video>>>() {
 |  | 
 | 
											
												
													
														|  | -            @Override
 |  | 
 | 
											
												
													
														|  | -            public Map<Candidate, Queue<Video>> call() throws Exception {
 |  | 
 | 
											
												
													
														|  | -                boolean isFromRedis = true;
 |  | 
 | 
											
												
													
														|  | -                return obtainQueue(recallCandidates, requestData, user, isFromRedis);
 |  | 
 | 
											
												
													
														|  | -            }
 |  | 
 | 
											
												
													
														|  | 
 |  | +        fetchQueueCalls.add(() -> {
 | 
											
												
													
														|  | 
 |  | +            boolean isFromRedis = true;
 | 
											
												
													
														|  | 
 |  | +            return obtainQueue(recallCandidates, requestData, user, isFromRedis);
 | 
											
												
													
														|  |          });
 |  |          });
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          // 多线程执行load
 |  |          // 多线程执行load
 |