Explorar o código

embed 并发调整

wangyunpeng hai 1 semana
pai
achega
4967c5b188

+ 0 - 35
core/src/main/java/com/tzld/videoVector/api/DashScopeEmbeddingApiService.java

@@ -13,7 +13,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * 阿里云 DashScope 多模态 Embedding API 服务
@@ -35,19 +34,8 @@ public class DashScopeEmbeddingApiService {
     @Value("${dashscope.embedding.dashscope.timeout:60}")
     private int timeout;
 
-    /**
-     * 触发限流后暂停时间(秒)
-     */
-    @Value("${dashscope.embedding.dashscope.pause-seconds:20}")
-    private long pauseSeconds;
-
     private OkHttpClient client;
 
-    /**
-     * 限流恢复时间戳(毫秒),当前时间超过此值才允许请求
-     */
-    private final AtomicLong throttleUntil = new AtomicLong(0);
-
     @PostConstruct
     public void init() {
         client = new OkHttpClient.Builder()
@@ -70,21 +58,6 @@ public class DashScopeEmbeddingApiService {
             return Collections.emptyList();
         }
 
-        // 如果处于限流暂停期,等待恢复
-        long resumeAt = throttleUntil.get();
-        long now = System.currentTimeMillis();
-        if (now < resumeAt) {
-            long waitMs = resumeAt - now;
-            log.info("DashScope Embedding 处于限流暂停期,还需等待{}ms后恢复", waitMs);
-            try {
-                Thread.sleep(waitMs);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.error("DashScope Embedding 限流等待被中断", e);
-                return Collections.emptyList();
-            }
-        }
-
         try {
             // 构造请求体:contents 数组中放 text 对象
             JSONObject contentItem = new JSONObject();
@@ -114,14 +87,6 @@ public class DashScopeEmbeddingApiService {
                     .build();
 
             try (Response response = client.newCall(request).execute()) {
-                if (response.code() == 429) {
-                    String errorBody = response.body() != null ? response.body().string() : "无";
-                    // 触发限流,设置暂停时间
-                    long pauseUntil = System.currentTimeMillis() + pauseSeconds * 1000;
-                    throttleUntil.set(pauseUntil);
-                    log.error("DashScope Embedding 触发限流(429),暂停{}秒后恢复,错误: {}", pauseSeconds, errorBody);
-                    return Collections.emptyList();
-                }
                 if (!response.isSuccessful()) {
                     String errorBody = response.body() != null ? response.body().string() : "无";
                     log.error("DashScope Embedding 请求失败,HTTP状态码: {}, 错误信息: {}", response.code(), errorBody);

+ 0 - 6
core/src/main/java/com/tzld/videoVector/common/constant/VectorConstants.java

@@ -58,12 +58,6 @@ public interface VectorConstants {
 
     // ========================== 并发参数 ==========================
 
-    /** Embedding 最大在途任务数(Semaphore 控制,防止OOM) */
-    int MAX_EMBEDDING_IN_FLIGHT = 100;
-
-    /** Embedding 并发数 */
-    int EMBEDDING_PARALLELISM = 10;
-
     /** AIGC API 并发数 */
     int AIGC_DETAIL_PARALLELISM = 10;
 

+ 75 - 122
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -76,7 +76,9 @@ public class VideoVectorJob {
     @Resource
     private VideoDeconstructResultMapperExt videoDeconstructResultMapperExt;
 
-    /** 本次 Job 执行中已缓存 decode 的 videoId,避免多配置下重复写入 */
+    /**
+     * 本次 Job 执行中已缓存 decode 的 videoId,避免多配置下重复写入
+     */
     private final Set<Long> decodeCachedInThisRun = ConcurrentHashMap.newKeySet();
 
 
@@ -182,26 +184,17 @@ public class VideoVectorJob {
             }
             log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-            // 3. 从本地DB批量查询解构结果并并发embedding
-            ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
-            Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
-            List<Future<?>> futures = new ArrayList<>();
-
-            try {
-                for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
-                    List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
-                            .selectResultsByVideoIds("result_json", batchIds);
-                    for (VideoDeconstructResult r : results) {
-                        if (!StringUtils.hasText(r.getResult())) {
-                            continue;
-                        }
-                        submitLocalEmbeddingTask(r.getVideoId(), r.getResult(), config,
-                                embedExecutor, inFlightLimiter, futures,
-                                totalSuccessCount, totalFailCount, "raw_result");
+            // 3. 从本地DB批量查询解构结果并顺序embedding
+            for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
+                List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
+                        .selectResultsByVideoIds("result_json", batchIds);
+                for (VideoDeconstructResult r : results) {
+                    if (!StringUtils.hasText(r.getResult())) {
+                        continue;
                     }
+                    executeEmbeddingTask(r.getVideoId(), r.getResult(), config,
+                            totalSuccessCount, totalFailCount, "raw_result");
                 }
-            } finally {
-                awaitAndShutdown(futures, embedExecutor, 30, "embedding");
             }
         } catch (Exception e) {
             log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
@@ -209,46 +202,33 @@ public class VideoVectorJob {
     }
 
     /**
-     * 提交单条记录的 embedding 任务到线程池(基于本地DB数据)
-     * 使用 Semaphore 控制在途并发数
+     * 顺序执行单条记录的 embedding 任务(基于本地DB数据)
      */
-    private void submitLocalEmbeddingTask(Long videoId, String rawData, DeconstructVectorConfig config,
-                                          ExecutorService executor, Semaphore inFlightLimiter,
-                                          List<Future<?>> futures, AtomicInteger successCount,
-                                          AtomicInteger failCount, String dataType) {
+    private void executeEmbeddingTask(Long videoId, String rawData, DeconstructVectorConfig config,
+                                      AtomicInteger successCount, AtomicInteger failCount, String dataType) {
         if (videoId == null || !StringUtils.hasText(rawData)) {
             return;
         }
-        try {
-            inFlightLimiter.acquire();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            return;
-        }
         String configCode = config.getConfigCode();
-        futures.add(executor.submit(() -> {
-            try {
-                List<String> texts = "result_log".equals(dataType)
-                        ? extractTextsFromResultLogData(rawData, config)
-                        : extractTextsFromRawResult(rawData, config);
-                if (CollectionUtils.isEmpty(texts)) {
-                    log.info("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
-                    failCount.incrementAndGet();
-                    return;
-                }
-                int storeCount = vectorizeAndStore(config, videoId, texts);
-                if (storeCount > 0) {
-                    successCount.incrementAndGet();
-                } else {
-                    failCount.incrementAndGet();
-                }
-            } catch (Exception e) {
-                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+        try {
+            List<String> texts = "result_log".equals(dataType)
+                    ? extractTextsFromResultLogData(rawData, config)
+                    : extractTextsFromRawResult(rawData, config);
+            if (CollectionUtils.isEmpty(texts)) {
+                log.info("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
+                failCount.incrementAndGet();
+                return;
+            }
+            int storeCount = vectorizeAndStore(config, videoId, texts);
+            if (storeCount > 0) {
+                successCount.incrementAndGet();
+            } else {
                 failCount.incrementAndGet();
-            } finally {
-                inFlightLimiter.release();
             }
-        }));
+        } catch (Exception e) {
+            log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+            failCount.incrementAndGet();
+        }
     }
 
     /**
@@ -322,8 +302,8 @@ public class VideoVectorJob {
      * - 数组路径(以 [*] 结尾):从数组中提取满足置信度条件的文本
      * - 单对象路径(不以 [*] 结尾):对单个对象进行置信度检查后提取文本
      *
-     * @param json       原始JSON
-     * @param sourcePath 路径(如 $.keypoint_final.最终关键点列表[*] 或 $.最终选题)
+     * @param json        原始JSON
+     * @param sourcePath  路径(如 $.keypoint_final.最终关键点列表[*] 或 $.最终选题)
      * @param extractRule 提取规则JSON(如 {"text_field":"关键点","confidence_field":"置信度","confidence_threshold":0.8})
      * @return 满足置信度条件的文本列表
      */
@@ -516,7 +496,8 @@ public class VideoVectorJob {
 
     /**
      * 分页查询 videoId 列表(从本地解构结果表查询 result_json 来源)
-     * @param pageNum 页码(从0开始)
+     *
+     * @param pageNum  页码(从0开始)
      * @param pageSize 每页数量
      * @return videoId 列表
      */
@@ -617,57 +598,37 @@ public class VideoVectorJob {
             }
             log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-            // 3. 从本地DB批量查询解构结果并并发embedding
-            ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
-            Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
-            List<Future<?>> futures = new ArrayList<>();
-
-            try {
-                for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
-                    List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
-                            .selectResultsByVideoIds("aigc_deconstruct", batchIds);
-                    for (VideoDeconstructResult r : results) {
-                        if (!StringUtils.hasText(r.getResult())) {
-                            continue;
-                        }
-                        Long videoId = r.getVideoId();
-                        JSONObject dataContent = JSON.parseObject(r.getResult());
-                        if (dataContent == null) {
-                            continue;
-                        }
-                        tryCacheDecodeResult(videoId);
+            // 3. 从本地DB批量查询解构结果并顺序embedding
+            List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
+                    .selectResultsByVideoIds("aigc_deconstruct", needProcessIds);
+            for (VideoDeconstructResult r : results) {
+                if (!StringUtils.hasText(r.getResult())) {
+                    continue;
+                }
+                Long videoId = r.getVideoId();
+                JSONObject dataContent = JSON.parseObject(r.getResult());
+                if (dataContent == null) {
+                    continue;
+                }
+                tryCacheDecodeResult(videoId);
 
-                        try {
-                            inFlightLimiter.acquire();
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            return;
-                        }
-                        futures.add(embedExecutor.submit(() -> {
-                            try {
-                                List<String> texts = extractTextsFromDataContent(dataContent, config);
-                                if (CollectionUtils.isEmpty(texts)) {
-                                    log.info("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
-                                    totalFailCount.incrementAndGet();
-                                    return;
-                                }
-                                int storeCount = vectorizeAndStore(config, videoId, texts);
-                                if (storeCount > 0) {
-                                    totalSuccessCount.incrementAndGet();
-                                } else {
-                                    totalFailCount.incrementAndGet();
-                                }
-                            } catch (Exception e) {
-                                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
-                                totalFailCount.incrementAndGet();
-                            } finally {
-                                inFlightLimiter.release();
-                            }
-                        }));
+                try {
+                    List<String> texts = extractTextsFromDataContent(dataContent, config);
+                    if (CollectionUtils.isEmpty(texts)) {
+                        log.info("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
+                        totalFailCount.incrementAndGet();
+                        continue;
+                    }
+                    int storeCount = vectorizeAndStore(config, videoId, texts);
+                    if (storeCount > 0) {
+                        totalSuccessCount.incrementAndGet();
+                    } else {
+                        totalFailCount.incrementAndGet();
                     }
+                } catch (Exception e) {
+                    log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+                    totalFailCount.incrementAndGet();
                 }
-            } finally {
-                awaitAndShutdown(futures, embedExecutor, 30, "AIGC embedding");
             }
         } catch (Exception e) {
             log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
@@ -709,7 +670,7 @@ public class VideoVectorJob {
 
     /**
      * 从解构数据中按 点类型 + 置信度 + 实质/形式 + 贡献度 提取向量化文本
-     *
+     * <p>
      * 提取流程:
      * 1. 从 final_result_path 获取最终点列表,按 confidence_field >= confidence_threshold 过滤
      * 2. 对通过的点,从主数组 point_array_path 中匹配对应点的详细解构
@@ -895,6 +856,7 @@ public class VideoVectorJob {
 
     /**
      * 从 dataContent 中提取选题文本(向后兼容)
+     *
      * @deprecated 请使用 extractTextsFromDataContent(dataContent, config)
      */
     @Deprecated
@@ -1212,26 +1174,17 @@ public class VideoVectorJob {
             }
             log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-            // 3. 从本地DB批量查询解构结果并并发embedding
-            ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
-            Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
-            List<Future<?>> futures = new ArrayList<>();
-
-            try {
-                for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
-                    List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
-                            .selectResultsByVideoIds("result_log", batchIds);
-                    for (VideoDeconstructResult r : results) {
-                        if (!StringUtils.hasText(r.getResult())) {
-                            continue;
-                        }
-                        submitLocalEmbeddingTask(r.getVideoId(), r.getResult(), config,
-                                embedExecutor, inFlightLimiter, futures,
-                                totalSuccessCount, totalFailCount, "result_log");
+            // 3. 从本地DB批量查询解构结果并顺序embedding
+            for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
+                List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
+                        .selectResultsByVideoIds("result_log", batchIds);
+                for (VideoDeconstructResult r : results) {
+                    if (!StringUtils.hasText(r.getResult())) {
+                        continue;
                     }
+                    executeEmbeddingTask(r.getVideoId(), r.getResult(), config,
+                            totalSuccessCount, totalFailCount, "result_log");
                 }
-            } finally {
-                awaitAndShutdown(futures, embedExecutor, 30, "embedding");
             }
         } catch (Exception e) {
             log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);