|
|
@@ -3,6 +3,7 @@ package com.tzld.videoVector.job;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.aliyun.odps.data.Record;
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.tzld.videoVector.api.AigcApiService;
|
|
|
import com.tzld.videoVector.api.VideoApiService;
|
|
|
@@ -59,6 +60,9 @@ public class VideoVectorJob {
|
|
|
private AigcApiService aigcApiService;
|
|
|
|
|
|
|
|
|
+ @ApolloJsonValue("${aigc.deconstruct.task.ids:[46, 57, 58]}")
|
|
|
+ private List<Integer> aigcDeconstructTaskIds;
|
|
|
+
|
|
|
/**
|
|
|
* 视频向量化
|
|
|
* 根据配置对解构内容进行向量化
|
|
|
@@ -130,45 +134,63 @@ public class VideoVectorJob {
|
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
|
- // 4.3 批量查询需要处理的视频 raw_result
|
|
|
- for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
|
|
|
- Map<Long, String> videoRawResults = batchQueryVideoRawResults(partition);
|
|
|
- if (videoRawResults.isEmpty()) {
|
|
|
- log.warn("配置 {} 未查询到任何 raw_result", configCode);
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // 4.4 逐个处理
|
|
|
- for (Long videoId : partition) {
|
|
|
- try {
|
|
|
- String rawResult = videoRawResults.get(videoId);
|
|
|
- if (!StringUtils.hasText(rawResult)) {
|
|
|
- log.debug("videoId={} raw_result 为空,跳过", videoId);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
- }
|
|
|
+ // 4.3 流式查询 raw_result,Semaphore 控制并发在途数(不阻塞流式读取,避免OOM)
|
|
|
+ ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
+ Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
|
- // 根据配置提取文本(支持置信度过滤)
|
|
|
- List<String> texts = extractTextsFromRawResult(rawResult, config);
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
- log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
+ try {
|
|
|
+ // 分批查询,防止 IN 子句 ID 过多导致 ODPS SQL 超长
|
|
|
+ for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
+ String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
+ String sql = String.format(
|
|
|
+ "SELECT content_id, raw_result FROM videoods.content_profile " +
|
|
|
+ "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);", idsStr);
|
|
|
+
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
+ Long videoId = Long.valueOf(record.getString(0));
|
|
|
+ String rawResult = record.getString(1);
|
|
|
+ if (videoId == null || !StringUtils.hasText(rawResult)) {
|
|
|
+ return;
|
|
|
}
|
|
|
-
|
|
|
- // 向量化并存储(多点模式返回成功数>0即为成功)
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
- if (storeCount > 0) {
|
|
|
- totalSuccessCount.incrementAndGet();
|
|
|
- } else {
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
+ try {
|
|
|
+ inFlightLimiter.acquire();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return;
|
|
|
}
|
|
|
-
|
|
|
+ futures.add(embedExecutor.submit(() -> {
|
|
|
+ try {
|
|
|
+ List<String> texts = extractTextsFromRawResult(rawResult, config);
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
+ log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
+ if (storeCount > 0) {
|
|
|
+ totalSuccessCount.incrementAndGet();
|
|
|
+ } else {
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ } finally {
|
|
|
+ inFlightLimiter.release();
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ });
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ for (Future<?> future : futures) {
|
|
|
+ try {
|
|
|
+ future.get(30, TimeUnit.MINUTES);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
+ log.error("embedding 并发任务等待异常: {}", e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+ embedExecutor.shutdown();
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
@@ -201,41 +223,6 @@ public class VideoVectorJob {
|
|
|
}
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * 批量查询视频的 raw_result
|
|
|
- */
|
|
|
- private Map<Long, String> batchQueryVideoRawResults(List<Long> videoIds) {
|
|
|
- if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
- return Collections.emptyMap();
|
|
|
- }
|
|
|
-
|
|
|
- // 构建IN查询
|
|
|
- String idsStr = videoIds.stream()
|
|
|
- .map(String::valueOf)
|
|
|
- .collect(Collectors.joining(","));
|
|
|
-
|
|
|
- String sql = String.format(
|
|
|
- "SELECT content_id, raw_result " +
|
|
|
- "FROM videoods.content_profile " +
|
|
|
- "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);",
|
|
|
- idsStr);
|
|
|
-
|
|
|
- List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
- if (records == null || records.isEmpty()) {
|
|
|
- return Collections.emptyMap();
|
|
|
- }
|
|
|
-
|
|
|
- Map<Long, String> result = new HashMap<>();
|
|
|
- for (Record record : records) {
|
|
|
- Long videoId = Long.valueOf(record.getString(0));
|
|
|
- String rawResult = record.getString(1);
|
|
|
- if (videoId != null && rawResult != null) {
|
|
|
- result.put(videoId, rawResult);
|
|
|
- }
|
|
|
- }
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 根据配置从 raw_result 中提取文本
|
|
|
* 当配置了 extract_rule 时,启用置信度过滤逻辑
|
|
|
@@ -492,22 +479,24 @@ public class VideoVectorJob {
|
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
|
|
|
|
- // 2. 从 AIGC API 获取任务输入列表(bizUniqueId 为视频 ID)
|
|
|
- List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(46);
|
|
|
- if (CollectionUtils.isEmpty(taskInputList)) {
|
|
|
- log.info("AIGC API 未返回任务输入数据");
|
|
|
- return ReturnT.SUCCESS;
|
|
|
- }
|
|
|
- log.info("获取到 {} 条 AIGC 任务输入数据", taskInputList.size());
|
|
|
-
|
|
|
- // 3. 构建 videoId -> taskInstanceId 映射
|
|
|
+ // 2. 从 AIGC API 获取任务输入列表(bizUniqueId 为视频 ID),循环所有配置的任务ID
|
|
|
Map<Long, Long> videoIdToTaskInstanceId = new HashMap<>();
|
|
|
- for (AigcApiService.AigcTaskInput input : taskInputList) {
|
|
|
- try {
|
|
|
- Long videoId = Long.parseLong(input.getBizUniqueId());
|
|
|
- videoIdToTaskInstanceId.put(videoId, input.getTaskInstanceId());
|
|
|
- } catch (NumberFormatException e) {
|
|
|
- log.warn("bizUniqueId 格式非法,跳过: {}", input.getBizUniqueId());
|
|
|
+ for (Integer taskId : aigcDeconstructTaskIds) {
|
|
|
+ List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(taskId);
|
|
|
+ if (CollectionUtils.isEmpty(taskInputList)) {
|
|
|
+ log.info("AIGC API taskId={} 未返回任务输入数据", taskId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ log.info("taskId={} 获取到 {} 条 AIGC 任务输入数据", taskId, taskInputList.size());
|
|
|
+
|
|
|
+ // 构建 videoId -> taskInstanceId 映射
|
|
|
+ for (AigcApiService.AigcTaskInput input : taskInputList) {
|
|
|
+ try {
|
|
|
+ Long videoId = Long.parseLong(input.getBizUniqueId());
|
|
|
+ videoIdToTaskInstanceId.put(videoId, input.getTaskInstanceId());
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.warn("bizUniqueId 格式非法,跳过: {}", input.getBizUniqueId());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (videoIdToTaskInstanceId.isEmpty()) {
|
|
|
@@ -515,7 +504,7 @@ public class VideoVectorJob {
|
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
|
List<Long> allVideoIds = new ArrayList<>(videoIdToTaskInstanceId.keySet());
|
|
|
- log.info("共 {} 个有效 videoId", allVideoIds.size());
|
|
|
+ log.info("共 {} 个有效 videoId(来自 {} 个任务)", allVideoIds.size(), aigcDeconstructTaskIds.size());
|
|
|
|
|
|
// 4. 先进行审核过滤(只过滤一次,避免在 config 循环内重复调用)
|
|
|
List<Long> auditPassedIds = filterAuditPassedIds(allVideoIds);
|
|
|
@@ -546,44 +535,56 @@ public class VideoVectorJob {
|
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
|
- // 5.2 逐个调用 detail 接口,提取选题并向量化存储
|
|
|
+ // 5.2 并发调用 detail 接口,提取选题并向量化存储
|
|
|
+ ExecutorService detailExecutor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
|
|
|
+ List<Future<?>> detailFutures = new ArrayList<>();
|
|
|
for (Long videoId : needProcessIds) {
|
|
|
- try {
|
|
|
- Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
|
|
|
- if (taskInstanceId == null) {
|
|
|
- log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
- }
|
|
|
+ detailFutures.add(detailExecutor.submit(() -> {
|
|
|
+ try {
|
|
|
+ Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
|
|
|
+ if (taskInstanceId == null) {
|
|
|
+ log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 调用 detail 接口获取 dataContent(解构详情)
|
|
|
- JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
- if (dataContent == null) {
|
|
|
- log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
- }
|
|
|
+ // 调用 detail 接口获取 dataContent(解构详情)
|
|
|
+ JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
+ if (dataContent == null) {
|
|
|
+ log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 从 dataContent 中提取文本(支持置信度过滤)
|
|
|
- List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
- log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
- }
|
|
|
+ // 从 dataContent 中提取文本(支持置信度过滤)
|
|
|
+ List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
+ log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 向量化并存储(多点模式返回成功数>0即为成功)
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
- if (storeCount > 0) {
|
|
|
- totalSuccessCount.incrementAndGet();
|
|
|
- } else {
|
|
|
+ // 向量化并存储(多点模式返回成功数>0即为成功)
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
+ if (storeCount > 0) {
|
|
|
+ totalSuccessCount.incrementAndGet();
|
|
|
+ } else {
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
totalFailCount.incrementAndGet();
|
|
|
}
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ for (Future<?> future : detailFutures) {
|
|
|
+ try {
|
|
|
+ future.get(30, TimeUnit.MINUTES);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
+ log.error("AIGC detail 并发任务等待异常: {}", e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+ detailExecutor.shutdown();
|
|
|
} catch (Exception e) {
|
|
|
log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
}
|
|
|
@@ -655,9 +656,8 @@ public class VideoVectorJob {
|
|
|
public ReturnT<String> retryDeconstructJob(String param) {
|
|
|
log.info("开始执行解构任务重试, param: {}", param);
|
|
|
|
|
|
- int totalRetryCount = 0;
|
|
|
- int successCount = 0;
|
|
|
- int failCount = 0;
|
|
|
+ AtomicInteger successCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
|
|
|
try {
|
|
|
// 1. 查询超时的任务(创建时间超过1小时,状态为PENDING或RUNNING)
|
|
|
@@ -673,62 +673,74 @@ public class VideoVectorJob {
|
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
|
|
|
|
- log.info("查询到 {} 个超时的解构任务", timeoutTasks.size());
|
|
|
+ int totalRetryCount = timeoutTasks.size();
|
|
|
+ log.info("查询到 {} 个超时的解构任务", totalRetryCount);
|
|
|
|
|
|
- // 2. 逐个重新查询解构结果
|
|
|
+ // 2. 并发重新查询解构结果(getDeconstructResult 是外部HTTP调用,并发可大幅提升速度)
|
|
|
+ ExecutorService retryExecutor = Executors.newFixedThreadPool(VectorConstants.RETRY_PARALLELISM);
|
|
|
+ List<Future<?>> retryFutures = new ArrayList<>();
|
|
|
for (DeconstructContent content : timeoutTasks) {
|
|
|
- totalRetryCount++;
|
|
|
- String taskId = content.getTaskId();
|
|
|
+ retryFutures.add(retryExecutor.submit(() -> {
|
|
|
+ String taskId = content.getTaskId();
|
|
|
|
|
|
- try {
|
|
|
- log.info("重试解构任务,taskId={}, contentId={}, createTime={}",
|
|
|
- taskId, content.getId(), content.getCreateTime());
|
|
|
+ try {
|
|
|
+ log.info("重试解构任务,taskId={}, contentId={}, createTime={}",
|
|
|
+ taskId, content.getId(), content.getCreateTime());
|
|
|
|
|
|
- // 调用API重新查询解构结果
|
|
|
- DeconstructResult result = deconstructService.getDeconstructResult(taskId);
|
|
|
+ // 调用API重新查询解构结果
|
|
|
+ DeconstructResult result = deconstructService.getDeconstructResult(taskId);
|
|
|
|
|
|
- if (result == null) {
|
|
|
- log.warn("重试解构任务失败,API返回空,taskId={}", taskId);
|
|
|
- updateContentStatus(content, (byte) 3, "API返回空");
|
|
|
- failCount++;
|
|
|
- continue;
|
|
|
- }
|
|
|
+ if (result == null) {
|
|
|
+ log.warn("重试解构任务失败,API返回空,taskId={}", taskId);
|
|
|
+ updateContentStatus(content, (byte) 3, "API返回空");
|
|
|
+ failCount.incrementAndGet();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 更新数据库记录
|
|
|
- if (result.isFinished()) {
|
|
|
- if (result.isSuccess()) {
|
|
|
- // 成功
|
|
|
- content.setStatus((short) 2);
|
|
|
- content.setResultJson(result.getResult());
|
|
|
- content.setPointUrl(result.getPointUrl());
|
|
|
- content.setWeightUrl(result.getWeightUrl());
|
|
|
- content.setPatternUrl(result.getPatternUrl());
|
|
|
+ // 更新数据库记录
|
|
|
+ if (result.isFinished()) {
|
|
|
+ if (result.isSuccess()) {
|
|
|
+ // 成功
|
|
|
+ content.setStatus((short) 2);
|
|
|
+ content.setResultJson(result.getResult());
|
|
|
+ content.setPointUrl(result.getPointUrl());
|
|
|
+ content.setWeightUrl(result.getWeightUrl());
|
|
|
+ content.setPatternUrl(result.getPatternUrl());
|
|
|
+ content.setUpdateTime(new Date());
|
|
|
+ deconstructContentMapper.updateByPrimaryKeySelective(content);
|
|
|
+ successCount.incrementAndGet();
|
|
|
+ log.info("重试解构任务成功,taskId={}", taskId);
|
|
|
+ } else {
|
|
|
+ // 失败
|
|
|
+ updateContentStatus(content, (short) 3, result.getReason());
|
|
|
+ failCount.incrementAndGet();
|
|
|
+ log.warn("重试解构任务失败,taskId={}, reason={}", taskId, result.getReason());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 仍在处理中,更新状态
|
|
|
+ content.setStatus(result.getStatus().shortValue());
|
|
|
content.setUpdateTime(new Date());
|
|
|
deconstructContentMapper.updateByPrimaryKeySelective(content);
|
|
|
- successCount++;
|
|
|
- log.info("重试解构任务成功,taskId={}", taskId);
|
|
|
- } else {
|
|
|
- // 失败
|
|
|
- updateContentStatus(content, (short) 3, result.getReason());
|
|
|
- failCount++;
|
|
|
- log.warn("重试解构任务失败,taskId={}, reason={}", taskId, result.getReason());
|
|
|
+ log.info("解构任务仍在处理中,taskId={}, status={}", taskId, result.getStatusDesc());
|
|
|
}
|
|
|
- } else {
|
|
|
- // 仍在处理中,更新状态
|
|
|
- content.setStatus(result.getStatus().shortValue());
|
|
|
- content.setUpdateTime(new Date());
|
|
|
- deconstructContentMapper.updateByPrimaryKeySelective(content);
|
|
|
- log.info("解构任务仍在处理中,taskId={}, status={}", taskId, result.getStatusDesc());
|
|
|
- }
|
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("重试解构任务异常,taskId={}, error={}", taskId, e.getMessage(), e);
|
|
|
+ failCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ for (Future<?> future : retryFutures) {
|
|
|
+ try {
|
|
|
+ future.get(10, TimeUnit.MINUTES);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("重试解构任务异常,taskId={}, error={}", taskId, e.getMessage(), e);
|
|
|
- failCount++;
|
|
|
+ log.error("重试并发任务等待异常: {}", e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+ retryExecutor.shutdown();
|
|
|
|
|
|
log.info("解构任务重试完成,总数: {}, 成功: {}, 失败: {}",
|
|
|
- totalRetryCount, successCount, failCount);
|
|
|
+ totalRetryCount, successCount.get(), failCount.get());
|
|
|
return ReturnT.SUCCESS;
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
@@ -901,43 +913,63 @@ public class VideoVectorJob {
|
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
|
- // 5.2 分批查询 result_log 的 data 字段并向量化
|
|
|
- for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
|
|
|
- Map<Long, String> videoDataMap = batchQueryResultLogData(partition);
|
|
|
- if (videoDataMap.isEmpty()) {
|
|
|
- log.warn("配置 {} 未查询到任何 result_log data", configCode);
|
|
|
- continue;
|
|
|
- }
|
|
|
+ // 5.2 流式查询 result_log data,Semaphore 控制并发在途数(不阻塞流式读取,避免OOM)
|
|
|
+ ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
+ Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
|
- for (Long videoId : partition) {
|
|
|
- try {
|
|
|
- String data = videoDataMap.get(videoId);
|
|
|
- if (!StringUtils.hasText(data)) {
|
|
|
- log.debug("videoId={} result_log data 为空,跳过", videoId);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
+ try {
|
|
|
+ // 分批查询,防止 IN 子句 ID 过多导致 ODPS SQL 超长
|
|
|
+ for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
+ String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
+ String sql = String.format(
|
|
|
+ "SELECT video_id, data FROM loghubods.result_log " +
|
|
|
+ "WHERE video_id IN (%s) AND dt > 20240001;", idsStr);
|
|
|
+
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
+ Long videoId = Long.valueOf(record.getString(0));
|
|
|
+ String data = record.getString(1);
|
|
|
+ if (videoId == null || !StringUtils.hasText(data)) {
|
|
|
+ return;
|
|
|
}
|
|
|
-
|
|
|
- // 从 data JSON 中根据配置提取文本
|
|
|
- List<String> texts = extractTextsFromResultLogData(data, config);
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
- log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // 向量化并存储
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
- if (storeCount > 0) {
|
|
|
- totalSuccessCount.incrementAndGet();
|
|
|
- } else {
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
+ try {
|
|
|
+ inFlightLimiter.acquire();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return;
|
|
|
}
|
|
|
+ futures.add(embedExecutor.submit(() -> {
|
|
|
+ try {
|
|
|
+ List<String> texts = extractTextsFromResultLogData(data, config);
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
+ log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
+ if (storeCount > 0) {
|
|
|
+ totalSuccessCount.incrementAndGet();
|
|
|
+ } else {
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ } finally {
|
|
|
+ inFlightLimiter.release();
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ });
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ for (Future<?> future : futures) {
|
|
|
+ try {
|
|
|
+ future.get(30, TimeUnit.MINUTES);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
- totalFailCount.incrementAndGet();
|
|
|
+ log.error("embedding 并发任务等待异常: {}", e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+ embedExecutor.shutdown();
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
@@ -1019,43 +1051,6 @@ public class VideoVectorJob {
|
|
|
return videoIds;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 批量查询 result_log 的 data 字段
|
|
|
- *
|
|
|
- * @param videoIds 视频ID列表
|
|
|
- * @return videoId -> data JSON 字符串
|
|
|
- */
|
|
|
- private Map<Long, String> batchQueryResultLogData(List<Long> videoIds) {
|
|
|
- if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
- return Collections.emptyMap();
|
|
|
- }
|
|
|
-
|
|
|
- String idsStr = videoIds.stream()
|
|
|
- .map(String::valueOf)
|
|
|
- .collect(Collectors.joining(","));
|
|
|
-
|
|
|
- String sql = String.format(
|
|
|
- "SELECT video_id, data " +
|
|
|
- "FROM loghubods.result_log " +
|
|
|
- "WHERE video_id IN (%s) AND dt > 20240001;",
|
|
|
- idsStr);
|
|
|
-
|
|
|
- List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
- if (records == null || records.isEmpty()) {
|
|
|
- return Collections.emptyMap();
|
|
|
- }
|
|
|
-
|
|
|
- Map<Long, String> result = new HashMap<>();
|
|
|
- for (Record record : records) {
|
|
|
- Long videoId = Long.valueOf(record.getString(0));
|
|
|
- String data = record.getString(1);
|
|
|
- if (videoId != null && data != null) {
|
|
|
- result.put(videoId, data);
|
|
|
- }
|
|
|
- }
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 从 result_log 的 data 字段中提取文本
|
|
|
* data 结构参考 r.json,直接按 sourcePath 提取单点文本值
|