wangyunpeng 1 неделя назад
Родитель
Commit
8d13930a55

+ 32 - 23
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/common/ThreadPoolFactory.java

@@ -11,41 +11,50 @@ import java.util.concurrent.TimeUnit;
  * @author dyp
  */
 public final class ThreadPoolFactory {
+
     private final static ExecutorService DEFAULT = new CommonThreadPoolExecutor(
-            32,
-            256,
-            0L, TimeUnit.SECONDS,
+            4,                                    // 核心线程:与4C匹配
+            16,                                   // 最大线程:峰值保护
+            60L, TimeUnit.SECONDS,               // 空闲回收
             new LinkedBlockingQueue<>(200000),
             new ThreadFactoryBuilder().setNameFormat("DEFAULT-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝时调用者执行
+    
+    // 召回线程池:处理内容召回,IO密集型(DB查询)
     public final static ExecutorService RECALL = new CommonThreadPoolExecutor(
-            32,
-            128,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
+            4,                                    // 核心线程:与4C匹配
+            8,                                    // 最大线程:避免过度竞争
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(5000),
             new ThreadFactoryBuilder().setNameFormat("RecallService-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    
+    // 过滤线程池:并发执行多个过滤策略
     private final static ExecutorService FILTER = new CommonThreadPoolExecutor(
-            128,
-            256,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
+            8,                                    // 核心线程:策略并发数
+            16,                                   // 最大线程
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(5000),
             new ThreadFactoryBuilder().setNameFormat("FilterService-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    
+    // 评分线程池:计算密集型,线程数不宜过多
     private final static ExecutorService SCORE = new CommonThreadPoolExecutor(
-            32,
-            128,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
+            4,                                    // 核心线程:计算密集型保守
+            8,                                    // 最大线程
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(2000),
             new ThreadFactoryBuilder().setNameFormat("ScoreService-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    
+    // 去重线程池:固定线程,避免创建销毁开销
     private final static ExecutorService DeDuplicate = new CommonThreadPoolExecutor(
-            128,
-            128,
-            0L, TimeUnit.SECONDS,
+            8,                                    // 固定8线程
+            8,                                    // 固定大小
+            0L, TimeUnit.SECONDS,                // 不回收
             new LinkedBlockingQueue<>(),
             new ThreadFactoryBuilder().setNameFormat("DeDuplicate-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy());
 
     public static ExecutorService defaultPool() {
         return DEFAULT;

+ 22 - 9
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/filter/FilterService.java

@@ -86,20 +86,34 @@ public class FilterService {
             return null;
         }
 
-        List<List<String>> contentIdsList = new ArrayList<>();
+        // 使用Set集合优化交集计算,减少内存占用
+        Set<String> resultContentIds = null;
         List<Content> filterContents = new ArrayList<>();
         Set<String> filterContentIds = new HashSet<>();
+        
         for (Future<FilterResult> f : futures) {
             try {
                 if (Objects.isNull(f)) {
                     continue;
                 }
                 FilterResult filterResult = f.get();
-                contentIdsList.add(filterResult.getContentIds());
+                List<String> contentIds = filterResult.getContentIds();
+                
+                // 使用Set进行交集计算,避免List的retainAll性能问题
+                if (resultContentIds == null) {
+                    resultContentIds = new HashSet<>(contentIds);
+                } else {
+                    resultContentIds.retainAll(contentIds);
+                }
+                
+                // 及时清理帮助GC
+                if (contentIds instanceof ArrayList) {
+                    ((ArrayList<?>) contentIds).clear();
+                }
+                
                 if (CollectionUtils.isNotEmpty(filterResult.getFilterContent())) {
                     for (Content content : filterResult.getFilterContent()) {
-                        if (!filterContentIds.contains(content.getId())) {
-                            filterContentIds.add(content.getId());
+                        if (filterContentIds.add(content.getId())) {
                             filterContents.add(content);
                         }
                     }
@@ -108,13 +122,12 @@ public class FilterService {
                 log.error("future get error ", e);
             }
         }
-        if (CollectionUtils.isEmpty(contentIdsList)) {
+        if (resultContentIds == null || resultContentIds.isEmpty()) {
             return result;
         }
-        List<String> contentIds = param.getContents().stream().map(Content::getId).collect(Collectors.toList());
-        for (int i = 0; i < contentIdsList.size(); ++i) {
-            contentIds.retainAll(contentIdsList.get(i));
-        }
+        
+        // 转换为List返回
+        List<String> contentIds = new ArrayList<>(resultContentIds);
         result.setContentIds(contentIds);
         result.setFilterContent(filterContents);
         Long cost = System.currentTimeMillis() - start;

+ 23 - 8
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/filter/strategy/DeDuplicationStrategy.java

@@ -11,7 +11,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.stereotype.Component;
 
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -27,18 +27,21 @@ public class DeDuplicationStrategy implements FilterStrategy {
     public FilterResult filter(FilterParam param) {
         FilterResult filterResult = new FilterResult();
         List<String> result;
-        List<Content> filterContents = new CopyOnWriteArrayList<>();
+        // 使用ConcurrentHashMap替代CopyOnWriteArrayList,提高并发写入性能
+        Set<String> filterContentIdSet = ConcurrentHashMap.newKeySet();
+        List<Content> filterContents = new ArrayList<>();
         // 先对内容池内部去重
-        List<Content> middleContent = innerDeduplication(param.getContents(), filterContents);
+        List<Content> middleContent = innerDeduplication(param.getContents(), filterContentIdSet, filterContents);
         // 内容池间进行去重
-        result = groupDeduplication(middleContent, filterContents);
+        result = groupDeduplication(middleContent, filterContentIdSet, filterContents);
 
         filterResult.setContentIds(result);
         filterResult.setFilterContent(filterContents);
         return filterResult;
     }
 
-    private List<Content> innerDeduplication(List<Content> contentList, List<Content> filterContents) {
+    private List<Content> innerDeduplication(List<Content> contentList, Set<String> filterContentIdSet, 
+            List<Content> filterContents) {
         List<Content> result = new ArrayList<>();
         Map<String, List<Content>> contentMap = contentList.stream().collect(Collectors.groupingBy(Content::getContentPoolType));
         List<Future<List<Content>>> futures = new ArrayList<>();
@@ -53,15 +56,24 @@ public class DeDuplicationStrategy implements FilterStrategy {
                     }
                     List<Content> res = new ArrayList<>(contents.size());
                     Set<String> titles = new HashSet<>(contents.size());
+                    List<Content> localFilterContents = new ArrayList<>();
                     for (Content c : contents) {
                         if (titles.contains(c.getTitle())) {
                             c.setFilterReason("重复文章");
-                            filterContents.add(c);
+                            localFilterContents.add(c);
                         } else {
                             res.add(c);
                             titles.add(c.getTitle());
                         }
                     }
+                    // 批量添加到全局filterContents,减少锁竞争
+                    synchronized (filterContents) {
+                        for (Content c : localFilterContents) {
+                            if (filterContentIdSet.add(c.getId())) {
+                                filterContents.add(c);
+                            }
+                        }
+                    }
                     return res;
                 } finally {
                     cdl.countDown();
@@ -86,7 +98,8 @@ public class DeDuplicationStrategy implements FilterStrategy {
         return result;
     }
 
-    private List<String> groupDeduplication(List<Content> contentList, List<Content> filterContents) {
+    private List<String> groupDeduplication(List<Content> contentList, Set<String> filterContentIdSet,
+            List<Content> filterContents) {
         List<String> result = new ArrayList<>(contentList.size());
         Set<String> titles = new HashSet<>(contentList.size());
         Map<String, List<Content>> contentMap = contentList.stream().collect(Collectors.groupingBy(Content::getContentPoolType));
@@ -105,7 +118,9 @@ public class DeDuplicationStrategy implements FilterStrategy {
             for (Content content : contents) {
                 if (titles.contains(content.getTitle())) {
                     content.setFilterReason("重复文章");
-                    filterContents.add(content);
+                    if (filterContentIdSet.add(content.getId())) {
+                        filterContents.add(content);
+                    }
                 } else {
                     result.add(content.getId());
                     titles.add(content.getTitle());

+ 26 - 17
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/filter/strategy/InfiniteHisTitleStrategy.java

@@ -1,5 +1,6 @@
 package com.tzld.longarticle.recommend.server.service.recommend.filter.strategy;
 
+import com.google.common.collect.Lists;
 import com.tzld.longarticle.recommend.server.common.ThreadPoolFactory;
 import com.tzld.longarticle.recommend.server.common.enums.recommend.ArticleTypeEnum;
 import com.tzld.longarticle.recommend.server.model.dto.Content;
@@ -44,25 +45,31 @@ public class InfiniteHisTitleStrategy implements FilterStrategy {
         List<String> allTitleList = allArticleList.stream().map(Article::getTitle).distinct().collect(Collectors.toList());
         List<Article> qunfaArticleList = articleListRemoteService.articleList(param.getGhId(), firstSecondIndex, ArticleTypeEnum.QUNFA.getVal());
         List<String> qunfaTitleList = qunfaArticleList.stream().map(Article::getTitle).distinct().collect(Collectors.toList());
-        List<Future<Content>> futures = new ArrayList<>();
-        CountDownLatch cdl = new CountDownLatch(param.getContents().size());
-        for (Content content : param.getContents()) {
-            Future<Content> future = pool.submit(() -> {
+        // 优化:分批处理,避免创建过多任务导致线程池队列溢出
+        List<List<Content>> partitions = Lists.partition(param.getContents(), 500);
+        List<Future<List<Content>>> futures = new ArrayList<>();
+        CountDownLatch cdl = new CountDownLatch(partitions.size());
+        
+        for (List<Content> partition : partitions) {
+            Future<List<Content>> future = pool.submit(() -> {
                 try {
-                    boolean isDuplicate = TitleSimilarCheckUtil.isDuplicateContent(content.getTitle(), qunfaTitleList, TitleSimilarCheckUtil.SIMILARITY_THRESHOLD);
-                    if (!isDuplicate) {
-                        isDuplicate = TitleSimilarCheckUtil.isDuplicateContent(content.getTitle(), allTitleList, TitleSimilarCheckUtil.SIMILARITY_THRESHOLD);
+                    for (Content content : partition) {
+                        boolean isDuplicate = TitleSimilarCheckUtil.isDuplicateContent(content.getTitle(), qunfaTitleList, TitleSimilarCheckUtil.SIMILARITY_THRESHOLD);
+                        if (!isDuplicate) {
+                            isDuplicate = TitleSimilarCheckUtil.isDuplicateContent(content.getTitle(), allTitleList, TitleSimilarCheckUtil.SIMILARITY_THRESHOLD);
+                        }
+                        if (isDuplicate) {
+                            content.setFilterReason("历史已发布文章");
+                        }
                     }
-                    if (isDuplicate) {
-                        content.setFilterReason("历史已发布文章");
-                    }
-                    return content;
+                    return partition;
                 } finally {
                     cdl.countDown();
                 }
             });
             futures.add(future);
         }
+        
         try {
             cdl.await();
         } catch (InterruptedException e) {
@@ -70,13 +77,15 @@ public class InfiniteHisTitleStrategy implements FilterStrategy {
             return null;
         }
 
-        for (Future<Content> f : futures) {
+        for (Future<List<Content>> f : futures) {
             try {
-                Content content = f.get();
-                if (StringUtils.hasText(content.getFilterReason())) {
-                    filterContents.add(content);
-                } else {
-                    result.add(content.getId());
+                List<Content> partition = f.get();
+                for (Content content : partition) {
+                    if (StringUtils.hasText(content.getFilterReason())) {
+                        filterContents.add(content);
+                    } else {
+                        result.add(content.getId());
+                    }
                 }
             } catch (Exception e) {
                 log.error("future get error ", e);

+ 117 - 52
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/recall/RecallService.java

@@ -202,16 +202,13 @@ public class RecallService implements ApplicationContextAware {
 
     private List<Content> getAllContent(RecallParam param) {
         long t1 = System.currentTimeMillis();
+        // 使用流式获取,避免一次性加载所有数据到内存
         List<Content> content = aigcWaitingPublishContentService.getAllContentByDB(param.getPlanId(), param.getAccountId());
         long t2 = System.currentTimeMillis();
         CostMonitor.logCost("Recall", "GetAllContents", t2 - t1);
-        ContentCountMonitor.logCount("Recall", "GetAllContents", Objects.isNull(content) ? 0 : content.size());
-//            // 临时过滤文章视频不匹配content
-//            filterNotMatchContent(content);
-        // 过滤仅保留审核通过content
-        // 已通过aigc提前过滤视频审核通过 暂时取消判断
-//        filterAuditPassContent(content);
-//        ContentCountMonitor.logCount("Recall", "Filter", Objects.isNull(content) ? 0 : content.size());
+        int originalSize = Objects.isNull(content) ? 0 : content.size();
+        ContentCountMonitor.logCount("Recall", "GetAllContents", originalSize);
+        
         if (CollectionUtils.isEmpty(content)) {
             FeishuMessageSender.sendWebHookMessage(FeishuRobotIdEnum.RECOMMEND.getRobotId(),
                     "内容召回失败\n"
@@ -220,13 +217,23 @@ public class RecallService implements ApplicationContextAware {
                             + "账号名称: " + param.getAccountName());
             return content;
         }
+        
+        // 如果数据量过大,分批处理并触发GC
+        if (originalSize > 10000) {
+            log.warn("Large content list detected: {}, processing in stream mode", originalSize);
+        }
+        
         long t3 = System.currentTimeMillis();
-        // 标题历史均值
-        setTitleAvgViewCount(content, param.getGhId(), param.getType(), param.getStrategy());
+        // 标题历史均值 - 流式处理,减少内存峰值
+        setTitleAvgViewCountOptimized(content, param.getGhId(), param.getType(), param.getStrategy());
         long t4 = System.currentTimeMillis();
         CostMonitor.logCost("Recall", "SetAvgViewCount", t4 - t3);
-//        // 视频内容池查询抓取时间
-//        setVideoContent(content);
+        
+        // 主动触发GC,回收大对象
+        if (originalSize > 20000) {
+            System.gc();
+        }
+        
         return content;
     }
 
@@ -339,63 +346,121 @@ public class RecallService implements ApplicationContextAware {
         }
     }
 
-    public void setTitleAvgViewCount(List<Content> contentList, String ghId, String type, String strategy) {
+    /**
+     * 优化版:流式处理,减少大对象内存占用
+     */
+    public void setTitleAvgViewCountOptimized(List<Content> contentList, String ghId, String type, String strategy) {
         long start = System.currentTimeMillis();
-        contentList.forEach(content -> content.setTitleMd5(Md5Util.encoderByMd5(content.getTitle())));
-        List<String> sourceIdList = contentList.stream().map(Content::getSourceId).distinct().collect(Collectors.toList());
-        // 根据sourceId查询数据库获取数据
-        List<ArticleTitleHisCache> articleTitleHisCacheList = new ArrayList<>(sourceIdList.size());
+        int totalSize = contentList.size();
+        
+        // 分批计算titleMd5,避免并行流的大内存开销
+        int batchSize = Math.min(1000, totalSize);
+        for (int i = 0; i < totalSize; i += batchSize) {
+            List<Content> batch = contentList.subList(i, Math.min(i + batchSize, totalSize));
+            for (Content content : batch) {
+                content.setTitleMd5(Md5Util.encoderByMd5(content.getTitle()));
+            }
+        }
+        
+        // 使用流式处理获取sourceId,避免创建大List
+        Set<String> sourceIdSet = new HashSet<>((int)(totalSize / 0.75f) + 1);
+        for (Content content : contentList) {
+            sourceIdSet.add(content.getSourceId());
+        }
+        
+        // 根据sourceId查询数据库获取数据 - 分批查询,及时释放中间结果
+        Map<String, ArticleTitleHisCache> articleTitleHisCacheMap = new HashMap<>((int)(sourceIdSet.size() / 0.75f) + 1);
+        List<String> sourceIdList = new ArrayList<>(sourceIdSet);
+        
         for (List<String> partition : Lists.partition(sourceIdList, 1000)) {
-            articleTitleHisCacheList.addAll(articleTitleHisCacheRepository.getBySourceIdInAndType(partition, type));
+            List<ArticleTitleHisCache> caches = articleTitleHisCacheRepository.getBySourceIdInAndType(partition, type);
+            for (ArticleTitleHisCache cache : caches) {
+                articleTitleHisCacheMap.put(cache.getSourceId(), cache);
+            }
+            // 及时清理,帮助GC
+            caches.clear();
         }
-        Map<String, ArticleTitleHisCache> articleTitleHisCacheMap = articleTitleHisCacheList.stream()
-                .collect(Collectors.toMap(ArticleTitleHisCache::getSourceId, Function.identity()));
-        // sourceId 进行过滤 排除缓存中数据 重新走下方查询
-        sourceIdList.removeIf(articleTitleHisCacheMap::containsKey);
+        
         // 获取账号相关性
         List<AccountCorrelation> accountCorrelationList = accountCorrelationRepository.findByGhIdAndStatus(ghId, 1);
-        Map<String, Double> accountCorrelationMap = accountCorrelationList.stream().collect(
-                Collectors.toMap(AccountCorrelation::getRelGhId, AccountCorrelation::getCorrelation));
+        Map<String, Double> accountCorrelationMap = new HashMap<>((int)(accountCorrelationList.size() / 0.75f) + 1);
+        for (AccountCorrelation correlation : accountCorrelationList) {
+            accountCorrelationMap.put(correlation.getRelGhId(), correlation.getCorrelation());
+        }
+        // 及时清理
+        accountCorrelationList.clear();
+        
         Account account = accountRepository.getByGhId(ghId);
+        
+        // 流式处理内容列表,减少内存峰值
         List<Content> newCacheSaveList = new ArrayList<>();
         Set<String> newCacheSourceIdSet = new HashSet<>();
-        for (Content content : contentList) {
-            if (articleTitleHisCacheMap.containsKey(content.getSourceId())) {
-                ArticleTitleHisCache cache = articleTitleHisCacheMap.get(content.getSourceId());
-                List<ContentHisPublishArticle> hisPublishArticleList =
-                        JSONArray.parseArray(cache.getHisPublishArticleList(), ContentHisPublishArticle.class);
-                if (CollectionUtils.isNotEmpty(hisPublishArticleList)) {
-                    for (ContentHisPublishArticle article : hisPublishArticleList) {
-                        article.setCorrelation(Optional.ofNullable(accountCorrelationMap.get(article.getGhId())).orElse(0.0));
+        
+        // 分批处理,每批处理后及时清理
+        int processBatchSize = 500;
+        for (int i = 0; i < totalSize; i += processBatchSize) {
+            List<Content> batch = contentList.subList(i, Math.min(i + processBatchSize, totalSize));
+            
+            for (Content content : batch) {
+                if (articleTitleHisCacheMap.containsKey(content.getSourceId())) {
+                    ArticleTitleHisCache cache = articleTitleHisCacheMap.get(content.getSourceId());
+                    // 直接解析并设置,避免中间List
+                    String hisListJson = cache.getHisPublishArticleList();
+                    if (StringUtils.hasText(hisListJson)) {
+                        List<ContentHisPublishArticle> hisPublishArticleList = 
+                                JSONArray.parseArray(hisListJson, ContentHisPublishArticle.class);
+                        if (CollectionUtils.isNotEmpty(hisPublishArticleList)) {
+                            for (ContentHisPublishArticle article : hisPublishArticleList) {
+                                article.setCorrelation(accountCorrelationMap.getOrDefault(article.getGhId(), 0.0));
+                            }
+                        }
+                        content.setHisPublishArticleList(hisPublishArticleList);
+                    }
+                    
+                    if (StringUtils.hasText(cache.getCategory())) {
+                        content.setCategory(JSONArray.parseArray(cache.getCategory(), String.class));
+                    }
+                    content.setProducePlanId(cache.getPlanId());
+                    content.setKimiSafeScore(cache.getKimiSafeScore());
+                    content.setRootPublishTimestamp(cache.getRootPublishTimestamp());
+                    content.setSourceCreateTimestamp(cache.getCreateTimestamp());
+                    content.setSourceAuditTimestamp(cache.getAuditTimestamp());
+                    setT0Data(content, account);
+                } else {
+                    if (!newCacheSourceIdSet.contains(content.getSourceId())
+                            && Arrays.asList(PublishPlanInputSourceTypesEnum.producePlan.getVal(),
+                            PublishPlanInputSourceTypesEnum.produceContent.getVal(),
+                            PublishPlanInputSourceTypesEnum.longArticleVideoPoolSource.getVal()
+                    ).contains(content.getSourceType())) {
+                        newCacheSaveList.add(content);
+                        newCacheSourceIdSet.add(content.getSourceId());
                     }
                 }
-                if (StringUtils.hasText(cache.getCategory())) {
-                    content.setCategory(JSONArray.parseArray(cache.getCategory(), String.class));
-                }
-                content.setProducePlanId(cache.getPlanId());
-                content.setKimiSafeScore(cache.getKimiSafeScore());
-                content.setRootPublishTimestamp(cache.getRootPublishTimestamp());
-                content.setHisPublishArticleList(hisPublishArticleList);
-                content.setSourceCreateTimestamp(cache.getCreateTimestamp());
-                content.setSourceAuditTimestamp(cache.getAuditTimestamp());
-                setT0Data(content, account);
-            } else {
-                if (!newCacheSourceIdSet.contains(content.getSourceId())
-                        && Arrays.asList(PublishPlanInputSourceTypesEnum.producePlan.getVal(),
-                        PublishPlanInputSourceTypesEnum.produceContent.getVal(),
-                        PublishPlanInputSourceTypesEnum.longArticleVideoPoolSource.getVal()
-                ).contains(content.getSourceType())) {
-                    newCacheSaveList.add(content);
-                    newCacheSourceIdSet.add(content.getSourceId());
+                if (Objects.isNull(content.getContentPoolType()) && RankStrategyEnum.FWH_STRATEGY.getStrategy().equals(strategy)) {
+                    content.setContentPoolType(ContentPoolEnum.autoArticlePoolLevel1.getContentPool());
                 }
             }
-            if (Objects.isNull(content.getContentPoolType()) && RankStrategyEnum.FWH_STRATEGY.getStrategy().equals(strategy)) {
-                content.setContentPoolType(ContentPoolEnum.autoArticlePoolLevel1.getContentPool());
+            
+            // 每处理一批,如果数据量大,提示GC
+            if (totalSize > 20000 && i > 0 && i % 2000 == 0) {
+                Thread.yield(); // 让出CPU,给GC机会
             }
         }
+        
+        // 及时清理大Map
+        articleTitleHisCacheMap.clear();
+        accountCorrelationMap.clear();
+        
         // 写入缓存
         saveArticleTitleHisCache(newCacheSaveList, type);
-        log.info("setTitleAvgViewCount cost:{}", System.currentTimeMillis() - start);
+        log.info("setTitleAvgViewCountOptimized cost:{}, size:{}", System.currentTimeMillis() - start, totalSize);
+    }
+    
+    /**
+     * 原方法保留,用于兼容性
+     */
+    public void setTitleAvgViewCount(List<Content> contentList, String ghId, String type, String strategy) {
+        setTitleAvgViewCountOptimized(contentList, ghId, type, strategy);
     }
 
     private void saveArticleTitleHisCache(List<Content> saveList, String type) {

+ 38 - 8
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/score/strategy/CategoryStrategy.java

@@ -2,6 +2,8 @@ package com.tzld.longarticle.recommend.server.service.recommend.score.strategy;
 
 import com.alibaba.fastjson.JSONObject;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.tzld.longarticle.recommend.server.common.enums.StatusEnum;
 import com.tzld.longarticle.recommend.server.model.dto.Content;
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.AccountCategory;
@@ -19,7 +21,9 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 
+import javax.annotation.PostConstruct;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @Slf4j
@@ -43,7 +47,39 @@ public class CategoryStrategy implements ScoreStrategy {
 
     @ApolloJsonValue("${category.strategy.add.score:{}}")
     private Map<String, Double> categoryAddScoreConfig;
+    
+    // 本地缓存,缓存账号品类配置,5分钟过期
+    private Cache<String, AccountCategory> accountCategoryCache;
 
+    @PostConstruct
+    public void init() {
+        accountCategoryCache = CacheBuilder.newBuilder()
+                .maximumSize(10000)
+                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .build();
+    }
+    
+    /**
+     * 从缓存获取账号品类配置
+     */
+    private AccountCategory getAccountCategoryFromCache(String ghId, Integer version) {
+        String cacheKey = ghId + "_" + version;
+        AccountCategory category = accountCategoryCache.getIfPresent(cacheKey);
+        if (category == null) {
+            List<AccountCategory> accountCategoryList = accountCategoryRepository.getByGhIdAndStatusAndVersion(
+                    ghId, StatusEnum.ONE.getCode(), version);
+            if (CollectionUtils.isNotEmpty(accountCategoryList)) {
+                category = accountCategoryList.stream()
+                        .sorted(Comparator.comparing(AccountCategory::getDt, Comparator.reverseOrder()))
+                        .findFirst().orElse(null);
+                if (category != null) {
+                    accountCategoryCache.put(cacheKey, category);
+                }
+            }
+        }
+        return category;
+    }
+    
     @Override
     public List<Score> score(ScoreParam param) {
         long start = System.currentTimeMillis();
@@ -51,14 +87,8 @@ public class CategoryStrategy implements ScoreStrategy {
         if (CollectionUtils.isEmpty(param.getContents())) {
             return scores;
         }
-        List<AccountCategory> accountCategoryList = accountCategoryRepository.getByGhIdAndStatusAndVersion(
-                param.getGhId(), StatusEnum.ONE.getCode(), activeVersion);
-        AccountCategory accountCategory = null;
-        if (CollectionUtils.isNotEmpty(accountCategoryList)) {
-            accountCategory = accountCategoryList.stream()
-                    .sorted(Comparator.comparing(AccountCategory::getDt, Comparator.reverseOrder()))
-                    .findFirst().get();
-        }
+        // 从缓存获取账号品类配置,避免每次循环查询DB
+        AccountCategory accountCategory = getAccountCategoryFromCache(param.getGhId(), activeVersion);
         for (Content content : param.getContents()) {
             if (CollectionUtils.isEmpty(content.getCategory())) {
                 continue;

+ 174 - 106
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/score/strategy/ViewCountRateStrategy.java

@@ -1,6 +1,8 @@
 package com.tzld.longarticle.recommend.server.service.recommend.score.strategy;
 
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.collect.Lists;
+import com.tzld.longarticle.recommend.server.common.ThreadPoolFactory;
 import com.tzld.longarticle.recommend.server.model.dto.Content;
 import com.tzld.longarticle.recommend.server.model.dto.ContentHisPublishArticle;
 import com.tzld.longarticle.recommend.server.model.entity.crawler.AccountAvgInfo;
@@ -14,13 +16,14 @@ import com.tzld.longarticle.recommend.server.util.MathUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 @Component
 @Slf4j
@@ -36,9 +39,10 @@ public class ViewCountRateStrategy implements ScoreStrategy {
     @ApolloJsonValue("${view.count.rate.account.his.filter.days:{}}")
     private Map<String, Integer> accountHisFilterDaysConfig;
 
+    private final ExecutorService pool = ThreadPoolFactory.scorePool();
+
     @Override
     public List<Score> score(ScoreParam param) {
-        List<Score> scores = new ArrayList<>();
         String[] contentPools = accountContentPoolConfigService.getContentPools(param.getAccountName());
         List<AccountAvgInfo> avgInfoList = accountAvgInfoRepository.getAllByGhIdEqualsAndStatusEquals(param.getGhId(), 1);
         double avgViewCountFirst = accountIndexAvgViewCountService.getAvgReadCountByDB(avgInfoList, param.getGhId(), 1);
@@ -46,123 +50,187 @@ public class ViewCountRateStrategy implements ScoreStrategy {
         if (avgViewCountFirst < 10) {
             avgViewCountFirst = 20000D;
         }
-        for (Content content : param.getContents()) {
-            for (int i = 0; i < contentPools.length; i++) {
-                if (!contentPools[i].equals(content.getContentPoolType())) {
-                    continue;
-                }
-                double avgViewCountPos = accountIndexAvgViewCountService.getAvgReadCountByDB(avgInfoList, param.getGhId(), i + 1);
-                // 缺省头条均值设置为2w,次条为1w
-                if (avgViewCountPos < 10) {
-                    if (i == 0) {
-                        avgViewCountPos = 20000D;
-                    } else if (i == 1) {
-                        avgViewCountPos = 10000D;
-                    } else {
-                        avgViewCountPos = 400D;
+
+        // 数据量较小时直接串行处理,避免线程切换开销
+        if (param.getContents().size() <= 1000) {
+            return calculateScoresSequential(param.getContents(), contentPools, avgInfoList,
+                    param.getGhId(), avgViewCountFirst, param);
+        }
+
+        // 数据量较大时并行处理
+        return calculateScoresParallel(param.getContents(), contentPools, avgInfoList,
+                param.getGhId(), avgViewCountFirst, param);
+    }
+
+    /**
+     * Scores the given contents one after another on the calling thread.
+     *
+     * @param contents          contents to score, visited in iteration order
+     * @param contentPools      content pool names, passed through to the per-content scorer
+     * @param avgInfoList       account average info rows, passed through to the per-content scorer
+     * @param ghId              account id, passed through to the per-content scorer
+     * @param avgViewCountFirst average view count used for the first position
+     * @param param             the original scoring request
+     * @return the non-null scores produced by {@code calculateSingleContentScore}, in input order;
+     *         contents for which it returns {@code null} are left out
+     */
+    private List<Score> calculateScoresSequential(List<Content> contents, String[] contentPools,
+                                                  List<AccountAvgInfo> avgInfoList, String ghId, double avgViewCountFirst, ScoreParam param) {
+        final List<Score> collected = new ArrayList<>();
+        for (Content item : contents) {
+            final Score computed = calculateSingleContentScore(
+                    item, contentPools, avgInfoList, ghId, avgViewCountFirst, param);
+            if (computed == null) {
+                // Nothing could be scored for this content; leave it out of the result.
+                continue;
+            }
+            collected.add(computed);
+        }
+        return collected;
+    }
+
+    /**
+     * Scores the given contents in parallel.
+     *
+     * <p>The contents are split into consecutive batches of 500 and each batch is submitted to
+     * {@code pool} as one task. Results are joined in submission order, so the returned scores
+     * keep the order of {@code contents}.
+     *
+     * <p>Failure handling: if a batch task fails, the error is logged and that batch contributes
+     * no scores. If the calling thread is interrupted while waiting, its interrupt status is
+     * restored, waiting stops, and the scores gathered so far are returned.
+     *
+     * @param contents          contents to score
+     * @param contentPools      content pool names, passed through to the per-content scorer
+     * @param avgInfoList       account average info rows, passed through to the per-content scorer
+     * @param ghId              account id, passed through to the per-content scorer
+     * @param avgViewCountFirst average view count used for the first position
+     * @param param             the original scoring request
+     * @return the non-null scores of all batches that completed successfully
+     */
+    private List<Score> calculateScoresParallel(List<Content> contents, String[] contentPools,
+                                                List<AccountAvgInfo> avgInfoList, String ghId, double avgViewCountFirst, ScoreParam param) {
+        List<List<Content>> batches = Lists.partition(contents, 500);
+        List<Future<List<Score>>> futures = new ArrayList<>();
+
+        for (List<Content> batch : batches) {
+            Future<List<Score>> future = pool.submit(() -> {
+                List<Score> batchScores = new ArrayList<>();
+                for (Content content : batch) {
+                    Score score = calculateSingleContentScore(content, contentPools, avgInfoList,
+                            ghId, avgViewCountFirst, param);
+                    if (score != null) {
+                        batchScores.add(score);
                     }
                 }
-                // 阅读量之和
-                double showViewCountSum = 0D;
-                // 阅读均值置信区间上限之和
-                double readAvgCiUpperSum = 0D;
-                // 头条阅读量之和
-                double showViewCountSumFirst = 0D;
-                // 头条阅读均值置信区间上限之和
-                double readAvgCiUpperSumFirst = 0D;
-                // 次条阅读量之和
-                double showViewCountSumSecond = 0D;
-                // 次条阅读均值置信区间上限之和
-                double readAvgCiUpperSumSecond = 0D;
-                // 最大阅读均值置信区间上限
-                double maxReadAvgCiUpper = 0D;
-                if (CollectionUtils.isEmpty(content.getHisPublishArticleList())) {
+                return batchScores;
+            });
+            futures.add(future);
+        }
+
+        List<Score> scores = new ArrayList<>();
+        for (Future<List<Score>> future : futures) {
+            try {
+                scores.addAll(future.get());
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so callers can observe it, and stop waiting:
+                // with the flag set, every further get() would fail immediately anyway.
+                Thread.currentThread().interrupt();
+                log.error("ViewCountRateStrategy batch process interrupted", e);
+                break;
+            } catch (Exception e) {
+                // A failed batch only loses its own scores; the other batches are still collected.
+                log.error("ViewCountRateStrategy batch process error", e);
+            }
+        }
+        return scores;
+    }
+
+    /**
+     * Computes the view-count-rate score of a single content item.
+     *
+     * <p>The first entry of {@code contentPools} that equals the content's pool type and for which
+     * the content has a non-empty publish history determines the position ({@code i + 1}) used for
+     * the position average. The history is aggregated into view-count sums and sums of the
+     * confidence-interval upper bound of the average read count, kept separately for item index 1,
+     * item index 2 and all other indexes. The score is a sigmoid confidence weight multiplied by the
+     * ratio of the two sums.
+     *
+     * <p>NOTE(review): the {@code ghId} parameter is never read in this body; the account id is
+     * taken from {@code param.getGhId()} instead.
+     *
+     * <p>NOTE(review): {@code hisFilterDays * 24 * 60 * 60} is evaluated in {@code int} arithmetic
+     * and would overflow for very large configured day counts - confirm the config range.
+     *
+     * @param content           the content to score
+     * @param contentPools      content pool names; the index of the matching pool selects the position
+     * @param avgInfoList       account average info rows handed to {@code getAvgReadCountByDB}
+     * @param ghId              account id (currently unused, see note above)
+     * @param avgViewCountFirst average view count of the first position, used for the down-weight check
+     * @param param             the original scoring request; supplies the account id
+     * @return the score for this content, or {@code null} when no pool entry matches the content's
+     *         pool type or every matching entry is skipped because the publish history is empty
+     */
+    private Score calculateSingleContentScore(Content content, String[] contentPools,
+                                              List<AccountAvgInfo> avgInfoList, String ghId, double avgViewCountFirst, ScoreParam param) {
+        for (int i = 0; i < contentPools.length; i++) {
+            if (!contentPools[i].equals(content.getContentPoolType())) {
+                continue;
+            }
+            double avgViewCountPos = accountIndexAvgViewCountService.getAvgReadCountByDB(avgInfoList, param.getGhId(), i + 1);
+            // Fallback when the position average is below 10: 20000 for the first position, 10000 for the second, 400 otherwise
+            if (avgViewCountPos < 10) {
+                if (i == 0) {
+                    avgViewCountPos = 20000D;
+                } else if (i == 1) {
+                    avgViewCountPos = 10000D;
+                } else {
+                    avgViewCountPos = 400D;
+                }
+            }
+            // Sum of view counts (item index other than 1 or 2)
+            double showViewCountSum = 0D;
+            // Sum of the confidence-interval upper bounds of the average read count
+            double readAvgCiUpperSum = 0D;
+            // Sum of view counts for first-position items (item index 1)
+            double showViewCountSumFirst = 0D;
+            // Sum of the confidence-interval upper bounds for first-position items
+            double readAvgCiUpperSumFirst = 0D;
+            // Sum of view counts for second-position items (item index 2)
+            double showViewCountSumSecond = 0D;
+            // Sum of the confidence-interval upper bounds for second-position items
+            double readAvgCiUpperSumSecond = 0D;
+            // Largest confidence-interval upper bound seen; tracked below but not used in the final score
+            double maxReadAvgCiUpper = 0D;
+            if (CollectionUtils.isEmpty(content.getHisPublishArticleList())) {
+                continue;
+            }
+            for (ContentHisPublishArticle hisItem : content.getHisPublishArticleList()) {
+                // Skip items rejected by hisContentLateFilter (per the original note: published later than 19:00)
+                if (ScoreStrategy.hisContentLateFilter(hisItem.getPublishTimestamp())) {
                     continue;
                 }
-                for (ContentHisPublishArticle hisItem : content.getHisPublishArticleList()) {
-                    // 过滤掉发布时间晚于19点数据
-                    if (ScoreStrategy.hisContentLateFilter(hisItem.getPublishTimestamp())) {
-                        continue;
-                    }
-                    // 过滤掉历史数据中,阅读量为0的文章
-                    Integer hisFilterDays = accountHisFilterDaysConfig.get(param.getGhId());
-                    if (Objects.nonNull(hisFilterDays)
-                            && hisItem.getPublishTimestamp() < System.currentTimeMillis() / 1000 - hisFilterDays * 24 * 60 * 60) {
-                        continue;
-                    }
-                    if (hisItem.isInnerAccount() && Objects.nonNull(hisItem.getViewCount())
-                            && hisItem.getViewCount() > 0 && Objects.nonNull(hisItem.getReadAvgCiUpper())
-                            && hisItem.getReadAvgCiUpper() > 0) {
-                        maxReadAvgCiUpper = Math.max(maxReadAvgCiUpper, hisItem.getReadAvgCiUpper());
-                        if (hisItem.getItemIndex() == 1) {
-                            showViewCountSumFirst += hisItem.getViewCount();
-                            readAvgCiUpperSumFirst += hisItem.getReadAvgCiUpper();
-                        } else if (hisItem.getItemIndex() == 2) {
-                            if (Objects.nonNull(hisItem.getFirstViewCount()) && hisItem.getFirstViewCount() > 0 &&
-                                    Objects.nonNull(hisItem.getFirstViewCountRate()) && hisItem.getFirstViewCountRate() > 0) {
-                                showViewCountSumSecond += hisItem.getViewCount();
-                                if (hisItem.getFirstViewCountRate() > 1) {
-                                    // 对于头条均值倍数大于1的情况,次条均值线性增加,用于debias;
-                                    // TODO: 对于小于1的情况,是否要减去?
-                                    readAvgCiUpperSumSecond += hisItem.getReadAvgCiUpper() * hisItem.getFirstViewCountRate();
-                                } else {
-                                    readAvgCiUpperSumSecond += hisItem.getReadAvgCiUpper();
-                                }
+                // When a per-account day limit is configured, skip items published more than that many days ago
+                Integer hisFilterDays = accountHisFilterDaysConfig.get(param.getGhId());
+                if (Objects.nonNull(hisFilterDays)
+                        && hisItem.getPublishTimestamp() < System.currentTimeMillis() / 1000 - hisFilterDays * 24 * 60 * 60) {
+                    continue;
+                }
+                if (hisItem.isInnerAccount() && Objects.nonNull(hisItem.getViewCount())
+                        && hisItem.getViewCount() > 0 && Objects.nonNull(hisItem.getReadAvgCiUpper())
+                        && hisItem.getReadAvgCiUpper() > 0) {
+                    maxReadAvgCiUpper = Math.max(maxReadAvgCiUpper, hisItem.getReadAvgCiUpper());
+                    if (hisItem.getItemIndex() == 1) {
+                        showViewCountSumFirst += hisItem.getViewCount();
+                        readAvgCiUpperSumFirst += hisItem.getReadAvgCiUpper();
+                    } else if (hisItem.getItemIndex() == 2) {
+                        if (Objects.nonNull(hisItem.getFirstViewCount()) && hisItem.getFirstViewCount() > 0 &&
+                                Objects.nonNull(hisItem.getFirstViewCountRate()) && hisItem.getFirstViewCountRate() > 0) {
+                            showViewCountSumSecond += hisItem.getViewCount();
+                            if (hisItem.getFirstViewCountRate() > 1) {
+                                // When the first-position multiple exceeds 1, scale the upper bound linearly by it (debias);
+                                // TODO: for multiples below 1, should the value be reduced instead?
+                                readAvgCiUpperSumSecond += hisItem.getReadAvgCiUpper() * hisItem.getFirstViewCountRate();
+                            } else {
+                                readAvgCiUpperSumSecond += hisItem.getReadAvgCiUpper();
                             }
-                        } else {
-                            if (Objects.nonNull(hisItem.getFirstViewCount()) && hisItem.getFirstViewCount() > 0
-                                    && Objects.nonNull(hisItem.getFirstViewCountRate()) && hisItem.getFirstViewCountRate() > 0) {
-                                showViewCountSum += hisItem.getViewCount();
-                                if (hisItem.getFirstViewCountRate() > 1) {
-                                    // 对于头条均值倍数大于1的情况,次条均值线性增加,用于debias;
-                                    // TODO: 对于小于1的情况,是否要减去?
-                                    readAvgCiUpperSum += hisItem.getReadAvgCiUpper() * hisItem.getFirstViewCountRate();
-                                } else {
-                                    readAvgCiUpperSum += hisItem.getReadAvgCiUpper();
-                                }
+                        }
+                    } else {
+                        if (Objects.nonNull(hisItem.getFirstViewCount()) && hisItem.getFirstViewCount() > 0
+                                && Objects.nonNull(hisItem.getFirstViewCountRate()) && hisItem.getFirstViewCountRate() > 0) {
+                            showViewCountSum += hisItem.getViewCount();
+                            if (hisItem.getFirstViewCountRate() > 1) {
+                                // When the first-position multiple exceeds 1, scale the upper bound linearly by it (debias);
+                                // TODO: for multiples below 1, should the value be reduced instead?
+                                readAvgCiUpperSum += hisItem.getReadAvgCiUpper() * hisItem.getFirstViewCountRate();
+                            } else {
+                                readAvgCiUpperSum += hisItem.getReadAvgCiUpper();
                             }
                         }
                     }
                 }
-                double viewCountRate = 0D; // 设置默认值
-                double bigRateW = 1D;
-                // 如果有头条反馈数据,优先选取头条反馈数据;
-                if (showViewCountSumFirst > 0) {
-                    showViewCountSum = showViewCountSumFirst;
-                    readAvgCiUpperSum = readAvgCiUpperSumFirst;
-                } else if (showViewCountSumSecond > 0) {
-                    showViewCountSum = showViewCountSumSecond;
-                    readAvgCiUpperSum = readAvgCiUpperSumSecond;
-                    // 如果是大号头条,则降权
-                    if (avgViewCountFirst >= 3000 && i == 0) {
-                        bigRateW = 0.001D;
-                    }
-                }
-                // 均值倍数
-                if (readAvgCiUpperSum > 0) {
-                    viewCountRate = showViewCountSum / readAvgCiUpperSum;
+            }
+            double viewCountRate = 0D; // default value
+            double bigRateW = 1D;
+            // Prefer first-position feedback when there is any; otherwise fall back to second-position feedback
+            if (showViewCountSumFirst > 0) {
+                showViewCountSum = showViewCountSumFirst;
+                readAvgCiUpperSum = readAvgCiUpperSumFirst;
+            } else if (showViewCountSumSecond > 0) {
+                showViewCountSum = showViewCountSumSecond;
+                readAvgCiUpperSum = readAvgCiUpperSumSecond;
+                // Down-weight when scoring the first position of a large account (first-position average >= 3000)
+                if (avgViewCountFirst >= 3000 && i == 0) {
+                    bigRateW = 0.001D;
                 }
-                // 置信度
-                double viewCountRateW = MathUtils.sigmoid(readAvgCiUpperSum, 0.002, avgViewCountPos);
-                double viewCountRateScore = 0;
+            }
+            // Multiple of the average: view-count sum divided by the upper-bound sum
+            if (readAvgCiUpperSum > 0) {
+                viewCountRate = showViewCountSum / readAvgCiUpperSum;
+            }
+            // Confidence weight
+            double viewCountRateW = MathUtils.sigmoid(readAvgCiUpperSum, 0.002, avgViewCountPos);
+            double viewCountRateScore = 0;
 
-                if (viewCountRate > 0) {
-                    // 最终分数 = 置信度 * 均值倍数
-                    if (viewCountRate > 1 && bigRateW < 1) {
-                        // 如果是大号头条,则降权
-                        viewCountRateScore = viewCountRateW * ((viewCountRate - 1) * bigRateW + 1);
-                    } else {
-                        viewCountRateScore = viewCountRateW * viewCountRate;
-                    }
+            if (viewCountRate > 0) {
+                // Final score = confidence weight * multiple of the average
+                if (viewCountRate > 1 && bigRateW < 1) {
+                    // Down-weighted case: only the part of the multiple above 1 is scaled by bigRateW
+                    viewCountRateScore = viewCountRateW * ((viewCountRate - 1) * bigRateW + 1);
+                } else {
+                    viewCountRateScore = viewCountRateW * viewCountRate;
                 }
-                Score score = new Score();
-                score.setStrategy(this);
-                score.setContentId(content.getId());
-                score.setScore(viewCountRateScore);
-                scores.add(score);
-                break;
             }
+            Score score = new Score();
+            score.setStrategy(this);
+            score.setContentId(content.getId());
+            score.setScore(viewCountRateScore);
+            return score;
         }
-        return scores;
+        return null;
     }
 }