wangyunpeng 14 часов назад
Родитель
Сommit
15c48407ab

+ 72 - 3
core/src/main/java/com/tzld/videoVector/api/VideoApiService.java

@@ -13,7 +13,7 @@ import org.springframework.util.CollectionUtils;
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.*;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 
 /**
 /**
  * 视频 API 服务
  * 视频 API 服务
@@ -34,6 +34,20 @@ public class VideoApiService {
      */
      */
     private static final int BATCH_SIZE = 20;
     private static final int BATCH_SIZE = 20;
 
 
+    /**
+     * 视频详情查询并行线程池,核心线程8,最多支持 8 批并行调用
+     */
+    private static final ExecutorService VIDEO_DETAIL_EXECUTOR = new ThreadPoolExecutor(
+            8, 16, 60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(64),
+            r -> {
+                Thread t = new Thread(r, "video-detail-pool-" + r.hashCode());
+                t.setDaemon(true);
+                return t;
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy()
+    );
+
     @Value("${video.api.timeout:30}")
     @Value("${video.api.timeout:30}")
     private int timeout;
     private int timeout;
 
 
@@ -48,8 +62,8 @@ public class VideoApiService {
     }
     }
 
 
     /**
     /**
-     * 批量获取视频详情信息
-     * 自动分批处理,每批次最多20个videoId
+     * 批量获取视频详情信息(串行版本)
+     * 自动分批处理,每批次最多20个videoId,批次间串行执行
      *
      *
      * @param videoIdList 视频ID列表
      * @param videoIdList 视频ID列表
      * @return videoId -> VideoDetail 映射
      * @return videoId -> VideoDetail 映射
@@ -141,6 +155,61 @@ public class VideoApiService {
         return new HashMap<>();
         return new HashMap<>();
     }
     }
 
 
+    /**
+     * 并发批量获取视频详情信息(带最大并发控制)
+     * 自动分批处理,每批次最多20个videoId,通过 Semaphore 控制最大并行批次数
+     *
+     * @param videoIdList    视频ID列表
+     * @param maxConcurrency 最大并发批次数(控制对下游 API 的并行请求数)
+     * @return videoId -> VideoDetail 映射
+     */
+    public Map<Long, VideoDetail> getVideoDetailConcurrent(Set<Long> videoIdList, int maxConcurrency) {
+        if (CollectionUtils.isEmpty(videoIdList)) {
+            return Collections.emptyMap();
+        }
+        if (maxConcurrency <= 0) {
+            maxConcurrency = 8;
+        }
+
+        List<Long> videoIds = new ArrayList<>(videoIdList);
+        List<List<Long>> partitions = Lists.partition(videoIds, BATCH_SIZE);
+        Semaphore semaphore = new Semaphore(maxConcurrency);
+
+        // 并行执行所有批次,通过 Semaphore 限制最大并发
+        List<CompletableFuture<Map<Long, VideoDetail>>> futures = new ArrayList<>();
+        for (List<Long> batch : partitions) {
+            CompletableFuture<Map<Long, VideoDetail>> future = CompletableFuture.supplyAsync(() -> {
+                try {
+                    semaphore.acquire();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return Collections.<Long, VideoDetail>emptyMap();
+                }
+                try {
+                    return getVideoDetailBatch(new HashSet<>(batch));
+                } catch (Exception e) {
+                    log.error("并发获取视频详情失败, batch={}, error={}", batch, e.getMessage(), e);
+                    return Collections.<Long, VideoDetail>emptyMap();
+                } finally {
+                    semaphore.release();
+                }
+            }, VIDEO_DETAIL_EXECUTOR);
+            futures.add(future);
+        }
+
+        // 等待所有批次完成并汇总结果
+        Map<Long, VideoDetail> result = new HashMap<>();
+        for (CompletableFuture<Map<Long, VideoDetail>> future : futures) {
+            try {
+                result.putAll(future.get(timeout, TimeUnit.SECONDS));
+            } catch (Exception e) {
+                log.error("并发获取视频详情批次超时或异常: {}", e.getMessage(), e);
+            }
+        }
+
+        return result;
+    }
+
     /**
     /**
      * 获取审核未通过的视频ID列表
      * 获取审核未通过的视频ID列表
      *
      *

+ 2 - 2
core/src/main/java/com/tzld/videoVector/service/impl/VideoSearchServiceImpl.java

@@ -948,8 +948,8 @@ public class VideoSearchServiceImpl implements VideoSearchService {
                 .map(VideoMatch::getVideoId)
                 .map(VideoMatch::getVideoId)
                 .collect(Collectors.toSet());
                 .collect(Collectors.toSet());
 
 
-        // 批量获取视频详情
-        Map<Long, VideoDetail> videoDetails = videoApiService.getVideoDetail(videoIds);
+        // 批量获取视频详情(并发版本,仅match接口使用)
+        Map<Long, VideoDetail> videoDetails = videoApiService.getVideoDetailConcurrent(videoIds, 8);
 
 
         // 过滤审核通过的视频
         // 过滤审核通过的视频
         List<VideoMatch> filteredMatches = matches.stream()
         List<VideoMatch> filteredMatches = matches.stream()