Explorar o código

configs并发执行

wangyunpeng hai 1 semana
pai
achega
24f15c7e61
Modificáronse 1 ficheiros con 194 adicións e 144 borrados
  1. 194 144
      core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

+ 194 - 144
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -29,6 +29,7 @@ import org.springframework.util.StringUtils;
 import javax.annotation.Resource;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 
@@ -84,8 +85,8 @@ public class VideoVectorJob {
             }
             log.info("审核清理完成,开始分页向量化处理");
 
-            int totalSuccessCount = 0;
-            int totalFailCount = 0;
+            AtomicInteger totalSuccessCount = new AtomicInteger(0);
+            AtomicInteger totalFailCount = new AtomicInteger(0);
             int pageNum = 0;
 
             while (true) {
@@ -109,64 +110,81 @@ public class VideoVectorJob {
                 }
                 log.info("第 {} 页审核通过 {} 个视频", pageNum, auditPassedIds.size());
 
-                // 4. 对每个配置进行处理
+                // 4. 对每个配置并发处理
+                ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
+                List<Future<?>> configFutures = new ArrayList<>();
                 for (DeconstructVectorConfig config : configs) {
-                    String configCode = config.getConfigCode();
-
-                    // 4.1 查询哪些 videoId 在该配置下已有向量(数据库层已做 DISTINCT video_id)
-                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
-                    // 4.2 过滤出需要处理的 videoId(排除已有向量的)
-                    List<Long> needProcessIds = auditPassedIds.stream()
-                            .filter(id -> !existingVideoIds.contains(id))
-                            .collect(Collectors.toList());
-                    
-                    if (needProcessIds.isEmpty()) {
-                        log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
-                        continue;
-                    }
-                    log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
-
-                    // 4.3 批量查询需要处理的视频 raw_result
-                    for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
-                        Map<Long, String> videoRawResults = batchQueryVideoRawResults(partition);
-                        if (videoRawResults.isEmpty()) {
-                            log.warn("配置 {} 未查询到任何 raw_result", configCode);
-                            continue;
-                        }
-
-                        // 4.4 逐个处理
-                        for (Long videoId : partition) {
-                            try {
-                                String rawResult = videoRawResults.get(videoId);
-                                if (!StringUtils.hasText(rawResult)) {
-                                    log.debug("videoId={} raw_result 为空,跳过", videoId);
-                                    totalFailCount++;
-                                    continue;
-                                }
+                    configFutures.add(configExecutor.submit(() -> {
+                        String configCode = config.getConfigCode();
+                        try {
+                            // 4.1 查询哪些 videoId 在该配置下已有向量(数据库层已做 DISTINCT video_id)
+                            Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+                            // 4.2 过滤出需要处理的 videoId(排除已有向量的)
+                            List<Long> needProcessIds = auditPassedIds.stream()
+                                    .filter(id -> !existingVideoIds.contains(id))
+                                    .collect(Collectors.toList());
+
+                            if (needProcessIds.isEmpty()) {
+                                log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
+                                return;
+                            }
+                            log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-                                // 根据配置提取文本(支持置信度过滤)
-                                List<String> texts = extractTextsFromRawResult(rawResult, config);
-                                if (CollectionUtils.isEmpty(texts)) {
-                                    log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
-                                    totalFailCount++;
+                            // 4.3 批量查询需要处理的视频 raw_result
+                            for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
+                                Map<Long, String> videoRawResults = batchQueryVideoRawResults(partition);
+                                if (videoRawResults.isEmpty()) {
+                                    log.warn("配置 {} 未查询到任何 raw_result", configCode);
                                     continue;
                                 }
 
-                                // 向量化并存储(多点模式返回成功数>0即为成功)
-                                int storeCount = vectorizeAndStore(config, videoId, texts);
-                                if (storeCount > 0) {
-                                    totalSuccessCount++;
-                                } else {
-                                    totalFailCount++;
+                                // 4.4 逐个处理
+                                for (Long videoId : partition) {
+                                    try {
+                                        String rawResult = videoRawResults.get(videoId);
+                                        if (!StringUtils.hasText(rawResult)) {
+                                            log.debug("videoId={} raw_result 为空,跳过", videoId);
+                                            totalFailCount.incrementAndGet();
+                                            continue;
+                                        }
+
+                                        // 根据配置提取文本(支持置信度过滤)
+                                        List<String> texts = extractTextsFromRawResult(rawResult, config);
+                                        if (CollectionUtils.isEmpty(texts)) {
+                                            log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
+                                            totalFailCount.incrementAndGet();
+                                            continue;
+                                        }
+
+                                        // 向量化并存储(多点模式返回成功数>0即为成功)
+                                        int storeCount = vectorizeAndStore(config, videoId, texts);
+                                        if (storeCount > 0) {
+                                            totalSuccessCount.incrementAndGet();
+                                        } else {
+                                            totalFailCount.incrementAndGet();
+                                        }
+
+                                    } catch (Exception e) {
+                                        log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+                                        totalFailCount.incrementAndGet();
+                                    }
                                 }
-
-                            } catch (Exception e) {
-                                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
-                                totalFailCount++;
                             }
+                        } catch (Exception e) {
+                            log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
                         }
+                    }));
+                }
+                // 等待所有配置处理完成
+                for (Future<?> future : configFutures) {
+                    try {
+                        future.get(10, TimeUnit.MINUTES);
+                    } catch (Exception e) {
+                        log.error("配置并发任务等待异常: {}", e.getMessage(), e);
                     }
                 }
+                configExecutor.shutdown();
+
                 // 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
                 if (videoIds.size() < VectorConstants.PAGE_SIZE) {
                     log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), VectorConstants.PAGE_SIZE);
@@ -174,7 +192,7 @@ public class VideoVectorJob {
                 }
                 pageNum++;
             }
-            log.info("视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount, totalFailCount, pageNum + 1);
+            log.info("视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount.get(), totalFailCount.get(), pageNum + 1);
             return ReturnT.SUCCESS;
         } catch (Exception e) {
             log.error("视频向量化任务执行失败: {}", e.getMessage(), e);
@@ -507,65 +525,81 @@ public class VideoVectorJob {
             }
             log.info("审核通过 {} 个视频", auditPassedIds.size());
 
-            int totalSuccessCount = 0;
-            int totalFailCount = 0;
+            AtomicInteger totalSuccessCount = new AtomicInteger(0);
+            AtomicInteger totalFailCount = new AtomicInteger(0);
 
-            // 5. 对每个配置进行处理
+            // 5. 对每个配置并发处理
+            ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
+            List<Future<?>> configFutures = new ArrayList<>();
             for (DeconstructVectorConfig config : configs) {
-                String configCode = config.getConfigCode();
-
-                // 5.1 查询该配置下已有向量的 videoId,排除已处理过的(数据库层已做 DISTINCT video_id)
-                Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
-                List<Long> needProcessIds = auditPassedIds.stream()
-                        .filter(id -> !existingVideoIds.contains(id))
-                        .collect(Collectors.toList());
-                if (needProcessIds.isEmpty()) {
-                    log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
-                    continue;
-                }
-                log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
-
-                // 5.2 逐个调用 detail 接口,提取选题并向量化存储
-                for (Long videoId : needProcessIds) {
+                configFutures.add(configExecutor.submit(() -> {
+                    String configCode = config.getConfigCode();
                     try {
-                        Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
-                        if (taskInstanceId == null) {
-                            log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
-                            totalFailCount++;
-                            continue;
+                        // 5.1 查询该配置下已有向量的 videoId,排除已处理过的(数据库层已做 DISTINCT video_id)
+                        Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+                        List<Long> needProcessIds = auditPassedIds.stream()
+                                .filter(id -> !existingVideoIds.contains(id))
+                                .collect(Collectors.toList());
+                        if (needProcessIds.isEmpty()) {
+                            log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
+                            return;
                         }
+                        log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-                        // 调用 detail 接口获取 dataContent(解构详情)
-                        JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
-                        if (dataContent == null) {
-                            log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
-                            totalFailCount++;
-                            continue;
-                        }
+                        // 5.2 逐个调用 detail 接口,提取选题并向量化存储
+                        for (Long videoId : needProcessIds) {
+                            try {
+                                Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
+                                if (taskInstanceId == null) {
+                                    log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
+                                    totalFailCount.incrementAndGet();
+                                    continue;
+                                }
 
-                        // 从 dataContent 中提取文本(支持置信度过滤)
-                        List<String> texts = extractTextsFromDataContent(dataContent, config);
-                        if (CollectionUtils.isEmpty(texts)) {
-                            log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
-                            totalFailCount++;
-                            continue;
-                        }
+                                // 调用 detail 接口获取 dataContent(解构详情
+                                JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
+                                if (dataContent == null) {
+                                    log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
+                                    totalFailCount.incrementAndGet();
+                                    continue;
+                                }
 
-                        // 向量化并存储(多点模式返回成功数>0即为成功)
-                        int storeCount = vectorizeAndStore(config, videoId, texts);
-                        if (storeCount > 0) {
-                            totalSuccessCount++;
-                        } else {
-                            totalFailCount++;
+                                // 从 dataContent 中提取文本(支持置信度过滤)
+                                List<String> texts = extractTextsFromDataContent(dataContent, config);
+                                if (CollectionUtils.isEmpty(texts)) {
+                                    log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
+                                    totalFailCount.incrementAndGet();
+                                    continue;
+                                }
+
+                                // 向量化并存储(多点模式返回成功数>0即为成功)
+                                int storeCount = vectorizeAndStore(config, videoId, texts);
+                                if (storeCount > 0) {
+                                    totalSuccessCount.incrementAndGet();
+                                } else {
+                                    totalFailCount.incrementAndGet();
+                                }
+                            } catch (Exception e) {
+                                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+                                totalFailCount.incrementAndGet();
+                            }
                         }
                     } catch (Exception e) {
-                        log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
-                        totalFailCount++;
+                        log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
                     }
+                }));
+            }
+            // 等待所有配置处理完成
+            for (Future<?> future : configFutures) {
+                try {
+                    future.get(30, TimeUnit.MINUTES);
+                } catch (Exception e) {
+                    log.error("配置并发任务等待异常: {}", e.getMessage(), e);
                 }
             }
+            configExecutor.shutdown();
 
-            log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount, totalFailCount);
+            log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount.get(), totalFailCount.get());
             return ReturnT.SUCCESS;
         } catch (Exception e) {
             log.error("AIGC 来源视频向量化任务执行失败: {}", e.getMessage(), e);
@@ -824,8 +858,8 @@ public class VideoVectorJob {
             }
             log.info("审核清理完成,开始分页向量化处理");
 
-            int totalSuccessCount = 0;
-            int totalFailCount = 0;
+            AtomicInteger totalSuccessCount = new AtomicInteger(0);
+            AtomicInteger totalFailCount = new AtomicInteger(0);
             int pageNum = 0;
 
             while (true) {
@@ -849,60 +883,76 @@ public class VideoVectorJob {
                 }
                 log.info("第 {} 页审核通过 {} 个视频", pageNum, auditPassedIds.size());
 
-                // 5. 对每个配置进行处理
+                // 5. 对每个配置并发处理
+                ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
+                List<Future<?>> configFutures = new ArrayList<>();
                 for (DeconstructVectorConfig config : configs) {
-                    String configCode = config.getConfigCode();
-
-                    // 5.1 已向量化过滤
-                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
-                    List<Long> needProcessIds = auditPassedIds.stream()
-                            .filter(id -> !existingVideoIds.contains(id))
-                            .collect(Collectors.toList());
-                    if (needProcessIds.isEmpty()) {
-                        log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
-                        continue;
-                    }
-                    log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
-
-                    // 5.2 分批查询 result_log 的 data 字段并向量化
-                    for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
-                        Map<Long, String> videoDataMap = batchQueryResultLogData(partition);
-                        if (videoDataMap.isEmpty()) {
-                            log.warn("配置 {} 未查询到任何 result_log data", configCode);
-                            continue;
-                        }
+                    configFutures.add(configExecutor.submit(() -> {
+                        String configCode = config.getConfigCode();
+                        try {
+                            // 5.1 已向量化过滤
+                            Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+                            List<Long> needProcessIds = auditPassedIds.stream()
+                                    .filter(id -> !existingVideoIds.contains(id))
+                                    .collect(Collectors.toList());
+                            if (needProcessIds.isEmpty()) {
+                                log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
+                                return;
+                            }
+                            log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-                        for (Long videoId : partition) {
-                            try {
-                                String data = videoDataMap.get(videoId);
-                                if (!StringUtils.hasText(data)) {
-                                    log.debug("videoId={} result_log data 为空,跳过", videoId);
-                                    totalFailCount++;
+                            // 5.2 分批查询 result_log 的 data 字段并向量化
+                            for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
+                                Map<Long, String> videoDataMap = batchQueryResultLogData(partition);
+                                if (videoDataMap.isEmpty()) {
+                                    log.warn("配置 {} 未查询到任何 result_log data", configCode);
                                     continue;
                                 }
 
-                                // 从 data JSON 中根据配置提取文本
-                                List<String> texts = extractTextsFromResultLogData(data, config);
-                                if (CollectionUtils.isEmpty(texts)) {
-                                    log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
-                                    totalFailCount++;
-                                    continue;
+                                for (Long videoId : partition) {
+                                    try {
+                                        String data = videoDataMap.get(videoId);
+                                        if (!StringUtils.hasText(data)) {
+                                            log.debug("videoId={} result_log data 为空,跳过", videoId);
+                                            totalFailCount.incrementAndGet();
+                                            continue;
+                                        }
+
+                                        // 从 data JSON 中根据配置提取文本
+                                        List<String> texts = extractTextsFromResultLogData(data, config);
+                                        if (CollectionUtils.isEmpty(texts)) {
+                                            log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
+                                            totalFailCount.incrementAndGet();
+                                            continue;
+                                        }
+
+                                        // 向量化并存储
+                                        int storeCount = vectorizeAndStore(config, videoId, texts);
+                                        if (storeCount > 0) {
+                                            totalSuccessCount.incrementAndGet();
+                                        } else {
+                                            totalFailCount.incrementAndGet();
+                                        }
+                                    } catch (Exception e) {
+                                        log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+                                        totalFailCount.incrementAndGet();
+                                    }
                                 }
-
-                                // 向量化并存储
-                                int storeCount = vectorizeAndStore(config, videoId, texts);
-                                if (storeCount > 0) {
-                                    totalSuccessCount++;
-                                } else {
-                                    totalFailCount++;
-                                }
-                            } catch (Exception e) {
-                                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
-                                totalFailCount++;
                             }
+                        } catch (Exception e) {
+                            log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
                         }
+                    }));
+                }
+                // 等待所有配置处理完成
+                for (Future<?> future : configFutures) {
+                    try {
+                        future.get(10, TimeUnit.MINUTES);
+                    } catch (Exception e) {
+                        log.error("配置并发任务等待异常: {}", e.getMessage(), e);
                     }
                 }
+                configExecutor.shutdown();
 
                 if (videoIds.size() < VectorConstants.PAGE_SIZE) {
                     log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), VectorConstants.PAGE_SIZE);
@@ -911,7 +961,7 @@ public class VideoVectorJob {
                 pageNum++;
             }
 
-            log.info("result_log 来源视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount, totalFailCount, pageNum + 1);
+            log.info("result_log 来源视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount.get(), totalFailCount.get(), pageNum + 1);
             return ReturnT.SUCCESS;
         } catch (Exception e) {
             log.error("result_log 来源视频向量化任务执行失败: {}", e.getMessage(), e);