wangyunpeng před 4 dny
rodič
revize
9e885523f5

+ 8 - 5
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -336,13 +336,17 @@ public class ChannelDemandMatchJob {
             }
             totalFilteredCount += filteredDemands.size();
 
-            // 2c. 并发匹配该批过滤后的需求,完成后释放引用进入下一维度
-            ExecutorService matchExecutor = Executors.newFixedThreadPool(matchThreadPoolSize);
-            List<Future<?>> matchFutures = new ArrayList<>();
+            // 2c. 并发匹配,有界队列 + CallerRunsPolicy 形成背压,防止内存溢出
+            // 注意:移除 Future 列表避免同时持有大量 Future 对象;队列满时由调用线程执行形成自然限流
+            ThreadPoolExecutor matchExecutor = new ThreadPoolExecutor(
+                    matchThreadPoolSize, matchThreadPoolSize,
+                    60L, TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<>(matchThreadPoolSize),
+                    new ThreadPoolExecutor.CallerRunsPolicy());
             for (ChannelDemandMatchResult demand : filteredDemands) {
                 totalDemands.incrementAndGet();
                 ChannelDemandMatchResult finalDemand = demand;
-                Future<?> future = matchExecutor.submit(() -> {
+                matchExecutor.submit(() -> {
                     try {
                         matchDemandAndSave(finalDemand, topN);
                         totalMatched.incrementAndGet();
@@ -354,7 +358,6 @@ public class ChannelDemandMatchJob {
                         totalFailed.incrementAndGet();
                     }
                 });
-                matchFutures.add(future);
             }
 
             matchExecutor.shutdown();