Преглед изворни кода

videoContentList 增加并发

wangyunpeng пре 1 дан
родитељ
комит
fec6c0cb69

+ 106 - 58
api-module/src/main/java/com/tzld/piaoquan/api/service/contentplatform/impl/ContentPlatformPlanServiceImpl.java

@@ -757,13 +757,27 @@ public class ContentPlatformPlanServiceImpl implements ContentPlatformPlanServic
             // 粉丝喜欢:
             //   公众号入口(type∈{0,1,4}):3 池 — priorScene + prior(传播头部) + growth(增长头部),每位独立等概率抽 + seed=nanoTime
             //   企微入口  (type∈{2,3}):2 池 — priorScene + prior(传播头部),严格 1:1 交替(无随机)
-            List<VideoContentItemVO> scene = fetchPriorSceneCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-            List<VideoContentItemVO> prior = fetchPriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-            if (isGzhEntryType(param.getType())) {
-                List<VideoContentItemVO> growth = fetchPriorGrowthCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-                list = interleavePriorPoolsRandom(scene, prior, growth, user);
-            } else {
-                list = interleavePriorWithScene(scene, prior);
+            boolean isGzh = isGzhEntryType(param.getType());
+            ExecutorService executor = Executors.newFixedThreadPool(isGzh ? 3 : 2);
+            try {
+                Future<List<VideoContentItemVO>> fScene = executor.submit(
+                        () -> fetchPriorSceneCandidates(param, user, DEMAND_CANDIDATE_LIMIT));
+                Future<List<VideoContentItemVO>> fPrior = executor.submit(
+                        () -> fetchPriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT));
+                Future<List<VideoContentItemVO>> fGrowth = isGzh ? executor.submit(
+                        () -> fetchPriorGrowthCandidates(param, user, DEMAND_CANDIDATE_LIMIT)) : null;
+
+                int timeoutSeconds = 30;
+                List<VideoContentItemVO> scene = getQuietly(fScene, timeoutSeconds);
+                List<VideoContentItemVO> prior = getQuietly(fPrior, timeoutSeconds);
+                if (isGzh) {
+                    List<VideoContentItemVO> growth = getQuietly(fGrowth, timeoutSeconds);
+                    list = interleavePriorPoolsRandom(scene, prior, growth, user);
+                } else {
+                    list = interleavePriorWithScene(scene, prior);
+                }
+            } finally {
+                executor.shutdown();
             }
         } else {
             list = fetchPosteriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
@@ -876,6 +890,19 @@ public class ContentPlatformPlanServiceImpl implements ContentPlatformPlanServic
         return true;
     }
 
