Przeglądaj źródła

并发调用检查视频审核通过

wangyunpeng 15 godzin temu
rodzic
commit
344df5797a

+ 1 - 1
core/src/main/java/com/tzld/videoVector/api/VideoApiService.java

@@ -217,7 +217,7 @@ public class VideoApiService {
      * @return 审核未通过的视频ID集合
      */
     public Set<Long> getNotAuditPassedVideoIds(Set<Long> videoIdList) {
-        Map<Long, VideoDetail> videoDetails = getVideoDetail(videoIdList);
+        Map<Long, VideoDetail> videoDetails = getVideoDetailConcurrent(videoIdList, 5);
         Set<Long> notPassedIds = new HashSet<>();
 
         for (Map.Entry<Long, VideoDetail> entry : videoDetails.entrySet()) {

+ 13 - 33
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -841,16 +841,13 @@ public class VideoVectorJob {
 
             log.info("配置 {} 开始检查审核状态,共 {} 个视频", configCode, allStoredIds.size());
 
-            // 分批检查审核状态(数据库层 selectAllVideoIdsByConfigCode 已返回 DISTINCT video_id
+            // 分批调用 getNotAuditPassedVideoIds,每批100条(内部通过 getVideoDetailConcurrent 自动按20条分批并发请求
             List<Long> videoIdList = new ArrayList<>(allStoredIds);
+            List<List<Long>> batches = Lists.partition(videoIdList, 100);
             int totalRemoved = 0;
 
-            for (int i = 0; i < videoIdList.size(); i += VectorConstants.AUDIT_CHECK_BATCH_SIZE) {
-                int end = Math.min(i + VectorConstants.AUDIT_CHECK_BATCH_SIZE, videoIdList.size());
-                Set<Long> batchIds = new HashSet<>(videoIdList.subList(i, end));
-
-                // 获取审核未通过的视频ID
-                Set<Long> notPassedIds = videoApiService.getNotAuditPassedVideoIds(batchIds);
+            for (List<Long> batch : batches) {
+                Set<Long> notPassedIds = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
 
                 if (!notPassedIds.isEmpty()) {
                     vectorStoreService.deleteBatch(configCode, notPassedIds);
@@ -878,34 +875,17 @@ public class VideoVectorJob {
             return Collections.emptyList();
         }
 
-        List<List<Long>> batches = Lists.partition(videoIds, VectorConstants.AUDIT_CHECK_BATCH_SIZE);
-        // 并发提交所有批次的审核查询(API 限制每批 20 条,需用更多线程提升吞吐)
-        int parallelism = Math.min(batches.size(), 10);
-        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
-        Set<Long> notPassedIds = ConcurrentHashMap.newKeySet();
+        // 每批100条调用 getNotAuditPassedVideoIds(内部通过 getVideoDetailConcurrent 按20条分批并发请求)
+        List<List<Long>> batches = Lists.partition(videoIds, 100);
+        Set<Long> notPassedIds = new HashSet<>();
 
-        try {
-            List<Future<?>> futures = new ArrayList<>();
-            for (List<Long> batch : batches) {
-                futures.add(executor.submit(() -> {
-                    try {
-                        Set<Long> batchNotPassed = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
-                        notPassedIds.addAll(batchNotPassed);
-                    } catch (Exception e) {
-                        log.error("审核状态查询失败,batch={}, error={}", batch, e.getMessage(), e);
-                    }
-                }));
-            }
-            // 等待所有任务完成
-            for (Future<?> future : futures) {
-                try {
-                    future.get(30, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("审核查询任务等待异常: {}", e.getMessage(), e);
-                }
+        for (List<Long> batch : batches) {
+            try {
+                Set<Long> batchNotPassed = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
+                notPassedIds.addAll(batchNotPassed);
+            } catch (Exception e) {
+                log.error("审核状态查询失败,batch size={}, error={}", batch.size(), e.getMessage(), e);
             }
-        } finally {
-            executor.shutdown();
         }
 
         if (notPassedIds.isEmpty()) {