|
|
@@ -260,7 +260,6 @@ public class MaterialVectorJob {
|
|
|
AtomicInteger totalProcessed = new AtomicInteger(0);
|
|
|
int pageNum = 0;
|
|
|
|
|
|
- outer:
|
|
|
while (true) {
|
|
|
int offset = pageNum * VectorConstants.PAGE_SIZE;
|
|
|
int limit = VectorConstants.PAGE_SIZE;
|
|
|
@@ -278,12 +277,15 @@ public class MaterialVectorJob {
|
|
|
}
|
|
|
log.info("第 {} 页查询到 {} 个 materialId", pageNum, materialIds.size());
|
|
|
|
|
|
- // 2. 对每个配置并发处理
|
|
|
+ // 2. 一次性拉取并解析本页所有素材的 result,多个 config 共享解析结果
|
|
|
+ Map<String, ParsedMaterial> parsedById = loadParsedMaterials(materialIds);
|
|
|
+
|
|
|
+ // 3. 对每个配置并发处理
|
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
List<Future<?>> configFutures = new ArrayList<>();
|
|
|
for (DeconstructVectorConfig config : configs) {
|
|
|
configFutures.add(configExecutor.submit(() ->
|
|
|
- processConfigForMaterial(config, materialIds, totalSuccessCount, totalFailCount)
|
|
|
+ processConfigForMaterial(config, materialIds, parsedById, totalSuccessCount, totalFailCount)
|
|
|
));
|
|
|
}
|
|
|
awaitAndShutdown(configFutures, configExecutor, 30, "素材向量化配置并发");
|
|
|
@@ -293,7 +295,7 @@ public class MaterialVectorJob {
|
|
|
if (maxMaterialCount != null && maxMaterialCount > 0
|
|
|
&& totalProcessed.get() >= maxMaterialCount) {
|
|
|
log.info("已达到 maxMaterialCount={} 限制,结束扫描", maxMaterialCount);
|
|
|
- break outer;
|
|
|
+ break;
|
|
|
}
|
|
|
|
|
|
if (materialIds.size() < limit) {
|
|
|
@@ -312,11 +314,36 @@ public class MaterialVectorJob {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 一次性拉取并解析本页所有素材的解构结果,避免每个 config 重复 JSON 解析。
|
|
|
+ */
|
|
|
+ private Map<String, ParsedMaterial> loadParsedMaterials(List<String> materialIds) {
|
|
|
+ List<MaterialDeconstructResult> results = materialDeconstructResultMapperExt
|
|
|
+ .selectResultsByMaterialIds(SOURCE_AIGC, materialIds);
|
|
|
+ Map<String, ParsedMaterial> map = new HashMap<>(results.size() * 2);
|
|
|
+ for (MaterialDeconstructResult r : results) {
|
|
|
+ if (r == null || !StringUtils.hasText(r.getResult())) continue;
|
|
|
+ JSONObject dataContent;
|
|
|
+ try {
|
|
|
+ dataContent = JSON.parseObject(r.getResult());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("materialId={} result JSON 解析失败: {}", r.getMaterialId(), e.getMessage());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (dataContent == null) continue;
|
|
|
+ Short sourceType = r.getSourceType() != null ? r.getSourceType() : defaultSourceType;
|
|
|
+ map.put(r.getMaterialId(), new ParsedMaterial(dataContent, sourceType));
|
|
|
+ }
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 处理单个配置下的素材向量化
|
|
|
- * 包含:过滤已有向量、批量查询解构结果并提取文本向量化
|
|
|
+ * <p>
|
|
|
+ * 计数粒度:素材级。multiPoint 模式下,单素材只要有至少一个点向量化成功就计入 success。
|
|
|
*/
|
|
|
private void processConfigForMaterial(DeconstructVectorConfig config, List<String> materialIds,
|
|
|
+ Map<String, ParsedMaterial> parsedById,
|
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
|
String configCode = config.getConfigCode();
|
|
|
try {
|
|
|
@@ -330,24 +357,22 @@ public class MaterialVectorJob {
|
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个素材", configCode, needProcessIds.size());
|
|
|
|
|
|
- List<MaterialDeconstructResult> results = materialDeconstructResultMapperExt
|
|
|
- .selectResultsByMaterialIds(SOURCE_AIGC, needProcessIds);
|
|
|
- for (MaterialDeconstructResult r : results) {
|
|
|
- if (!StringUtils.hasText(r.getResult())) continue;
|
|
|
- String materialId = r.getMaterialId();
|
|
|
- JSONObject dataContent = JSON.parseObject(r.getResult());
|
|
|
- if (dataContent == null) continue;
|
|
|
-
|
|
|
- Short sourceType = r.getSourceType() != null ? r.getSourceType() : defaultSourceType;
|
|
|
+ for (String materialId : needProcessIds) {
|
|
|
+ ParsedMaterial parsed = parsedById.get(materialId);
|
|
|
+ if (parsed == null) {
|
|
|
+ log.info("materialId={} 配置 {} 无解构结果,跳过", materialId, configCode);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
try {
|
|
|
- List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
+ List<String> texts = extractTextsFromDataContent(parsed.dataContent, config);
|
|
|
if (CollectionUtils.isEmpty(texts)) {
|
|
|
log.info("materialId={} 配置 {} 未提取到文本,跳过", materialId, configCode);
|
|
|
totalFailCount.incrementAndGet();
|
|
|
continue;
|
|
|
}
|
|
|
- int storeCount = vectorizeAndStoreMaterial(config, materialId, texts, sourceType);
|
|
|
- if (storeCount > 0) {
|
|
|
+ boolean ok = vectorizeAndStoreMaterial(config, materialId, texts, parsed.sourceType);
|
|
|
+ if (ok) {
|
|
|
totalSuccessCount.incrementAndGet();
|
|
|
} else {
|
|
|
totalFailCount.incrementAndGet();
|
|
|
@@ -362,30 +387,51 @@ public class MaterialVectorJob {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private int vectorizeAndStoreMaterial(DeconstructVectorConfig config, String materialId,
|
|
|
- List<String> texts, Short sourceType) {
|
|
|
+ /**
|
|
|
+ * 单素材向量化 + 存储。
|
|
|
+ *
|
|
|
+ * @return true 表示本素材至少有一条点成功入库;false 表示全部失败 / 无有效文本
|
|
|
+ */
|
|
|
+ private boolean vectorizeAndStoreMaterial(DeconstructVectorConfig config, String materialId,
|
|
|
+ List<String> texts, Short sourceType) {
|
|
|
String configCode = config.getConfigCode();
|
|
|
Integer maxLength = config.getMaxLength();
|
|
|
boolean multiPoint = VectorUtils.isMultiPointConfig(config);
|
|
|
|
|
|
if (multiPoint) {
|
|
|
- int successCount = 0;
|
|
|
- for (int i = 0; i < texts.size(); i++) {
|
|
|
- String text = texts.get(i);
|
|
|
- if (!StringUtils.hasText(text)) continue;
|
|
|
+ // 1) 先压缩掉空文本,pointIndex 用紧凑下标
|
|
|
+ // 2) 全部 embed 成功后再统一 save,避免出现"部分点写入、existsByIds 误判已完成"的中间态
|
|
|
+ // (existsByIds 仅按 materialId 判存,留下"洞"后下一轮会跳过整个素材)
|
|
|
+ List<String> validTexts = new ArrayList<>(texts.size());
|
|
|
+ for (String raw : texts) {
|
|
|
+ if (StringUtils.hasText(raw)) validTexts.add(raw);
|
|
|
+ }
|
|
|
+ if (validTexts.isEmpty()) {
|
|
|
+ log.info("materialId={} 配置 {} 无有效文本", materialId, configCode);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ List<List<Float>> vectors = new ArrayList<>(validTexts.size());
|
|
|
+ List<String> truncated = new ArrayList<>(validTexts.size());
|
|
|
+ for (int i = 0; i < validTexts.size(); i++) {
|
|
|
+ String text = validTexts.get(i);
|
|
|
if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
text = text.substring(0, maxLength);
|
|
|
}
|
|
|
List<Float> vector = getOrEmbed(text, config);
|
|
|
if (vector == null || vector.isEmpty()) {
|
|
|
- log.error("materialId={} 配置 {} 第{}个文本向量化失败", materialId, configCode, i);
|
|
|
- continue;
|
|
|
+ // 整素材本轮放弃,留待下次重跑(不会留下洞)
|
|
|
+ log.error("materialId={} 配置 {} 第{}个文本向量化失败,本素材本轮放弃",
|
|
|
+ materialId, configCode, i);
|
|
|
+ return false;
|
|
|
}
|
|
|
- materialVectorStoreService.save(configCode, materialId, i, vector, text, sourceType);
|
|
|
- log.info("materialId={} 配置 {} 第{}个点向量化存储成功", materialId, configCode, i);
|
|
|
- successCount++;
|
|
|
+ vectors.add(vector);
|
|
|
+ truncated.add(text);
|
|
|
}
|
|
|
- return successCount;
|
|
|
+ for (int i = 0; i < vectors.size(); i++) {
|
|
|
+ materialVectorStoreService.save(configCode, materialId, i, vectors.get(i), truncated.get(i), sourceType);
|
|
|
+ }
|
|
|
+ log.info("materialId={} 配置 {} 多点向量化存储成功,共 {} 个点", materialId, configCode, vectors.size());
|
|
|
+ return true;
|
|
|
} else {
|
|
|
String text = null;
|
|
|
for (String t : texts) {
|
|
|
@@ -396,7 +442,7 @@ public class MaterialVectorJob {
|
|
|
}
|
|
|
if (text == null) {
|
|
|
log.info("materialId={} 配置 {} 无有效文本,跳过", materialId, configCode);
|
|
|
- return 0;
|
|
|
+ return false;
|
|
|
}
|
|
|
if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
text = text.substring(0, maxLength);
|
|
|
@@ -404,11 +450,11 @@ public class MaterialVectorJob {
|
|
|
List<Float> vector = getOrEmbed(text, config);
|
|
|
if (vector == null || vector.isEmpty()) {
|
|
|
log.error("materialId={} 配置 {} 文本向量化失败", materialId, configCode);
|
|
|
- return 0;
|
|
|
+ return false;
|
|
|
}
|
|
|
materialVectorStoreService.save(configCode, materialId, vector, text, sourceType);
|
|
|
log.info("materialId={} 配置 {} 向量化存储成功", materialId, configCode);
|
|
|
- return 1;
|
|
|
+ return true;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -725,4 +771,15 @@ public class MaterialVectorJob {
|
|
|
}
|
|
|
return bizUniqueId.trim();
|
|
|
}
|
|
|
+
|
|
|
+ /** 一页素材解析后的 dataContent + sourceType,多 config 共享。 */
|
|
|
+ private static final class ParsedMaterial {
|
|
|
+ final JSONObject dataContent;
|
|
|
+ final Short sourceType;
|
|
|
+
|
|
|
+ ParsedMaterial(JSONObject dataContent, Short sourceType) {
|
|
|
+ this.dataContent = dataContent;
|
|
|
+ this.sourceType = sourceType;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|