+    /**
+     * Future.get 包装:异常/超时返回空列表,不阻塞整体流程。
+     * 语义与"该池无候选数据"一致——不影响其他池结果。
+     */
+    private List<VideoContentItemVO> getQuietly(Future<List<VideoContentItemVO>> f, int timeoutSeconds) {
+        try {
+            return f.get(timeoutSeconds, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.warn("fetch candidates timeout or error, fallback to empty list", e);
+            return new ArrayList<>();
+        }
+    }
+
     /**
      * 单源 hot:复用原 planMapperExt.getVideoCount + getVideoList 真分页链路。
      */
@@ -927,62 +954,83 @@ public class ContentPlatformPlanServiceImpl implements ContentPlatformPlanServic
      * 用 (userId ^ 当天日期) 作为种子,保证同一用户当天翻页顺序一致、刷新一致。
      */
     private Page<VideoContentItemVO> getInterleavedPage(VideoContentListParam param, ContentPlatformAccount user) {
-        List<VideoContentItemVO> priorScene = fetchPriorSceneCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-        List<VideoContentItemVO> prior = fetchPriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-        List<VideoContentItemVO> posterior = fetchPosteriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-        List<VideoContentItemVO> hot = fetchHotCandidates(param, user, HOT_CANDIDATE_LIMIT);
-        for (VideoContentItemVO v : priorScene) v.setSource(SOURCE_PRIOR);
-        for (VideoContentItemVO v : prior) v.setSource(SOURCE_PRIOR);
-        for (VideoContentItemVO v : posterior) v.setSource(SOURCE_POSTERIOR);
-        for (VideoContentItemVO v : hot) v.setSource(SOURCE_HOT);
-
-        List<List<VideoContentItemVO>> pools;
-        if (isGzhEntryType(param.getType())) {
-            List<VideoContentItemVO> priorGrowth = fetchPriorGrowthCandidates(param, user, DEMAND_CANDIDATE_LIMIT);
-            for (VideoContentItemVO v : priorGrowth) v.setSource(SOURCE_PRIOR);
-            pools = Arrays.asList(priorScene, prior, priorGrowth, posterior, hot);
-        } else {
-            pools = Arrays.asList(priorScene, prior, posterior, hot);
-        }
-        int N = pools.size();
-        int[] pointers = new int[N];
-        boolean[] exhausted = new boolean[N];
-        Set<Long> emittedIds = new HashSet<>();
-        Set<String> emittedTitles = new HashSet<>();
-        List<VideoContentItemVO> merged = new ArrayList<>();
-
-        long userSeed = user.getId() == null ? 0L : user.getId();
-        long seed = userSeed ^ LocalDate.now().toString().hashCode();
-        Random rng = new Random(seed);
-
-        while (true) {
-            boolean allExhausted = true;
-            for (boolean e : exhausted) {
-                if (!e) { allExhausted = false; break; }
+        boolean isGzh = isGzhEntryType(param.getType());
+        int poolCount = isGzh ? 5 : 4;
+        ExecutorService executor = Executors.newFixedThreadPool(poolCount);
+        try {
+            // 各池并行拉取,互不依赖
+            Future<List<VideoContentItemVO>> fPriorScene = executor.submit(
+                    () -> fetchPriorSceneCandidates(param, user, DEMAND_CANDIDATE_LIMIT));
+            Future<List<VideoContentItemVO>> fPrior = executor.submit(
+                    () -> fetchPriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT));
+            Future<List<VideoContentItemVO>> fPosterior = executor.submit(
+                    () -> fetchPosteriorCandidates(param, user, DEMAND_CANDIDATE_LIMIT));
+            Future<List<VideoContentItemVO>> fHot = executor.submit(
+                    () -> fetchHotCandidates(param, user, HOT_CANDIDATE_LIMIT));
+            Future<List<VideoContentItemVO>> fPriorGrowth = isGzh ? executor.submit(
+                    () -> fetchPriorGrowthCandidates(param, user, DEMAND_CANDIDATE_LIMIT)) : null;
+
+            int timeoutSeconds = 30;
+            List<VideoContentItemVO> priorScene  = getQuietly(fPriorScene, timeoutSeconds);
+            List<VideoContentItemVO> prior       = getQuietly(fPrior, timeoutSeconds);
+            List<VideoContentItemVO> posterior   = getQuietly(fPosterior, timeoutSeconds);
+            List<VideoContentItemVO> hot         = getQuietly(fHot, timeoutSeconds);
+
+            for (VideoContentItemVO v : priorScene) v.setSource(SOURCE_PRIOR);
+            for (VideoContentItemVO v : prior) v.setSource(SOURCE_PRIOR);
+            for (VideoContentItemVO v : posterior) v.setSource(SOURCE_POSTERIOR);
+            for (VideoContentItemVO v : hot) v.setSource(SOURCE_HOT);
+
+            List<List<VideoContentItemVO>> pools;
+            if (isGzh) {
+                List<VideoContentItemVO> priorGrowth = getQuietly(fPriorGrowth, timeoutSeconds);
+                for (VideoContentItemVO v : priorGrowth) v.setSource(SOURCE_PRIOR);
+                pools = Arrays.asList(priorScene, prior, priorGrowth, posterior, hot);
+            } else {
+                pools = Arrays.asList(priorScene, prior, posterior, hot);
             }
-            if (allExhausted) break;
+            int N = pools.size();
+            int[] pointers = new int[N];
+            boolean[] exhausted = new boolean[N];
+            Set<Long> emittedIds = new HashSet<>();
+            Set<String> emittedTitles = new HashSet<>();
+            List<VideoContentItemVO> merged = new ArrayList<>();
+
+            long userSeed = user.getId() == null ? 0L : user.getId();
+            long seed = userSeed ^ LocalDate.now().toString().hashCode();
+            Random rng = new Random(seed);
+
+            while (true) {
+                boolean allExhausted = true;
+                for (boolean e : exhausted) {
+                    if (!e) { allExhausted = false; break; }
+                }
+                if (allExhausted) break;
 
-            List<Integer> alive = new ArrayList<>(N);
-            for (int i = 0; i < N; i++) {
-                if (!exhausted[i]) alive.add(i);
-            }
-            int cur = alive.get(rng.nextInt(alive.size()));
+                List<Integer> alive = new ArrayList<>(N);
+                for (int i = 0; i < N; i++) {
+                    if (!exhausted[i]) alive.add(i);
+                }
+                int cur = alive.get(rng.nextInt(alive.size()));
 
-            List<VideoContentItemVO> pool = pools.get(cur);
-            while (pointers[cur] < pool.size() && shouldSkipForDedup(pool.get(pointers[cur]), emittedIds, emittedTitles)) {
-                pointers[cur]++;
-            }
-            if (pointers[cur] < pool.size()) {
-                VideoContentItemVO item = pool.get(pointers[cur]++);
-                emittedIds.add(item.getVideoId());
-                String nt = TitleNormalizer.normalize(item.getTitle());
-                if (!nt.isEmpty()) emittedTitles.add(nt);
-                merged.add(item);
-            } else {
-                exhausted[cur] = true;
+                List<VideoContentItemVO> pool = pools.get(cur);
+                while (pointers[cur] < pool.size() && shouldSkipForDedup(pool.get(pointers[cur]), emittedIds, emittedTitles)) {
+                    pointers[cur]++;
+                }
+                if (pointers[cur] < pool.size()) {
+                    VideoContentItemVO item = pool.get(pointers[cur]++);
+                    emittedIds.add(item.getVideoId());
+                    String nt = TitleNormalizer.normalize(item.getTitle());
+                    if (!nt.isEmpty()) emittedTitles.add(nt);
+                    merged.add(item);
+                } else {
+                    exhausted[cur] = true;
+                }
             }
+            return paginateCandidates(param, merged);
+        } finally {
+            executor.shutdown();
         }
-        return paginateCandidates(param, merged);
     }
 
     /**