Przeglądaj źródła

processExternalChannel 并发执行

wangyunpeng 1 miesiąc temu
rodzic
commit
da083572f4

+ 96 - 10
api-module/src/main/java/com/tzld/piaoquan/api/job/ExternalChannelProcessJob.java

@@ -24,10 +24,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -118,6 +121,49 @@ public class ExternalChannelProcessJob {
      */
     private static final long LOCK_EXPIRE_SECONDS = 300;
 
+    /**
+     * 并发处理线程数
+     */
+    private static final int CONCURRENT_THREADS = 10;
+
+    /**
+     * 单条记录处理超时时间(秒)
+     */
+    private static final int SINGLE_RECORD_TIMEOUT_SECONDS = 60;
+
+    /**
+     * 类级别线程池,避免每次方法调用创建新线程池
+     */
+    private ExecutorService executor;
+
+    /**
+     * 初始化线程池
+     */
+    @PostConstruct
+    public void init() {
+        executor = Executors.newFixedThreadPool(CONCURRENT_THREADS);
+        log.info("ExternalChannelProcessJob线程池初始化完成, 线程数={}", CONCURRENT_THREADS);
+    }
+
+    /**
+     * 销毁时关闭线程池
+     */
+    @PreDestroy
+    public void destroy() {
+        if (executor != null && !executor.isShutdown()) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+            log.info("ExternalChannelProcessJob线程池已关闭");
+        }
+    }
+
     /**
      * 处理外部渠道待处理记录
      * 1. 先将超过 DEFAULT_MAX_RETRY_DAYS 天的待处理记录标记为 FAILED
@@ -159,15 +205,8 @@ public class ExternalChannelProcessJob {
                 lastId = pendingList.get(pendingList.size() - 1).getId();
                 log.info("第{}页, 找到{}条待处理记录, lastId={}", pageNum, pendingList.size(), lastId);
 
-                // 2. 逐条处理
-                for (ExternalChannel record : pendingList) {
-                    try {
-                        processSingleRecord(record);
-                        totalProcessed++;
-                    } catch (Exception e) {
-                        log.error("处理记录异常, id={}, rootSourceId={}", record.getId(), record.getRootSourceId(), e);
-                    }
-                }
+                // 2. 并发处理
+                totalProcessed += processBatchConcurrently(pendingList);
 
             } while (pendingList.size() >= QUERY_LIMIT);
 
@@ -179,6 +218,53 @@ public class ExternalChannelProcessJob {
         }
     }
 
+    /**
+     * 批量并发处理记录
+     * 使用类级别线程池并发处理列表中的记录,每条记录有独立的分布式锁保护
+     *
+     * @param pendingList 待处理记录列表
+     * @return 实际处理的记录数
+     */
+    private int processBatchConcurrently(List<ExternalChannel> pendingList) {
+        if (CollectionUtils.isEmpty(pendingList)) {
+            return 0;
+        }
+
+        AtomicInteger processedCount = new AtomicInteger(0);
+        List<Future<?>> futures = new ArrayList<>();
+
+        // 提交所有任务到类级别线程池
+        for (ExternalChannel record : pendingList) {
+            Future<?> future = executor.submit(() -> {
+                try {
+                    processSingleRecord(record);
+                    processedCount.incrementAndGet();
+                } catch (Exception e) {
+                    log.error("处理记录异常, id={}, rootSourceId={}",
+                            record.getId(), record.getRootSourceId(), e);
+                }
+            });
+            futures.add(future);
+        }
+
+        // 等待所有任务完成,设置超时时间
+        for (Future<?> future : futures) {
+            try {
+                future.get(SINGLE_RECORD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                log.warn("处理记录超时,取消任务");
+                future.cancel(true);
+            } catch (InterruptedException e) {
+                log.warn("处理被中断");
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException e) {
+                log.error("任务执行异常", e.getCause());
+            }
+        }
+
+        return processedCount.get();
+    }
+
     /**
      * 处理单条记录
      * 根据渠道类型路由到对应的处理方法,使用分布式锁防止并发处理