فهرست منبع

匹配并发可配置

wangyunpeng 2 روز پیش
والد
کامیت
90c0d97857
1فایلهای تغییر یافته به همراه17 افزوده شده و 7 حذف شده
  1. 17 7
      core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

+ 17 - 7
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
 
 import com.tzld.videoVector.util.Md5Util;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
@@ -130,13 +131,12 @@ public class ChannelDemandMatchJob {
     private Map<String, Map<String, Double>> topRovFilterConfig;
 
     /**
-     * 多渠道配置并发执行线程池
+     * 多渠道配置并发执行线程池(核心线程数)
      */
-    private static final ExecutorService CONFIG_EXECUTOR = new ThreadPoolExecutor(
-            4, 8, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(32),
-            new ThreadPoolExecutor.CallerRunsPolicy()
-    );
+    @Value("${channel.demand.config-executor.size:8}")
+    private int configExecutorSize;
+
+    private ExecutorService configExecutor;
 
     /**
      * channel_demand_match_result 保留天数(默认14天)
@@ -155,6 +155,16 @@ public class ChannelDemandMatchJob {
         POINT_TYPE_CONFIG_CODE_MAP.put("目的点", "VIDEO_PURPOSE");
     }
 
+    @PostConstruct
+    public void initConfigExecutor() {
+        this.configExecutor = new ThreadPoolExecutor(
+                configExecutorSize, configExecutorSize,
+                60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadPoolExecutor.CallerRunsPolicy()
+        );
+    }
+
     /**
      * 渠道需求匹配任务
      * param格式: dt=20260508 (可选,不传默认取前一天)
@@ -183,7 +193,7 @@ public class ChannelDemandMatchJob {
             // 3. 多渠道配置并发执行
             List<CompletableFuture<Void>> futures = configs.stream()
                     .map(config -> CompletableFuture.runAsync(() ->
-                            processChannelConfig(config, dt, totalDemands, totalMatched, totalFailed), CONFIG_EXECUTOR)
+                            processChannelConfig(config, dt, totalDemands, totalMatched, totalFailed), configExecutor)
                     )
                     .collect(Collectors.toList());