|
@@ -1,6 +1,7 @@
|
|
package com.tzld.piaoquan.recommend.server.framework.recaller;
|
|
package com.tzld.piaoquan.recommend.server.framework.recaller;
|
|
|
|
|
|
|
|
|
|
|
|
+import com.google.common.base.Predicate;
|
|
import com.google.common.collect.FluentIterable;
|
|
import com.google.common.collect.FluentIterable;
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Maps;
|
|
import com.google.common.collect.Maps;
|
|
@@ -11,6 +12,7 @@ import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvi
|
|
import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
|
|
import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
|
|
|
+import org.checkerframework.checker.nullness.qual.Nullable;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@@ -26,9 +28,14 @@ public class BaseRecaller<Video> {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(BaseRecaller.class);
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(BaseRecaller.class);
|
|
private static final long DEFAULT_QUEUE_LOAD_TIMEOUT = 150; // ms
|
|
private static final long DEFAULT_QUEUE_LOAD_TIMEOUT = 150; // ms
|
|
private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 200; // ms
|
|
private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 200; // ms
|
|
|
|
+
|
|
|
|
+ private static final String FILTER_CONF = ""; // ms
|
|
private static final ExecutorService filterExecutorService = Executors.newFixedThreadPool(128);
|
|
private static final ExecutorService filterExecutorService = Executors.newFixedThreadPool(128);
|
|
private static final ExecutorService fetchQueueExecutorService = Executors.newFixedThreadPool(128);
|
|
private static final ExecutorService fetchQueueExecutorService = Executors.newFixedThreadPool(128);
|
|
private final QueueProvider<Video> queueProvider;
|
|
private final QueueProvider<Video> queueProvider;
|
|
|
|
+
|
|
|
|
+ private final FilterConfig filterConfig;
|
|
|
|
+
|
|
private final long QUEUE_LOAD_TIMEOUT;
|
|
private final long QUEUE_LOAD_TIMEOUT;
|
|
|
|
|
|
public BaseRecaller(QueueProvider<Video> queueProvider) {
|
|
public BaseRecaller(QueueProvider<Video> queueProvider) {
|
|
@@ -38,9 +45,20 @@ public class BaseRecaller<Video> {
|
|
public BaseRecaller(QueueProvider<Video> queueProvider,
|
|
public BaseRecaller(QueueProvider<Video> queueProvider,
|
|
long queueLoadTimeout) {
|
|
long queueLoadTimeout) {
|
|
this.queueProvider = queueProvider;
|
|
this.queueProvider = queueProvider;
|
|
|
|
+ this.filterConfig = new FilterConfig(FILTER_CONF);
|
|
this.QUEUE_LOAD_TIMEOUT = queueLoadTimeout;
|
|
this.QUEUE_LOAD_TIMEOUT = queueLoadTimeout;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+ public BaseRecaller(QueueProvider<Video> queueProvider,
|
|
|
|
+ FilterConfig filterConfig,
|
|
|
|
+ long queueLoadTimeout) {
|
|
|
|
+ this.queueProvider = queueProvider;
|
|
|
|
+ this.filterConfig = filterConfig;
|
|
|
|
+ this.QUEUE_LOAD_TIMEOUT = queueLoadTimeout;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
public String extractItemId(Entry<Video> entry) {
|
|
public String extractItemId(Entry<Video> entry) {
|
|
return entry.id;
|
|
return entry.id;
|
|
}
|
|
}
|
|
@@ -162,14 +180,20 @@ public class BaseRecaller<Video> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- List<RankItem> result = convertToRankItem(candidateQueueMap);
|
|
|
|
|
|
+ List<RankItem> result = convertToRankItem(candidateQueueMap, requestData, user);
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
// 转成RankItem
|
|
// 转成RankItem
|
|
// 同时给Filter预留处理
|
|
// 同时给Filter预留处理
|
|
- private List<RankItem> convertToRankItem(Map<Candidate, Queue<Video>> candidateQueueMap) {
|
|
|
|
|
|
+ private List<RankItem> convertToRankItem(Map<Candidate, Queue<Video>> candidateQueueMap,
|
|
|
|
+ RecommendRequest requestData,
|
|
|
|
+ User user) {
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ final FilterPipeline<Video> recallFilter = new FilterPipeline<Video>(this.filterConfig, requestData, user);
|
|
|
|
+
|
|
|
|
|
|
final List<Callable<List<RankItem>>> callables = new ArrayList<Callable<List<RankItem>>>();
|
|
final List<Callable<List<RankItem>>> callables = new ArrayList<Callable<List<RankItem>>>();
|
|
int expectedRecallSum = 0;
|
|
int expectedRecallSum = 0;
|
|
@@ -181,7 +205,12 @@ public class BaseRecaller<Video> {
|
|
final Candidate candidate = entry.getKey();
|
|
final Candidate candidate = entry.getKey();
|
|
try {
|
|
try {
|
|
// 1. filter TODO 待后续增加自定义filter
|
|
// 1. filter TODO 待后续增加自定义filter
|
|
- Iterable<Entry<Video>> entries = FluentIterable.from(entry.getValue()).limit(candidate.getCandidateNum());
|
|
|
|
|
|
+ Iterable<Entry<Video>> entries = FluentIterable.from(entry.getValue()).filter(new Predicate<Entry<Video>>() {
|
|
|
|
+ @Override
|
|
|
|
+ public boolean apply(@Nullable Entry<Video> videoEntry) {
|
|
|
|
+ return recallFilter.predicate(candidate, videoEntry.item);
|
|
|
|
+ }
|
|
|
|
+ }).limit(candidate.getCandidateNum());
|
|
|
|
|
|
// 2. toHits
|
|
// 2. toHits
|
|
candidateHits.addAll(toHits(entries, candidate));
|
|
candidateHits.addAll(toHits(entries, candidate));
|