Преглед на файлове

processChannelConfig 执行优化 fix

wangyunpeng преди 4 часа
родител
ревизия
044512faf8
променени са 1 файла, в които са добавени 12 реда и са изтрити 9 реда
  1. 12 9
      core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

+ 12 - 9
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -195,8 +195,8 @@ public class ChannelDemandMatchJob {
         ChannelDemandMatchResult poisonPill = new ChannelDemandMatchResult();
         AtomicInteger producedCount = new AtomicInteger(0);
 
-        // 生产者:ODPS流式读取,逐条放入队列
-        CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
+        // 生产者:用独立线程执行ODPS流式读取(避免与消费者共用CONFIG_EXECUTOR导致线程饥饿死锁)
+        Thread producerThread = new Thread(() -> {
             try {
                 OdpsUtil.getOdpsDataStream(sql, record -> {
                     try {
@@ -218,7 +218,9 @@ public class ChannelDemandMatchJob {
                     Thread.currentThread().interrupt();
                 }
             }
-        }, CONFIG_EXECUTOR);
+        }, "odps-producer-" + channelName);
+        producerThread.setDaemon(true);
+        producerThread.start();
 
         // 消费者:匹配线程池从队列中取需求并执行向量匹配
         ExecutorService matchExecutor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
@@ -228,8 +230,8 @@ public class ChannelDemandMatchJob {
             while (true) {
                 ChannelDemandMatchResult demand = demandQueue.poll(5, TimeUnit.SECONDS);
                 if (demand == null) {
-                    // 超时检查生产者是否已完成
-                    if (producer.isDone()) {
+                    // 超时检查生产者线程是否已结束
+                    if (!producerThread.isAlive()) {
                         // 排空队列中剩余元素
                         demand = demandQueue.poll();
                         if (demand == null) {
@@ -274,11 +276,12 @@ public class ChannelDemandMatchJob {
             Thread.currentThread().interrupt();
         }
 
-        // 等待生产者确认完成(正常情况下此时已完成)
+        // 等待生产者线程确认完成(正常情况下此时已完成)
         try {
-            producer.get(1, TimeUnit.MINUTES);
-        } catch (Exception e) {
-            log.error("渠道 {} ODPS生产者异常: {}", channelName, e.getMessage());
+            producerThread.join(60_000);
+        } catch (InterruptedException e) {
+            log.error("渠道 {} 等待ODPS生产者超时", channelName);
+            Thread.currentThread().interrupt();
         }
 
         log.info("渠道 {} 流水线处理完成, 生产: {} 条需求", channelName, producedCount.get());