Browse Source

SensitiveStrategy 并发处理

wangyunpeng 3 months ago
parent
commit
027bf4a8c2

+ 47 - 24
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/filter/strategy/SensitiveStrategy.java

@@ -3,11 +3,13 @@ package com.tzld.longarticle.recommend.server.service.recommend.filter.strategy;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.cache.Cache;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.tzld.longarticle.recommend.server.common.CommonThreadPoolExecutor;
 import com.tzld.longarticle.recommend.server.common.enums.StatusEnum;
 import com.tzld.longarticle.recommend.server.common.enums.StatusEnum;
 import com.tzld.longarticle.recommend.server.model.dto.Content;
 import com.tzld.longarticle.recommend.server.model.dto.Content;
 import com.tzld.longarticle.recommend.server.model.entity.crawler.ArticleSensitive;
 import com.tzld.longarticle.recommend.server.model.entity.crawler.ArticleSensitive;
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.ArticleUnsafeTitle;
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.ArticleUnsafeTitle;
-import com.tzld.longarticle.recommend.server.remote.ArticleSensitiveRemoteService;
 import com.tzld.longarticle.recommend.server.repository.crawler.ArticleSensitiveRepository;
 import com.tzld.longarticle.recommend.server.repository.crawler.ArticleSensitiveRepository;
 import com.tzld.longarticle.recommend.server.repository.longArticle.ArticleUnsafeTitleRepository;
 import com.tzld.longarticle.recommend.server.repository.longArticle.ArticleUnsafeTitleRepository;
 import com.tzld.longarticle.recommend.server.service.recommend.filter.FilterParam;
 import com.tzld.longarticle.recommend.server.service.recommend.filter.FilterParam;
@@ -20,15 +22,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
 import java.util.*;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 @Component
 @Component
 @Slf4j
 @Slf4j
 public class SensitiveStrategy implements FilterStrategy {
 public class SensitiveStrategy implements FilterStrategy {
 
 
-    @Autowired
-    private ArticleSensitiveRemoteService articleSensitiveRemoteService;
     @Autowired
     @Autowired
     private ArticleSensitiveRepository articleSensitiveRepository;
     private ArticleSensitiveRepository articleSensitiveRepository;
     @Autowired
     @Autowired
@@ -37,14 +37,22 @@ public class SensitiveStrategy implements FilterStrategy {
     @ApolloJsonValue("${UnSafeTitles:[]}")
     @ApolloJsonValue("${UnSafeTitles:[]}")
     private static List<String> UnSafeTitles;
     private static List<String> UnSafeTitles;
 
 
+    private final static ExecutorService pool = new CommonThreadPoolExecutor(
+            5,
+            5,
+            0L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(100),
+            new ThreadFactoryBuilder().setNameFormat("SensitiveStrategy-%d").build(),
+            new ThreadPoolExecutor.AbortPolicy());
+
     Cache<String, Boolean> similarityCache = CacheBuilder.newBuilder()
     Cache<String, Boolean> similarityCache = CacheBuilder.newBuilder()
             .expireAfterWrite(60 * 60, TimeUnit.SECONDS).build();
             .expireAfterWrite(60 * 60, TimeUnit.SECONDS).build();
 
 
     @Override
     @Override
     public FilterResult filter(FilterParam param) {
     public FilterResult filter(FilterParam param) {
         FilterResult filterResult = new FilterResult();
         FilterResult filterResult = new FilterResult();
-        List<String> result = new ArrayList<>(param.getContents().size());
-        List<Content> filterContents = new ArrayList<>();
+        List<String> result = Collections.synchronizedList(new ArrayList<>(param.getContents().size()));
+        List<Content> filterContents = Collections.synchronizedList(new ArrayList<>());
 
 
 //        Map<String, String> titleMd5Map = new HashMap<>();
 //        Map<String, String> titleMd5Map = new HashMap<>();
 //        Map<String, ArticleSensitive> articleSensitiveMap = new HashMap<>();
 //        Map<String, ArticleSensitive> articleSensitiveMap = new HashMap<>();
@@ -75,25 +83,40 @@ public class SensitiveStrategy implements FilterStrategy {
         Map<String, Boolean> similarityMap = new HashMap<>(similarityCache.getAllPresent(allTitles));
         Map<String, Boolean> similarityMap = new HashMap<>(similarityCache.getAllPresent(allTitles));
         long t4 = System.currentTimeMillis();
         long t4 = System.currentTimeMillis();
         log.info("SensitiveStrategy get unsafe title cache cost time:{}", t4 - t3);
         log.info("SensitiveStrategy get unsafe title cache cost time:{}", t4 - t3);
-        for (Content content : param.getContents()) {
-            try {
-                Boolean isSimilar;
-                if (similarityMap.containsKey(content.getTitle())) {
-                    isSimilar = similarityMap.get(content.getTitle());
-                } else {
-                    isSimilar = TitleSimilarCheckUtil.isDuplicateContentByCache(content.getTitle(), unsafeTitleCache,
-                            TitleSimilarCheckUtil.SIMILARITY_THRESHOLD);
-                    similarityCache.put(content.getTitle(), isSimilar);
-                }
-                if (isSimilar) {
-                    content.setFilterReason("安全违规");
-                    filterContents.add(content);
-                } else {
-                    result.add(content.getId());
+        List<List<Content>> partitions = Lists.partition(param.getContents(), 500);
+        CountDownLatch cdl = new CountDownLatch(partitions.size());
+        for (List<Content> partition : partitions) {
+            pool.submit(() -> {
+                try {
+                    for (Content content : partition) {
+                        try {
+                            Boolean isSimilar;
+                            if (similarityMap.containsKey(content.getTitle())) {
+                                isSimilar = similarityMap.get(content.getTitle());
+                            } else {
+                                isSimilar = TitleSimilarCheckUtil.isDuplicateContentByCache(content.getTitle(), unsafeTitleCache,
+                                        TitleSimilarCheckUtil.SIMILARITY_THRESHOLD);
+                                similarityCache.put(content.getTitle(), isSimilar);
+                            }
+                            if (isSimilar) {
+                                content.setFilterReason("安全违规");
+                                filterContents.add(content);
+                            } else {
+                                result.add(content.getId());
+                            }
+                        } catch (Exception e) {
+                            log.error("similar check error ", e);
+                        }
+                    }
+                } finally {
+                    cdl.countDown();
                 }
                 }
-            } catch (Exception e) {
-                log.error("similar check error ", e);
-            }
+            });
+        }
+        try {
+            cdl.await();
+        } catch (InterruptedException e) {
+            log.error("similar check error ", e);
         }
         }
         long t5 = System.currentTimeMillis();
         long t5 = System.currentTimeMillis();
         log.info("SensitiveStrategy filter cost time:{}", t5 - t4);
         log.info("SensitiveStrategy filter cost time:{}", t5 - t4);