Просмотр исходного кода

channelDemandMatchJob 并发执行

wangyunpeng 1 день назад
Родитель
Сommit
0024cb3c5a

+ 27 - 7
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -26,10 +26,9 @@ import javax.annotation.Resource;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * 渠道需求匹配Job
@@ -55,10 +54,19 @@ public class ChannelDemandMatchJob {
     private VideoSearchService videoSearchService;
 
     /**
-     * 需求匹配并发线程数
+     * 需求匹配并发线程数(单个渠道内部的需求匹配)
      */
     private static final int MATCH_THREAD_POOL_SIZE = 5;
 
+    /**
+     * 多渠道配置并发执行线程池
+     */
+    private static final ExecutorService CONFIG_EXECUTOR = new ThreadPoolExecutor(
+            4, 8, 60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(32),
+            new ThreadPoolExecutor.CallerRunsPolicy()
+    );
+
     /**
      * 点类型 → 向量配置编码映射
      */
@@ -95,9 +103,21 @@ public class ChannelDemandMatchJob {
             AtomicInteger totalMatched = new AtomicInteger(0);
             AtomicInteger totalFailed = new AtomicInteger(0);
 
-            // 3. 逐个渠道配置处理
-            for (ChannelDemandMatchConfig config : configs) {
-                processChannelConfig(config, dt, totalDemands, totalMatched, totalFailed);
+            // 3. 多渠道配置并发执行
+            List<CompletableFuture<Void>> futures = configs.stream()
+                    .map(config -> CompletableFuture.runAsync(() ->
+                            processChannelConfig(config, dt, totalDemands, totalMatched, totalFailed), CONFIG_EXECUTOR)
+                    )
+                    .collect(Collectors.toList());
+
+            // 等待所有渠道处理完成,单个渠道超时60分钟
+            CompletableFuture<Void> allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+            try {
+                allFuture.get(60, TimeUnit.MINUTES);
+            } catch (TimeoutException e) {
+                log.error("多渠道并发执行超时,部分渠道可能未完成");
+            } catch (ExecutionException e) {
+                log.error("多渠道并发执行异常: {}", e.getCause().getMessage(), e.getCause());
             }
 
             log.info("渠道需求匹配任务完成,总需求: {}, 已匹配: {}, 失败: {}",