Переглянути джерело

审核先过滤 调大审核过滤并发数

wangyunpeng 1 тиждень тому
батько
коміт
215b96d232

+ 79 - 49
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -28,6 +28,7 @@ import org.springframework.util.StringUtils;
 
 
 import javax.annotation.Resource;
 import javax.annotation.Resource;
 import java.util.*;
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 
 
@@ -96,15 +97,26 @@ public class VideoVectorJob {
                 }
                 }
                 log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
                 log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
 
 
-                // 3. 对每个配置进行处理
+                // 3. 先进行审核过滤(每页只过滤一次,避免在 config 循环内重复调用)
+                List<Long> auditPassedIds = filterAuditPassedIds(videoIds);
+                if (auditPassedIds.isEmpty()) {
+                    log.info("第 {} 页所有视频均未通过审核,跳过", pageNum);
+                    if (videoIds.size() < VectorConstants.PAGE_SIZE) {
+                        break;
+                    }
+                    pageNum++;
+                    continue;
+                }
+                log.info("第 {} 页审核通过 {} 个视频", pageNum, auditPassedIds.size());
+
+                // 4. 对每个配置进行处理
                 for (DeconstructVectorConfig config : configs) {
                 for (DeconstructVectorConfig config : configs) {
                     String configCode = config.getConfigCode();
                     String configCode = config.getConfigCode();
 
 
-                    // 3.0 审核清理已移至分页外,此处仅进行向量存在性检查
-                    // 3.1 查询哪些 videoId 在该配置下已有向量(数据库层已做 DISTINCT video_id)
-                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, videoIds);
-                    // 3.2 过滤出需要处理的 videoId(排除已有向量的)
-                    List<Long> needProcessIds = videoIds.stream()
+                    // 4.1 查询哪些 videoId 在该配置下已有向量(数据库层已做 DISTINCT video_id)
+                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+                    // 4.2 过滤出需要处理的 videoId(排除已有向量的)
+                    List<Long> needProcessIds = auditPassedIds.stream()
                             .filter(id -> !existingVideoIds.contains(id))
                             .filter(id -> !existingVideoIds.contains(id))
                             .collect(Collectors.toList());
                             .collect(Collectors.toList());
                     
                     
@@ -114,15 +126,7 @@ public class VideoVectorJob {
                     }
                     }
                     log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
                     log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
 
