|
@@ -0,0 +1,133 @@
|
|
|
|
+package com.tzld.longarticle.recommend.server.service.filter.strategy;
|
|
|
|
+
|
|
|
|
import com.tzld.longarticle.recommend.server.common.ThreadPoolFactory;
import com.tzld.longarticle.recommend.server.common.enums.ContentPoolEnum;
import com.tzld.longarticle.recommend.server.model.Content;
import com.tzld.longarticle.recommend.server.service.filter.FilterParam;
import com.tzld.longarticle.recommend.server.service.filter.FilterResult;
import com.tzld.longarticle.recommend.server.service.filter.FilterStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+@Component
|
|
|
|
+@Slf4j
|
|
|
|
+public class DeDuplicationStrategy implements FilterStrategy {
|
|
|
|
+
|
|
|
|
+ private final ExecutorService pool = ThreadPoolFactory.deDuplicatePool();
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public FilterResult filter(FilterParam param) {
|
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
|
+ FilterResult filterResult = new FilterResult();
|
|
|
|
+ List<String> result;
|
|
|
|
+ List<Content> filterContents = new CopyOnWriteArrayList<>();
|
|
|
|
+ // 先对内容池内部去重
|
|
|
|
+ List<Content> middleContent = innerDeduplication(param.getContents(), filterContents);
|
|
|
|
+ // 内容池间进行去重
|
|
|
|
+ result = groupDeduplication(middleContent, filterContents);
|
|
|
|
+
|
|
|
|
+ filterResult.setContentIds(result);
|
|
|
|
+ filterResult.setFilterContent(filterContents);
|
|
|
|
+ log.info("DeDuplicationStrategy cost:{}", System.currentTimeMillis() - start);
|
|
|
|
+ return filterResult;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private List<Content> innerDeduplication(List<Content> contentList, List<Content> filterContents) {
|
|
|
|
+ List<Content> result = new ArrayList<>();
|
|
|
|
+ Map<String, List<Content>> contentMap = contentList.stream().collect(Collectors.groupingBy(Content::getContentPoolType));
|
|
|
|
+ List<Future<List<Content>>> futures = new ArrayList<>();
|
|
|
|
+ List<String> contentPoolList = ContentPoolEnum.getOrderContentPool();
|
|
|
|
+ CountDownLatch cdl = new CountDownLatch(contentPoolList.size());
|
|
|
|
+ for (String contentPool : contentPoolList) {
|
|
|
|
+ List<Content> contents = contentMap.get(contentPool);
|
|
|
|
+ Future<List<Content>> future = pool.submit(() -> {
|
|
|
|
+ try {
|
|
|
|
+ List<Content> res = new ArrayList<>();
|
|
|
|
+ List<String> titles = new ArrayList<>();
|
|
|
|
+ if (CollectionUtils.isEmpty(contents)) {
|
|
|
|
+ return new ArrayList<>();
|
|
|
|
+ }
|
|
|
|
+ for (Content c : contents) {
|
|
|
|
+ if (titles.contains(c.getTitle())) {
|
|
|
|
+ c.setFilterReason("重复文章");
|
|
|
|
+ filterContents.add(c);
|
|
|
|
+ } else {
|
|
|
|
+ res.add(c);
|
|
|
|
+ titles.add(c.getTitle());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return res;
|
|
|
|
+ } finally {
|
|
|
|
+ cdl.countDown();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ futures.add(future);
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ cdl.await();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ log.error("filter error", e);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ for (Future<List<Content>> f : futures) {
|
|
|
|
+ try {
|
|
|
|
+ List<Content> res = f.get();
|
|
|
|
+ result.addAll(res);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("future get error ", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private List<String> groupDeduplication(List<Content> contentList, List<Content> filterContents) {
|
|
|
|
+ List<String> result = new CopyOnWriteArrayList<>();
|
|
|
|
+ List<String> titles = new CopyOnWriteArrayList<>();
|
|
|
|
+ Map<String, List<Content>> contentMap = contentList.stream().collect(Collectors.groupingBy(Content::getContentPoolType));
|
|
|
|
+
|
|
|
|
+ List<String> contentPoolList = ContentPoolEnum.getOrderContentPool();
|
|
|
|
+ for (String contentPool : contentPoolList) {
|
|
|
|
+ List<Content> contents = contentMap.get(contentPool);
|
|
|
|
+ if (CollectionUtils.isEmpty(titles) && CollectionUtils.isNotEmpty(contents)) {
|
|
|
|
+ result.addAll(contents.stream().map(Content::getId).collect(Collectors.toList()));
|
|
|
|
+ titles.addAll(contents.stream().map(Content::getTitle).collect(Collectors.toList()));
|
|
|
|
+ }
|
|
|
|
+ if (CollectionUtils.isEmpty(contents)) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ CountDownLatch cdl = new CountDownLatch(contents.size());
|
|
|
|
+ for (Content content : contents) {
|
|
|
|
+ pool.submit(() -> {
|
|
|
|
+ try {
|
|
|
|
+ if (titles.contains(content.getTitle())) {
|
|
|
|
+ content.setFilterReason("重复文章");
|
|
|
|
+ filterContents.add(content);
|
|
|
|
+ } else {
|
|
|
|
+ result.add(content.getId());
|
|
|
|
+ titles.add(content.getTitle());
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ cdl.countDown();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ cdl.await();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ log.error("filter error", e);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+}
|