|
@@ -1,175 +1,29 @@
|
|
|
package com.tzld.piaoquan.recommend.server.service.filter;
|
|
|
|
|
|
-import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
|
|
|
-import com.tzld.piaoquan.recommend.server.service.PreViewedService;
|
|
|
import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory;
|
|
|
-import com.tzld.piaoquan.recommend.server.service.ViewedService;
|
|
|
import com.tzld.piaoquan.recommend.server.service.filter.strategy.*;
|
|
|
-import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
-import org.apache.commons.collections4.MapUtils;
|
|
|
-import org.apache.commons.lang.math.NumberUtils;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
-import java.util.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* @author dyp
|
|
|
*/
|
|
|
@Slf4j
|
|
|
public abstract class AbstractFilterService {
|
|
|
- @Autowired
|
|
|
- private PreViewedService preViewedService;
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private ViewedService viewedService;
|
|
|
|
|
|
private final ExecutorService pool = ThreadPoolFactory.filterPool();
|
|
|
|
|
|
- @Value("${newFilterGlobalSwitch:false}")
|
|
|
- private boolean newFilterGlobalSwitch;
|
|
|
- @Value("${newFilterAbExpCode:}")
|
|
|
- private String newFilterAbExpCode;
|
|
|
-
|
|
|
- @ApolloJsonValue("${supply.exp.list:[6]}")
|
|
|
- private Set<Integer> supplyExps;
|
|
|
-
|
|
|
- @Value("${supply.exp.id:666}")
|
|
|
- private int supplyExpId;
|
|
|
-
|
|
|
- @ApolloJsonValue("${supply.exp.video.list:[]}")
|
|
|
- private Set<Long> supplyExpVideos;
|
|
|
- @ApolloJsonValue("${not.supply.exp.video.list:[]}")
|
|
|
- private Set<Long> notSupplyExpVideos;
|
|
|
-
|
|
|
-
|
|
|
- private List<Long> viewFilterOld(FilterParam param) {
|
|
|
- // 风险过滤
|
|
|
- List<Long> videoIds = filterWithRiskVideo(param.getRiskFilterFlag(),
|
|
|
- param.getAppType(), param.getRegionCode(), param.getAppRegionFiltered(), param.getVideosWithRisk(),
|
|
|
- param.getVideoIds(), param.getForceTruncation());
|
|
|
-
|
|
|
- videoIds = filterBySupplyExp(param.getAppType(), param.getExpIdMap(), videoIds);
|
|
|
-
|
|
|
- if (param.isNotUsePreView()) {
|
|
|
-
|
|
|
- } else {
|
|
|
- videoIds = filterByPreViewed(param.getAppType(), param.getMid(), videoIds);
|
|
|
- }
|
|
|
- if (param.isConcurrent()) {
|
|
|
- videoIds = filterByViewedConcurrent(param, videoIds);
|
|
|
- } else {
|
|
|
- videoIds = filterByViewed(param, videoIds);
|
|
|
- }
|
|
|
-// log.info("viewFilterOld after {}", JSONUtils.toJson(videoIds));
|
|
|
- return videoIds;
|
|
|
- }
|
|
|
-
|
|
|
- private List<Long> filterBySupplyExp(int appType, Map<String, String> expIdMap, List<Long> videoIds) {
|
|
|
- if (!supplyExps.contains(appType)) {
|
|
|
- return videoIds;
|
|
|
- }
|
|
|
- if (MapUtils.isEmpty(expIdMap)) {
|
|
|
- return videoIds;
|
|
|
- }
|
|
|
- // 供给实验
|
|
|
- if (supplyExpId == NumberUtils.toInt(expIdMap.get("supply"), -1)) {
|
|
|
- // 对照组视频只在对照组出
|
|
|
- return videoIds.stream()
|
|
|
- .filter(l -> !notSupplyExpVideos.contains(l))
|
|
|
- .collect(Collectors.toList());
|
|
|
- } else {
|
|
|
- // 实验组视频只在实验组出
|
|
|
- return videoIds.stream()
|
|
|
- .filter(l -> !supplyExpVideos.contains(l))
|
|
|
- .collect(Collectors.toList());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<Long> filterByViewedConcurrent(FilterParam param, List<Long> videoIds) {
|
|
|
-// if (StringUtils.isBlank(param.getMid())
|
|
|
-// || CollectionUtils.isEmpty(videoIds)) {
|
|
|
-// return videoIds;
|
|
|
-// }
|
|
|
- if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
- return videoIds;
|
|
|
- }
|
|
|
-
|
|
|
- int chunkSize = 20;
|
|
|
- List<List<Long>> chunks = new ArrayList<>();
|
|
|
- int size = videoIds.size();
|
|
|
-
|
|
|
- for (int i = 0; i < size; i += chunkSize) {
|
|
|
- int endIndex = Math.min(i + chunkSize, size);
|
|
|
- List<Long> chunk = videoIds.subList(i, endIndex);
|
|
|
- chunks.add(chunk);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- CountDownLatch cdl = new CountDownLatch(chunks.size());
|
|
|
- List<Future<List<Long>>> futures = new ArrayList<>();
|
|
|
- for (final List<Long> ids : chunks) {
|
|
|
- Future<List<Long>> future = pool.submit(() ->
|
|
|
- viewedService.filterViewedVideo(param.getAppType(), param.getMid(), param.getUid(), ids, param.getCityCode(),
|
|
|
- param.getAbExpCodes(), param.getHotSceneType(), param.getClientIp()));
|
|
|
- futures.add(future);
|
|
|
- }
|
|
|
- try {
|
|
|
- cdl.await(150, TimeUnit.MILLISECONDS);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- log.error("filter error", e);
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- List<Long> result = new ArrayList<>();
|
|
|
- for (Future<List<Long>> f : futures) {
|
|
|
- try {
|
|
|
- result.addAll(f.get());
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("future get error ", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return result;
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- private List<Long> filterByViewed(FilterParam param, List<Long> videoIds) {
|
|
|
- // TODO uid为空时,还需要过滤么?
|
|
|
-// if (StringUtils.isBlank(param.getMid())
|
|
|
-// || CollectionUtils.isEmpty(videoIds)) {
|
|
|
-// return videoIds;
|
|
|
-// }
|
|
|
- if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
- return videoIds;
|
|
|
- }
|
|
|
- return viewedService.filterViewedVideo(param.getAppType(), param.getMid(), param.getUid(), videoIds, param.getCityCode(),
|
|
|
- param.getAbExpCodes(), param.getHotSceneType(), param.getClientIp());
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- private List<Long> filterByPreViewed(int appType, String mid, List<Long> videoIds) {
|
|
|
-
|
|
|
- if (StringUtils.isBlank(mid)) {
|
|
|
- return videoIds;
|
|
|
- }
|
|
|
- Set<Long> preViewedVideoIds = preViewedService.getVideoIds(appType, mid);
|
|
|
- return videoIds.stream()
|
|
|
- .filter(l -> !preViewedVideoIds.contains(l))
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- }
|
|
|
|
|
|
private List<Long> filterWithRiskVideo(boolean riskFlag,
|
|
|
int appType,
|
|
@@ -210,17 +64,6 @@ public abstract class AbstractFilterService {
|
|
|
}
|
|
|
|
|
|
protected List<Long> viewFilter(FilterParam param) {
|
|
|
- boolean hit = newFilterGlobalSwitch
|
|
|
- || CommonCollectionUtils.contains(param.getAbExpCodes(), newFilterAbExpCode);
|
|
|
- if (hit) {
|
|
|
- return viewFilterNew(param);
|
|
|
- } else {
|
|
|
- return viewFilterOld(param);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<Long> viewFilterNew(FilterParam param) {
|
|
|
-
|
|
|
// hardcode : 风险过滤会做截断,所以先同步调用
|
|
|
List<Long> riskVideoIds = filterWithRiskVideo(param.getRiskFilterFlag(),
|
|
|
param.getAppType(), param.getRegionCode(), param.getAppRegionFiltered(), param.getVideosWithRisk(),
|