-                    // 3.3 审核状态过滤:排除审核未通过的视频
-                    needProcessIds = filterAuditPassedIds(needProcessIds);
-                    if (needProcessIds.isEmpty()) {
-                        log.info("配置 {} 待处理视频均未通过审核,跳过", configCode);
-                        continue;
-                    }
-                    log.info("配置 {} 审核通过后需处理 {} 个视频", configCode, needProcessIds.size());
-
-                    // 3.4 批量查询需要处理的视频 raw_result
+                    // 4.3 批量查询需要处理的视频 raw_result
                     for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
                     for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
                         Map<Long, String> videoRawResults = batchQueryVideoRawResults(partition);
                         Map<Long, String> videoRawResults = batchQueryVideoRawResults(partition);
                         if (videoRawResults.isEmpty()) {
                         if (videoRawResults.isEmpty()) {
@@ -130,7 +134,7 @@ public class VideoVectorJob {
                             continue;
                             continue;
                         }
                         }
 
 
-                        // 3.5 逐个处理
+                        // 4.4 逐个处理
                         for (Long videoId : partition) {
                         for (Long videoId : partition) {
                             try {
                             try {
                                 String rawResult = videoRawResults.get(videoId);
                                 String rawResult = videoRawResults.get(videoId);
@@ -495,16 +499,24 @@ public class VideoVectorJob {
             List<Long> allVideoIds = new ArrayList<>(videoIdToTaskInstanceId.keySet());
             List<Long> allVideoIds = new ArrayList<>(videoIdToTaskInstanceId.keySet());
             log.info("共 {} 个有效 videoId", allVideoIds.size());
             log.info("共 {} 个有效 videoId", allVideoIds.size());
 
 
+            // 4. 先进行审核过滤(只过滤一次,避免在 config 循环内重复调用)
+            List<Long> auditPassedIds = filterAuditPassedIds(allVideoIds);
+            if (auditPassedIds.isEmpty()) {
+                log.info("所有视频均未通过审核,任务结束");
+                return ReturnT.SUCCESS;
+            }
+            log.info("审核通过 {} 个视频", auditPassedIds.size());
+
             int totalSuccessCount = 0;
             int totalSuccessCount = 0;
             int totalFailCount = 0;
             int totalFailCount = 0;
 
 
-            // 4. 对每个配置进行处理
+            // 5. 对每个配置进行处理
             for (DeconstructVectorConfig config : configs) {
             for (DeconstructVectorConfig config : configs) {
                 String configCode = config.getConfigCode();
                 String configCode = config.getConfigCode();
 
 
-                // 4.1 查询该配置下已有向量的 videoId,排除已处理过的(数据库层已做 DISTINCT video_id)
-                Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, allVideoIds);
-                List<Long> needProcessIds = allVideoIds.stream()
+                // 5.1 查询该配置下已有向量的 videoId,排除已处理过的(数据库层已做 DISTINCT video_id)
+                Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+                List<Long> needProcessIds = auditPassedIds.stream()
                         .filter(id -> !existingVideoIds.contains(id))
                         .filter(id -> !existingVideoIds.contains(id))
                         .collect(Collectors.toList());
                         .collect(Collectors.toList());
                 if (needProcessIds.isEmpty()) {
                 if (needProcessIds.isEmpty()) {
@@ -513,15 +525,7 @@ public class VideoVectorJob {
                 }
                 }
                 log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
                 log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
 
-                // 4.2 审核状态过滤:排除审核未通过的视频
-                needProcessIds = filterAuditPassedIds(needProcessIds);
-                if (needProcessIds.isEmpty()) {
-                    log.info("配置 {} 待处理视频均未通过审核,跳过", configCode);
-                    continue;
-                }
-                log.info("配置 {} 审核通过后需处理 {} 个视频", configCode, needProcessIds.size());
-
-                // 4.3 逐个调用 detail 接口,提取选题并向量化存储
+                // 5.2 逐个调用 detail 接口,提取选题并向量化存储
                 for (Long videoId : needProcessIds) {
                 for (Long videoId : needProcessIds) {
                     try {
                     try {
                         Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
                         Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
@@ -751,15 +755,37 @@ public class VideoVectorJob {
         if (CollectionUtils.isEmpty(videoIds)) {
         if (CollectionUtils.isEmpty(videoIds)) {
             return Collections.emptyList();
             return Collections.emptyList();
         }
         }
-        Set<Long> notPassedIds = new HashSet<>();
-        for (List<Long> batch : Lists.partition(videoIds, VectorConstants.AUDIT_CHECK_BATCH_SIZE)) {
-            try {
-                Set<Long> batchNotPassed = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
-                notPassedIds.addAll(batchNotPassed);
-            } catch (Exception e) {
-                log.error("审核状态查询失败,batch={}, error={}", batch, e.getMessage(), e);
+
+        List<List<Long>> batches = Lists.partition(videoIds, VectorConstants.AUDIT_CHECK_BATCH_SIZE);
+        // 并发提交所有批次的审核查询(API 限制每批 20 条,需用更多线程提升吞吐)
+        int parallelism = Math.min(batches.size(), 10);
+        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+        Set<Long> notPassedIds = ConcurrentHashMap.newKeySet();
+
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+            for (List<Long> batch : batches) {
+                futures.add(executor.submit(() -> {
+                    try {
+                        Set<Long> batchNotPassed = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
+                        notPassedIds.addAll(batchNotPassed);
+                    } catch (Exception e) {
+                        log.error("审核状态查询失败,batch={}, error={}", batch, e.getMessage(), e);
+                    }
+                }));
             }
             }
+            // 等待所有任务完成
+            for (Future<?> future : futures) {
+                try {
+                    future.get(30, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    log.error("审核查询任务等待异常: {}", e.getMessage(), e);
+                }
+            }
+        } finally {
+            executor.shutdown();
         }
         }
+
         if (notPassedIds.isEmpty()) {
         if (notPassedIds.isEmpty()) {
             return videoIds;
             return videoIds;
         }
         }
@@ -811,13 +837,25 @@ public class VideoVectorJob {
                 }
                 }
                 log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
                 log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
 
 
-                // 4. 对每个配置进行处理
+                // 4. 先进行审核过滤(每页只过滤一次,避免在 config 循环内重复调用)
+                List<Long> auditPassedIds = filterAuditPassedIds(videoIds);
+                if (auditPassedIds.isEmpty()) {
+                    log.info("第 {} 页所有视频均未通过审核,跳过", pageNum);
+                    if (videoIds.size() < VectorConstants.PAGE_SIZE) {
+                        break;
+                    }
+                    pageNum++;
+                    continue;
+                }
+                log.info("第 {} 页审核通过 {} 个视频", pageNum, auditPassedIds.size());
+
+                // 5. 对每个配置进行处理
                 for (DeconstructVectorConfig config : configs) {
                 for (DeconstructVectorConfig config : configs) {
                     String configCode = config.getConfigCode();
                     String configCode = config.getConfigCode();
 
 
-                    // 4.1 已向量化过滤
-                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, videoIds);
-                    List<Long> needProcessIds = videoIds.stream()
+                    // 5.1 已向量化过滤
+                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+                    List<Long> needProcessIds = auditPassedIds.stream()
                             .filter(id -> !existingVideoIds.contains(id))
                             .filter(id -> !existingVideoIds.contains(id))
                             .collect(Collectors.toList());
                             .collect(Collectors.toList());
                     if (needProcessIds.isEmpty()) {
                     if (needProcessIds.isEmpty()) {
@@ -826,15 +864,7 @@ public class VideoVectorJob {
                     }
                     }
                     log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
                     log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
 
-                    // 4.2 审核状态过滤
-                    needProcessIds = filterAuditPassedIds(needProcessIds);
-                    if (needProcessIds.isEmpty()) {
-                        log.info("配置 {} 待处理视频均未通过审核,跳过", configCode);
-                        continue;
-                    }
-                    log.info("配置 {} 审核通过后需处理 {} 个视频", configCode, needProcessIds.size());
-
-                    // 4.3 分批查询 result_log 的 data 字段并向量化
+                    // 5.2 分批查询 result_log 的 data 字段并向量化
                     for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
                     for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
                         Map<Long, String> videoDataMap = batchQueryResultLogData(partition);
                         Map<Long, String> videoDataMap = batchQueryResultLogData(partition);
                         if (videoDataMap.isEmpty()) {
                         if (videoDataMap.isEmpty()) {