|
@@ -32,6 +32,7 @@ import javax.annotation.Resource;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
+import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
@@ -122,105 +123,11 @@ public class VideoVectorJob {
|
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
|
for (DeconstructVectorConfig config : configs) {
|
|
for (DeconstructVectorConfig config : configs) {
|
|
|
- configFutures.add(configExecutor.submit(() -> {
|
|
|
|
|
- String configCode = config.getConfigCode();
|
|
|
|
|
- try {
|
|
|
|
|
- // 4.1 查询哪些 videoId 在该配置下已有向量(数据库层已做 DISTINCT video_id)
|
|
|
|
|
- Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
|
|
|
|
|
-
|
|
|
|
|
- // 4.1.1 检查已存在记录中 text 为空的,删除后重新向量化
|
|
|
|
|
- if (!existingVideoIds.isEmpty()) {
|
|
|
|
|
- Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
|
|
|
|
|
- if (!emptyTextIds.isEmpty()) {
|
|
|
|
|
- log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
|
|
|
|
|
- vectorStoreService.deleteBatch(configCode, emptyTextIds);
|
|
|
|
|
- existingVideoIds.removeAll(emptyTextIds);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 4.2 过滤出需要处理的 videoId(排除已有向量的)
|
|
|
|
|
- List<Long> needProcessIds = auditPassedIds.stream()
|
|
|
|
|
- .filter(id -> !existingVideoIds.contains(id))
|
|
|
|
|
- .collect(Collectors.toList());
|
|
|
|
|
-
|
|
|
|
|
- if (needProcessIds.isEmpty()) {
|
|
|
|
|
- log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
-
|
|
|
|
|
- // 4.3 流式查询 raw_result,Semaphore 控制并发在途数(不阻塞流式读取,避免OOM)
|
|
|
|
|
- ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
|
|
- Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
|
|
- List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
-
|
|
|
|
|
- try {
|
|
|
|
|
- // 分批查询,防止 IN 子句 ID 过多导致 ODPS SQL 超长
|
|
|
|
|
- for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
|
|
- String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
|
|
- String sql = String.format(
|
|
|
|
|
- "SELECT content_id, raw_result FROM videoods.content_profile " +
|
|
|
|
|
- "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);", idsStr);
|
|
|
|
|
-
|
|
|
|
|
- OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
|
|
- Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
- String rawResult = record.getString(1);
|
|
|
|
|
- if (videoId == null || !StringUtils.hasText(rawResult)) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- try {
|
|
|
|
|
- inFlightLimiter.acquire();
|
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- futures.add(embedExecutor.submit(() -> {
|
|
|
|
|
- try {
|
|
|
|
|
- List<String> texts = extractTextsFromRawResult(rawResult, config);
|
|
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
- log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
- if (storeCount > 0) {
|
|
|
|
|
- totalSuccessCount.incrementAndGet();
|
|
|
|
|
- } else {
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- } finally {
|
|
|
|
|
- inFlightLimiter.release();
|
|
|
|
|
- }
|
|
|
|
|
- }));
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
- } finally {
|
|
|
|
|
- for (Future<?> future : futures) {
|
|
|
|
|
- try {
|
|
|
|
|
- future.get(30, TimeUnit.MINUTES);
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("embedding 并发任务等待异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- embedExecutor.shutdown();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }));
|
|
|
|
|
- }
|
|
|
|
|
- // 等待所有配置处理完成
|
|
|
|
|
- for (Future<?> future : configFutures) {
|
|
|
|
|
- try {
|
|
|
|
|
- future.get(10, TimeUnit.MINUTES);
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置并发任务等待异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ configFutures.add(configExecutor.submit(() ->
|
|
|
|
|
+ processConfigForRawResult(config, auditPassedIds, totalSuccessCount, totalFailCount)
|
|
|
|
|
+ ));
|
|
|
}
|
|
}
|
|
|
- configExecutor.shutdown();
|
|
|
|
|
|
|
+ awaitAndShutdown(configFutures, configExecutor, 10, "配置并发");
|
|
|
|
|
|
|
|
// 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
|
|
// 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
|
|
|
if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
@@ -238,6 +145,129 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理单个配置下的 raw_result 视频向量化
|
|
|
|
|
+ * 包含:空text检查清理、过滤已有向量、流式查询ODPS并并发embedding
|
|
|
|
|
+ */
|
|
|
|
|
+ private void processConfigForRawResult(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
|
|
|
+ AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 查询已有向量并清理空text记录
|
|
|
|
|
+ Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
|
|
|
|
|
+ cleanEmptyTextRecords(configCode, existingVideoIds);
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 过滤出需要处理的 videoId
|
|
|
|
|
+ List<Long> needProcessIds = auditPassedIds.stream()
|
|
|
|
|
+ .filter(id -> !existingVideoIds.contains(id))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ if (needProcessIds.isEmpty()) {
|
|
|
|
|
+ log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 流式查询并并发embedding
|
|
|
|
|
+ ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
|
|
+ Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
|
|
+ String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT content_id, raw_result FROM videoods.content_profile " +
|
|
|
|
|
+ "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);", idsStr);
|
|
|
|
|
+
|
|
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record ->
|
|
|
|
|
+ submitEmbeddingTask(record, config, embedExecutor, inFlightLimiter, futures,
|
|
|
|
|
+ totalSuccessCount, totalFailCount, "raw_result")
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ awaitAndShutdown(futures, embedExecutor, 30, "embedding");
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 提交单条记录的 embedding 任务到线程池
|
|
|
|
|
+ * 使用 Semaphore 控制在途并发数
|
|
|
|
|
+ */
|
|
|
|
|
+ private void submitEmbeddingTask(com.aliyun.odps.data.Record record, DeconstructVectorConfig config,
|
|
|
|
|
+ ExecutorService executor, Semaphore inFlightLimiter,
|
|
|
|
|
+ List<Future<?>> futures, AtomicInteger successCount,
|
|
|
|
|
+ AtomicInteger failCount, String dataType) {
|
|
|
|
|
+ Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
+ String rawData = record.getString(1);
|
|
|
|
|
+ if (videoId == null || !StringUtils.hasText(rawData)) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ inFlightLimiter.acquire();
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ futures.add(executor.submit(() -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<String> texts = "result_log".equals(dataType)
|
|
|
|
|
+ ? extractTextsFromResultLogData(rawData, config)
|
|
|
|
|
+ : extractTextsFromRawResult(rawData, config);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
+ if (storeCount > 0) {
|
|
|
|
|
+ successCount.incrementAndGet();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ inFlightLimiter.release();
|
|
|
|
|
+ }
|
|
|
|
|
+ }));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 检查已存在记录中 text 为空的,删除后重新向量化
|
|
|
|
|
+ * 会从 existingVideoIds 中移除被清理的ID
|
|
|
|
|
+ */
|
|
|
|
|
+ private void cleanEmptyTextRecords(String configCode, Set<Long> existingVideoIds) {
|
|
|
|
|
+ if (existingVideoIds.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
|
|
|
|
|
+ if (!emptyTextIds.isEmpty()) {
|
|
|
|
|
+ log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
|
|
|
|
|
+ vectorStoreService.deleteBatch(configCode, emptyTextIds);
|
|
|
|
|
+ existingVideoIds.removeAll(emptyTextIds);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 等待所有 Future 完成并关闭 ExecutorService
|
|
|
|
|
+ */
|
|
|
|
|
+ private void awaitAndShutdown(List<Future<?>> futures, ExecutorService executor,
|
|
|
|
|
+ long timeoutMinutes, String taskDesc) {
|
|
|
|
|
+ for (Future<?> future : futures) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ future.get(timeoutMinutes, TimeUnit.MINUTES);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("{} 并发任务等待异常: {}", taskDesc, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ executor.shutdown();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 根据配置从 raw_result 中提取文本
|
|
* 根据配置从 raw_result 中提取文本
|
|
|
* 当配置了 extract_rule 时,启用置信度过滤逻辑
|
|
* 当配置了 extract_rule 时,启用置信度过滤逻辑
|
|
@@ -560,95 +590,12 @@ public class VideoVectorJob {
|
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
|
for (DeconstructVectorConfig config : configs) {
|
|
for (DeconstructVectorConfig config : configs) {
|
|
|
- configFutures.add(configExecutor.submit(() -> {
|
|
|
|
|
- String configCode = config.getConfigCode();
|
|
|
|
|
- try {
|
|
|
|
|
- // 5.1 查询该配置下已有向量的 videoId,排除已处理过的(数据库层已做 DISTINCT video_id)
|
|
|
|
|
- Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
|
|
|
|
|
-
|
|
|
|
|
- // 5.1.1 检查已存在记录中 text 为空的,删除后重新向量化
|
|
|
|
|
- if (!existingVideoIds.isEmpty()) {
|
|
|
|
|
- Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
|
|
|
|
|
- if (!emptyTextIds.isEmpty()) {
|
|
|
|
|
- log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
|
|
|
|
|
- vectorStoreService.deleteBatch(configCode, emptyTextIds);
|
|
|
|
|
- existingVideoIds.removeAll(emptyTextIds);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- List<Long> needProcessIds = auditPassedIds.stream()
|
|
|
|
|
- .filter(id -> !existingVideoIds.contains(id))
|
|
|
|
|
- .collect(Collectors.toList());
|
|
|
|
|
- if (needProcessIds.isEmpty()) {
|
|
|
|
|
- log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
-
|
|
|
|
|
- // 5.2 并发调用 detail 接口,提取选题并向量化存储
|
|
|
|
|
- ExecutorService detailExecutor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
|
|
|
|
|
- List<Future<?>> detailFutures = new ArrayList<>();
|
|
|
|
|
- for (Long videoId : needProcessIds) {
|
|
|
|
|
- detailFutures.add(detailExecutor.submit(() -> {
|
|
|
|
|
- try {
|
|
|
|
|
- Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
|
|
|
|
|
- if (taskInstanceId == null) {
|
|
|
|
|
- log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 调用 detail 接口获取 dataContent(解构详情)
|
|
|
|
|
- JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
|
|
- if (dataContent == null) {
|
|
|
|
|
- log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 从 dataContent 中提取文本(支持置信度过滤)
|
|
|
|
|
- List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
- log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 向量化并存储(多点模式返回成功数>0即为成功)
|
|
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
- if (storeCount > 0) {
|
|
|
|
|
- totalSuccessCount.incrementAndGet();
|
|
|
|
|
- } else {
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- }
|
|
|
|
|
- }));
|
|
|
|
|
- }
|
|
|
|
|
- for (Future<?> future : detailFutures) {
|
|
|
|
|
- try {
|
|
|
|
|
- future.get(30, TimeUnit.MINUTES);
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("AIGC detail 并发任务等待异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- detailExecutor.shutdown();
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }));
|
|
|
|
|
- }
|
|
|
|
|
- // 等待所有配置处理完成
|
|
|
|
|
- for (Future<?> future : configFutures) {
|
|
|
|
|
- try {
|
|
|
|
|
- future.get(30, TimeUnit.MINUTES);
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置并发任务等待异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ configFutures.add(configExecutor.submit(() ->
|
|
|
|
|
+ processConfigForAigc(config, auditPassedIds, videoIdToTaskInstanceId,
|
|
|
|
|
+ totalSuccessCount, totalFailCount)
|
|
|
|
|
+ ));
|
|
|
}
|
|
}
|
|
|
- configExecutor.shutdown();
|
|
|
|
|
|
|
+ awaitAndShutdown(configFutures, configExecutor, 30, "配置并发");
|
|
|
|
|
|
|
|
log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount.get(), totalFailCount.get());
|
|
log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount.get(), totalFailCount.get());
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
@@ -658,6 +605,85 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理单个配置下的 AIGC 视频向量化
|
|
|
|
|
+ * 包含:空text检查清理、过滤已有向量、并发调用detail接口并向量化
|
|
|
|
|
+ */
|
|
|
|
|
+ private void processConfigForAigc(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
|
|
|
+ Map<Long, Long> videoIdToTaskInstanceId,
|
|
|
|
|
+ AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 查询已有向量并清理空text记录
|
|
|
|
|
+ Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
|
|
|
|
|
+ cleanEmptyTextRecords(configCode, existingVideoIds);
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 过滤出需要处理的 videoId
|
|
|
|
|
+ List<Long> needProcessIds = auditPassedIds.stream()
|
|
|
|
|
+ .filter(id -> !existingVideoIds.contains(id))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ if (needProcessIds.isEmpty()) {
|
|
|
|
|
+ log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 并发调用 detail 接口,提取文本并向量化
|
|
|
|
|
+ ExecutorService detailExecutor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
|
|
|
|
|
+ List<Future<?>> detailFutures = new ArrayList<>();
|
|
|
|
|
+ for (Long videoId : needProcessIds) {
|
|
|
|
|
+ detailFutures.add(detailExecutor.submit(() ->
|
|
|
|
|
+ processAigcSingleVideo(videoId, config, videoIdToTaskInstanceId,
|
|
|
|
|
+ totalSuccessCount, totalFailCount)
|
|
|
|
|
+ ));
|
|
|
|
|
+ }
|
|
|
|
|
+ awaitAndShutdown(detailFutures, detailExecutor, 30, "AIGC detail");
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理单个 AIGC 视频的向量化
|
|
|
|
|
+ */
|
|
|
|
|
+ private void processAigcSingleVideo(Long videoId, DeconstructVectorConfig config,
|
|
|
|
|
+ Map<Long, Long> videoIdToTaskInstanceId,
|
|
|
|
|
+ AtomicInteger successCount, AtomicInteger failCount) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
|
|
|
|
|
+ if (taskInstanceId == null) {
|
|
|
|
|
+ log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
|
|
+ if (dataContent == null) {
|
|
|
|
|
+ log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
+ if (storeCount > 0) {
|
|
|
|
|
+ successCount.incrementAndGet();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 从 dataContent 中提取文本
|
|
* 从 dataContent 中提取文本
|
|
|
* 根据配置的 extract_rule 决定是否进行置信度过滤
|
|
* 根据配置的 extract_rule 决定是否进行置信度过滤
|
|
@@ -949,103 +975,11 @@ public class VideoVectorJob {
|
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
|
for (DeconstructVectorConfig config : configs) {
|
|
for (DeconstructVectorConfig config : configs) {
|
|
|
- configFutures.add(configExecutor.submit(() -> {
|
|
|
|
|
- String configCode = config.getConfigCode();
|
|
|
|
|
- try {
|
|
|
|
|
- // 5.1 已向量化过滤
|
|
|
|
|
- Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
|
|
|
|
|
-
|
|
|
|
|
- // 5.1.1 检查已存在记录中 text 为空的,删除后重新向量化
|
|
|
|
|
- if (!existingVideoIds.isEmpty()) {
|
|
|
|
|
- Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
|
|
|
|
|
- if (!emptyTextIds.isEmpty()) {
|
|
|
|
|
- log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
|
|
|
|
|
- vectorStoreService.deleteBatch(configCode, emptyTextIds);
|
|
|
|
|
- existingVideoIds.removeAll(emptyTextIds);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- List<Long> needProcessIds = auditPassedIds.stream()
|
|
|
|
|
- .filter(id -> !existingVideoIds.contains(id))
|
|
|
|
|
- .collect(Collectors.toList());
|
|
|
|
|
- if (needProcessIds.isEmpty()) {
|
|
|
|
|
- log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
-
|
|
|
|
|
- // 5.2 流式查询 result_log data,Semaphore 控制并发在途数(不阻塞流式读取,避免OOM)
|
|
|
|
|
- ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
|
|
- Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
|
|
- List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
-
|
|
|
|
|
- try {
|
|
|
|
|
- // 分批查询,防止 IN 子句 ID 过多导致 ODPS SQL 超长
|
|
|
|
|
- for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
|
|
- String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
|
|
- String sql = String.format(
|
|
|
|
|
- "SELECT video_id, data FROM loghubods.result_log " +
|
|
|
|
|
- "WHERE video_id IN (%s) AND dt > 20240001;", idsStr);
|
|
|
|
|
-
|
|
|
|
|
- OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
|
|
- Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
- String data = record.getString(1);
|
|
|
|
|
- if (videoId == null || !StringUtils.hasText(data)) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- try {
|
|
|
|
|
- inFlightLimiter.acquire();
|
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- futures.add(embedExecutor.submit(() -> {
|
|
|
|
|
- try {
|
|
|
|
|
- List<String> texts = extractTextsFromResultLogData(data, config);
|
|
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
- log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
- if (storeCount > 0) {
|
|
|
|
|
- totalSuccessCount.incrementAndGet();
|
|
|
|
|
- } else {
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
|
|
- } finally {
|
|
|
|
|
- inFlightLimiter.release();
|
|
|
|
|
- }
|
|
|
|
|
- }));
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
- } finally {
|
|
|
|
|
- for (Future<?> future : futures) {
|
|
|
|
|
- try {
|
|
|
|
|
- future.get(30, TimeUnit.MINUTES);
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("embedding 并发任务等待异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- embedExecutor.shutdown();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }));
|
|
|
|
|
|
|
+ configFutures.add(configExecutor.submit(() ->
|
|
|
|
|
+ processConfigForResultLog(config, auditPassedIds, totalSuccessCount, totalFailCount)
|
|
|
|
|
+ ));
|
|
|
}
|
|
}
|
|
|
- // 等待所有配置处理完成
|
|
|
|
|
- for (Future<?> future : configFutures) {
|
|
|
|
|
- try {
|
|
|
|
|
- future.get(10, TimeUnit.MINUTES);
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置并发任务等待异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- configExecutor.shutdown();
|
|
|
|
|
|
|
+ awaitAndShutdown(configFutures, configExecutor, 10, "配置并发");
|
|
|
|
|
|
|
|
if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
|
log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), VectorConstants.PAGE_SIZE);
|
|
log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), VectorConstants.PAGE_SIZE);
|
|
@@ -1112,6 +1046,53 @@ public class VideoVectorJob {
|
|
|
return videoIds;
|
|
return videoIds;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理单个配置下的 result_log 视频向量化
|
|
|
|
|
+ * 包含:空text检查清理、过滤已有向量、流式查询ODPS并并发embedding
|
|
|
|
|
+ */
|
|
|
|
|
+ private void processConfigForResultLog(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
|
|
|
+ AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 查询已有向量并清理空text记录
|
|
|
|
|
+ Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
|
|
|
|
|
+ cleanEmptyTextRecords(configCode, existingVideoIds);
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 过滤出需要处理的 videoId
|
|
|
|
|
+ List<Long> needProcessIds = auditPassedIds.stream()
|
|
|
|
|
+ .filter(id -> !existingVideoIds.contains(id))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ if (needProcessIds.isEmpty()) {
|
|
|
|
|
+ log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 流式查询并并发embedding
|
|
|
|
|
+ ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
|
|
+ Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
|
|
+ String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT video_id, data FROM loghubods.result_log " +
|
|
|
|
|
+ "WHERE video_id IN (%s) AND dt > 20240001;", idsStr);
|
|
|
|
|
+
|
|
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record ->
|
|
|
|
|
+ submitEmbeddingTask(record, config, embedExecutor, inFlightLimiter, futures,
|
|
|
|
|
+ totalSuccessCount, totalFailCount, "result_log")
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ awaitAndShutdown(futures, embedExecutor, 30, "embedding");
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 从 result_log 的 data 字段中提取文本
|
|
* 从 result_log 的 data 字段中提取文本
|
|
|
* data 结构参考 r.json,直接按 sourcePath 提取单点文本值
|
|
* data 结构参考 r.json,直接按 sourcePath 提取单点文本值
|
|
@@ -1168,75 +1149,50 @@ public class VideoVectorJob {
|
|
|
public ReturnT<String> allVideoVectorJob(String param) {
|
|
public ReturnT<String> allVideoVectorJob(String param) {
|
|
|
log.info("开始执行聚合视频向量化任务, param: {}", param);
|
|
log.info("开始执行聚合视频向量化任务, param: {}", param);
|
|
|
|
|
|
|
|
|
|
+ // 定义子任务列表:名称 + 执行逻辑
|
|
|
|
|
+ List<Map.Entry<String, Supplier<ReturnT<String>>>> subTasks = Arrays.asList(
|
|
|
|
|
+ new AbstractMap.SimpleEntry<>("vectorVideoJob", () -> vectorVideoJob(param)),
|
|
|
|
|
+ new AbstractMap.SimpleEntry<>("aigcVideoVectorJob", () -> aigcVideoVectorJob(param)),
|
|
|
|
|
+ new AbstractMap.SimpleEntry<>("resultLogVideoVectorJob", () -> resultLogVideoVectorJob(param)),
|
|
|
|
|
+ new AbstractMap.SimpleEntry<>("videoTitleVectorJob", () -> videoTitleVectorJob.videoTitleVectorJob(param))
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
boolean hasFailure = false;
|
|
boolean hasFailure = false;
|
|
|
StringBuilder failMessages = new StringBuilder();
|
|
StringBuilder failMessages = new StringBuilder();
|
|
|
|
|
|
|
|
- // 1. 执行 vectorVideoJob
|
|
|
|
|
- try {
|
|
|
|
|
- log.info("===== 开始执行子任务: vectorVideoJob =====");
|
|
|
|
|
- ReturnT<String> result = vectorVideoJob(param);
|
|
|
|
|
- if (result.getCode() != ReturnT.SUCCESS_CODE) {
|
|
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("vectorVideoJob失败: ").append(result.getMsg()).append("; ");
|
|
|
|
|
- }
|
|
|
|
|
- log.info("===== 子任务 vectorVideoJob 执行完成, code={} =====", result.getCode());
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("vectorVideoJob异常: ").append(e.getMessage()).append("; ");
|
|
|
|
|
- log.error("子任务 vectorVideoJob 执行异常: {}", e.getMessage(), e);
|
|
|
|
|
|
|
+ for (Map.Entry<String, Supplier<ReturnT<String>>> task : subTasks) {
|
|
|
|
|
+ hasFailure |= executeSubTask(task.getKey(), task.getValue(), failMessages);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 2. 执行 aigcVideoVectorJob
|
|
|
|
|
- try {
|
|
|
|
|
- log.info("===== 开始执行子任务: aigcVideoVectorJob =====");
|
|
|
|
|
- ReturnT<String> result = aigcVideoVectorJob(param);
|
|
|
|
|
- if (result.getCode() != ReturnT.SUCCESS_CODE) {
|
|
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("aigcVideoVectorJob失败: ").append(result.getMsg()).append("; ");
|
|
|
|
|
- }
|
|
|
|
|
- log.info("===== 子任务 aigcVideoVectorJob 执行完成, code={} =====", result.getCode());
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("aigcVideoVectorJob异常: ").append(e.getMessage()).append("; ");
|
|
|
|
|
- log.error("子任务 aigcVideoVectorJob 执行异常: {}", e.getMessage(), e);
|
|
|
|
|
|
|
+ if (hasFailure) {
|
|
|
|
|
+ log.warn("聚合视频向量化任务完成,部分子任务失败: {}", failMessages);
|
|
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "部分子任务失败: " + failMessages);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 3. 执行 resultLogVideoVectorJob
|
|
|
|
|
- try {
|
|
|
|
|
- log.info("===== 开始执行子任务: resultLogVideoVectorJob =====");
|
|
|
|
|
- ReturnT<String> result = resultLogVideoVectorJob(param);
|
|
|
|
|
- if (result.getCode() != ReturnT.SUCCESS_CODE) {
|
|
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("resultLogVideoVectorJob失败: ").append(result.getMsg()).append("; ");
|
|
|
|
|
- }
|
|
|
|
|
- log.info("===== 子任务 resultLogVideoVectorJob 执行完成, code={} =====", result.getCode());
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("resultLogVideoVectorJob异常: ").append(e.getMessage()).append("; ");
|
|
|
|
|
- log.error("子任务 resultLogVideoVectorJob 执行异常: {}", e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ log.info("聚合视频向量化任务全部完成");
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 4. 执行 videoTitleVectorJob
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 执行单个子任务,捕获异常并记录失败信息
|
|
|
|
|
+ *
|
|
|
|
|
+ * @return true 表示该子任务失败
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean executeSubTask(String taskName, Supplier<ReturnT<String>> taskSupplier,
|
|
|
|
|
+ StringBuilder failMessages) {
|
|
|
try {
|
|
try {
|
|
|
- log.info("===== 开始执行子任务: videoTitleVectorJob =====");
|
|
|
|
|
- ReturnT<String> result = videoTitleVectorJob.videoTitleVectorJob(param);
|
|
|
|
|
|
|
+ log.info("===== 开始执行子任务: {} =====", taskName);
|
|
|
|
|
+ ReturnT<String> result = taskSupplier.get();
|
|
|
|
|
+ log.info("===== 子任务 {} 执行完成, code={} =====", taskName, result.getCode());
|
|
|
if (result.getCode() != ReturnT.SUCCESS_CODE) {
|
|
if (result.getCode() != ReturnT.SUCCESS_CODE) {
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("videoTitleVectorJob失败: ").append(result.getMsg()).append("; ");
|
|
|
|
|
|
|
+ failMessages.append(taskName).append("失败: ").append(result.getMsg()).append("; ");
|
|
|
|
|
+ return true;
|
|
|
}
|
|
}
|
|
|
- log.info("===== 子任务 videoTitleVectorJob 执行完成, code={} =====", result.getCode());
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- hasFailure = true;
|
|
|
|
|
- failMessages.append("videoTitleVectorJob异常: ").append(e.getMessage()).append("; ");
|
|
|
|
|
- log.error("子任务 videoTitleVectorJob 执行异常: {}", e.getMessage(), e);
|
|
|
|
|
|
|
+ failMessages.append(taskName).append("异常: ").append(e.getMessage()).append("; ");
|
|
|
|
|
+ log.error("子任务 {} 执行异常: {}", taskName, e.getMessage(), e);
|
|
|
|
|
+ return true;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- if (hasFailure) {
|
|
|
|
|
- log.warn("聚合视频向量化任务完成,部分子任务失败: {}", failMessages);
|
|
|
|
|
- return new ReturnT<>(ReturnT.FAIL_CODE, "部分子任务失败: " + failMessages);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- log.info("聚合视频向量化任务全部完成");
|
|
|
|
|
- return ReturnT.SUCCESS;
|
|
|
|
|
|
|
+ return false;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|