|
@@ -1,34 +1,45 @@
|
|
|
package com.tzld.piaoquan.recommend.server.service.filter;
|
|
|
|
|
|
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
|
|
|
import com.tzld.piaoquan.recommend.server.service.PreViewedService;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory;
|
|
|
import com.tzld.piaoquan.recommend.server.service.ViewedService;
|
|
|
-import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.filter.strategy.*;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.context.ApplicationContext;
|
|
|
-import org.springframework.context.ApplicationContextAware;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* @author dyp
|
|
|
*/
|
|
|
@Slf4j
|
|
|
-public abstract class AbstractFilterService implements ApplicationContextAware {
|
|
|
+public abstract class AbstractFilterService {
|
|
|
@Autowired
|
|
|
private PreViewedService preViewedService;
|
|
|
|
|
|
@Autowired
|
|
|
private ViewedService viewedService;
|
|
|
|
|
|
- protected final Map<String, RecallStrategy> strategyMap = new HashMap<>();
|
|
|
+ private ExecutorService pool = ThreadPoolFactory.filterPool();
|
|
|
|
|
|
- protected ApplicationContext applicationContext;
|
|
|
+ @Value("${newFilterGlobalSwitch:false}")
|
|
|
+ private boolean newFilterGlobalSwitch;
|
|
|
+ @Value("${newFilterAbExpCode:}")
|
|
|
+ private String newFilterAbExpCode;
|
|
|
|
|
|
- protected List<Long> viewFilter(FilterParam param) {
|
|
|
+
|
|
|
+ private List<Long> viewFilterOld(FilterParam param) {
|
|
|
// log.info("filterParam={}", JSONUtils.toJson(param));
|
|
|
// 风险过滤
|
|
|
List<Long> videoIds = filterWithRiskVideo(param.getRiskFilterFlag(),
|
|
@@ -107,9 +118,80 @@ public abstract class AbstractFilterService implements ApplicationContextAware {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected boolean isNewFilter() {
|
|
|
- return false;
|
|
|
+ protected List<Long> viewFilter(FilterParam param) {
|
|
|
+ boolean hit = newFilterGlobalSwitch
|
|
|
+ || CommonCollectionUtils.contains(param.getAbExpCodes(), newFilterAbExpCode);
|
|
|
+ if (hit) {
|
|
|
+ return viewFilterNew(param);
|
|
|
+ } else {
|
|
|
+ return viewFilterOld(param);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<Long> viewFilterNew(FilterParam param) {
|
|
|
+
|
|
|
+ // hardcode : 风险过滤会做截断,所以先同步调用
|
|
|
+ List<Long> riskVideoIds = filterWithRiskVideo(param.getRiskFilterFlag(),
|
|
|
+ param.getAppType(), param.getRegionCode(), param.getAppRegionFiltered(), param.getVideosWithRisk(),
|
|
|
+ param.getVideoIds(), param.getForceTruncation());
|
|
|
+ param.setVideoIds(riskVideoIds);
|
|
|
+
|
|
|
+ List<FilterStrategy> strategies = getStrategies(param);
|
|
|
+ log.info("strategies {}", JSONUtils.toJson(CommonCollectionUtils.toList(strategies, o -> o.getClass().getSimpleName())));
|
|
|
+ CountDownLatch cdl = new CountDownLatch(strategies.size());
|
|
|
+ List<Future<List<Long>>> futures = new ArrayList<>();
|
|
|
+ for (final FilterStrategy strategy : strategies) {
|
|
|
+ Future<List<Long>> future = pool.submit(() -> {
|
|
|
+ List<Long> result = strategy.filter(param);
|
|
|
+ cdl.countDown();
|
|
|
+ return result;
|
|
|
+ });
|
|
|
+ futures.add(future);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ cdl.await(3000, TimeUnit.MILLISECONDS);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("recall error", e);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<List<Long>> videoIds = new ArrayList<>();
|
|
|
+ for (Future<List<Long>> f : futures) {
|
|
|
+ try {
|
|
|
+ videoIds.add(f.get());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("future get error ", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ List<Long> result = videoIds.get(0);
|
|
|
+ for (int i = 1; i < videoIds.size(); ++i) {
|
|
|
+ result.retainAll(videoIds.get(i));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
+ private List<FilterStrategy> getStrategies(FilterParam param) {
|
|
|
+ List<FilterStrategy> strategies = new ArrayList<>();
|
|
|
+ strategies.add(ServiceBeanFactory.getBean(PreViewedStrategy.class));
|
|
|
+ strategies.add(ServiceBeanFactory.getBean(ViewedStrategy.class));
|
|
|
+ strategies.add(ServiceBeanFactory.getBean(RecommendStatusStrategy.class));
|
|
|
+ switch (param.getAppType()) {
|
|
|
+ case 0:
|
|
|
+ case 4:
|
|
|
+ strategies.add(ServiceBeanFactory.getBean(AllowListStrategy.class));
|
|
|
+ break;
|
|
|
+ case 13:
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ strategies.add(ServiceBeanFactory.getBean(AllowListStrategy.class));
|
|
|
+ strategies.add(ServiceBeanFactory.getBean(TagStrategy.class));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ return strategies;
|
|
|
+ }
|
|
|
|
|
|
}
|