|
|
@@ -1,7 +1,7 @@
|
|
|
package com.tzld.videoVector.job;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
+
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.collect.Lists;
|
|
|
@@ -14,6 +14,7 @@ import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
|
|
|
import com.tzld.videoVector.model.po.pgVector.MaterialDeconstructResult;
|
|
|
import com.tzld.videoVector.service.EmbeddingService;
|
|
|
import com.tzld.videoVector.service.MaterialVectorStoreService;
|
|
|
+import com.tzld.videoVector.util.DeconstructTextExtractor;
|
|
|
import com.tzld.videoVector.util.Md5Util;
|
|
|
import com.tzld.videoVector.util.VectorUtils;
|
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
|
@@ -35,8 +36,7 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
-import java.util.concurrent.Future;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@@ -198,38 +198,33 @@ public class MaterialVectorJob {
|
|
|
|
|
|
// 并发调 detail 接口
|
|
|
ExecutorService executor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
|
|
|
- try {
|
|
|
- List<Future<?>> futures = new ArrayList<>();
|
|
|
- List<MaterialDeconstructResult> batch = Collections.synchronizedList(new ArrayList<>());
|
|
|
-
|
|
|
- for (String materialId : needSyncIds) {
|
|
|
- futures.add(executor.submit(() -> {
|
|
|
- try {
|
|
|
- Long taskInstanceId = materialIdToTaskInstanceId.get(materialId);
|
|
|
- if (taskInstanceId == null) return;
|
|
|
- JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
- if (dataContent != null) {
|
|
|
- MaterialDeconstructResult r = new MaterialDeconstructResult();
|
|
|
- r.setMaterialId(materialId);
|
|
|
- r.setSource(SOURCE_AIGC);
|
|
|
- r.setResult(dataContent.toJSONString());
|
|
|
- r.setSourceType(materialIdToSourceType.getOrDefault(materialId, defaultSourceType));
|
|
|
- batch.add(r);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("同步 materialId={} 失败: {}", materialId, e.getMessage());
|
|
|
+ List<MaterialDeconstructResult> batch = Collections.synchronizedList(new ArrayList<>());
|
|
|
+
|
|
|
+ for (String materialId : needSyncIds) {
|
|
|
+ executor.submit(() -> {
|
|
|
+ try {
|
|
|
+ Long taskInstanceId = materialIdToTaskInstanceId.get(materialId);
|
|
|
+ if (taskInstanceId == null) return;
|
|
|
+ JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
+ if (dataContent != null) {
|
|
|
+ MaterialDeconstructResult r = new MaterialDeconstructResult();
|
|
|
+ r.setMaterialId(materialId);
|
|
|
+ r.setSource(SOURCE_AIGC);
|
|
|
+ r.setResult(dataContent.toJSONString());
|
|
|
+ r.setSourceType(materialIdToSourceType.getOrDefault(materialId, defaultSourceType));
|
|
|
+ batch.add(r);
|
|
|
}
|
|
|
- }));
|
|
|
- }
|
|
|
- awaitAndShutdown(futures, executor, 30, "素材同步");
|
|
|
-
|
|
|
- if (!batch.isEmpty()) {
|
|
|
- for (List<MaterialDeconstructResult> subBatch : Lists.partition(batch, 200)) {
|
|
|
- insertCount.addAndGet(materialDeconstructResultMapperExt.batchInsertIgnore(subBatch));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步 materialId={} 失败: {}", materialId, e.getMessage());
|
|
|
}
|
|
|
+ });
|
|
|
+ }
|
|
|
+ VectorUtils.awaitAndShutdown(executor, 30, "素材同步");
|
|
|
+
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ for (List<MaterialDeconstructResult> subBatch : Lists.partition(batch, 200)) {
|
|
|
+ insertCount.addAndGet(materialDeconstructResultMapperExt.batchInsertIgnore(subBatch));
|
|
|
}
|
|
|
- } finally {
|
|
|
- executor.shutdownNow();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -246,7 +241,7 @@ public class MaterialVectorJob {
|
|
|
@XxlJob("vectorMaterialJob")
|
|
|
public ReturnT<String> vectorMaterialJob(String param) {
|
|
|
log.info("开始执行素材向量化任务, param: {}", param);
|
|
|
- Integer maxMaterialCount = parseMaxCount(param);
|
|
|
+ Integer maxMaterialCount = VectorUtils.parseMaxCount(param);
|
|
|
return doVectorize(maxMaterialCount);
|
|
|
}
|
|
|
|
|
|
@@ -293,17 +288,12 @@ public class MaterialVectorJob {
|
|
|
|
|
|
// 3. 对每个配置并发处理
|
|
|
ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
- try {
|
|
|
- List<Future<?>> configFutures = new ArrayList<>();
|
|
|
- for (DeconstructVectorConfig config : configs) {
|
|
|
- configFutures.add(configExecutor.submit(() ->
|
|
|
- processConfigForMaterial(config, materialIds, parsedById, totalSuccessCount, totalFailCount)
|
|
|
- ));
|
|
|
- }
|
|
|
- awaitAndShutdown(configFutures, configExecutor, 30, "素材向量化配置并发");
|
|
|
- } finally {
|
|
|
- configExecutor.shutdownNow();
|
|
|
- }
|
|
|
+ for (DeconstructVectorConfig config : configs) {
|
|
|
+ configExecutor.submit(() ->
|
|
|
+ processConfigForMaterial(config, materialIds, parsedById, totalSuccessCount, totalFailCount)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ VectorUtils.awaitAndShutdown(configExecutor, 30, "素材向量化配置并发");
|
|
|
|
|
|
totalProcessed.addAndGet(materialIds.size());
|
|
|
|
|
|
@@ -380,7 +370,7 @@ public class MaterialVectorJob {
|
|
|
continue;
|
|
|
}
|
|
|
try {
|
|
|
- List<String> texts = extractTextsFromDataContent(parsed.dataContent, config);
|
|
|
+ List<String> texts = DeconstructTextExtractor.extractTextsFromDataContent(parsed.dataContent, config);
|
|
|
if (CollectionUtils.isEmpty(texts)) {
|
|
|
log.info("materialId={} 配置 {} 未提取到文本,跳过", materialId, configCode);
|
|
|
totalFailCount.incrementAndGet();
|
|
|
@@ -414,9 +404,9 @@ public class MaterialVectorJob {
|
|
|
boolean multiPoint = VectorUtils.isMultiPointConfig(config);
|
|
|
|
|
|
if (multiPoint) {
|
|
|
- // 1) 先压缩掉空文本,pointIndex 用紧凑下标
|
|
|
- // 2) 全部 embed 成功后再统一 save,避免出现"部分点写入、existsByIds 误判已完成"的中间态
|
|
|
- // (existsByIds 仅按 materialId 判存,留下"洞"后下一轮会跳过整个素材)
|
|
|
+ // 预清理旧向量,防止上一轮 partial write 留下残缺数据导致 existsByIds 误判已完成
|
|
|
+ materialVectorStoreService.deleteAbovePointIndex(configCode, materialId, 0);
|
|
|
+
|
|
|
List<String> validTexts = new ArrayList<>(texts.size());
|
|
|
for (String raw : texts) {
|
|
|
if (StringUtils.hasText(raw)) validTexts.add(raw);
|
|
|
@@ -535,226 +525,6 @@ public class MaterialVectorJob {
|
|
|
return doVectorize(maxMaterialCount);
|
|
|
}
|
|
|
|
|
|
- // ====================================================================
|
|
|
- // TODO: 与 VideoVectorJob 的提取逻辑统一抽取到 VectorUtils / ExtractionUtils,避免两边各自维护
|
|
|
- // ====================================================================
|
|
|
-
|
|
|
- /**
|
|
|
- * 从 dataContent 中提取文本(与 VideoVectorJob 完全对称)
|
|
|
- */
|
|
|
- private List<String> extractTextsFromDataContent(JSONObject dataContent, DeconstructVectorConfig config) {
|
|
|
- if (dataContent == null) {
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
- String extractRule = config.getExtractRule();
|
|
|
- if (StringUtils.hasText(extractRule)) {
|
|
|
- try {
|
|
|
- JSONObject rule = JSON.parseObject(extractRule);
|
|
|
- if ("point_decomposition".equals(rule.getString("type"))) {
|
|
|
- return extractTextsFromPointDecomposition(dataContent, rule);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- // 不是 JSON 或无 type 字段,走原有逻辑
|
|
|
- }
|
|
|
- return extractTextsWithConfidence(dataContent, config.getSourcePath(), extractRule);
|
|
|
- } else {
|
|
|
- return VectorUtils.extractFromJson(dataContent, config.getSourcePath());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<String> extractTextsWithConfidence(JSONObject json, String sourcePath, String extractRule) {
|
|
|
- List<String> texts = new ArrayList<>();
|
|
|
- try {
|
|
|
- JSONObject rule = JSON.parseObject(extractRule);
|
|
|
- String textField = rule.getString("text_field");
|
|
|
- String confidenceField = rule.getString("confidence_field");
|
|
|
- double confidenceThreshold = rule.getDoubleValue("confidence_threshold");
|
|
|
- if (!StringUtils.hasText(textField) || !StringUtils.hasText(confidenceField)) {
|
|
|
- log.error("extract_rule 缺少必要字段: text_field={}, confidence_field={}", textField, confidenceField);
|
|
|
- return texts;
|
|
|
- }
|
|
|
- if (sourcePath.endsWith("[*]")) {
|
|
|
- List<JSONObject> items = VectorUtils.extractArrayItemsFromJson(json, sourcePath);
|
|
|
- for (JSONObject item : items) {
|
|
|
- if (isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
|
|
|
- String text = item.getString(textField);
|
|
|
- if (StringUtils.hasText(text)) {
|
|
|
- texts.add(text);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- List<String> pathValues = VectorUtils.extractFromJson(json, sourcePath);
|
|
|
- if (!pathValues.isEmpty()) {
|
|
|
- JSONObject targetObj = navigateToObject(json, sourcePath);
|
|
|
- if (targetObj != null && isConfidenceQualified(targetObj, confidenceField, confidenceThreshold)) {
|
|
|
- String text = targetObj.getString(textField);
|
|
|
- if (StringUtils.hasText(text)) {
|
|
|
- texts.add(text);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("置信度过滤提取失败: path={}, error={}", sourcePath, e.getMessage());
|
|
|
- }
|
|
|
- return texts;
|
|
|
- }
|
|
|
-
|
|
|
- private List<String> extractTextsFromPointDecomposition(JSONObject dataContent, JSONObject rule) {
|
|
|
- List<String> texts = new ArrayList<>();
|
|
|
- try {
|
|
|
- String pointArrayPath = rule.getString("point_array_path");
|
|
|
- String finalResultPath = rule.getString("final_result_path");
|
|
|
- String pointNameField = rule.getString("point_name_field");
|
|
|
- String confidenceField = rule.getString("confidence_field");
|
|
|
- double confidenceThreshold = rule.getDoubleValue("confidence_threshold");
|
|
|
- String target = rule.getString("target");
|
|
|
- String contributionPath = rule.getString("contribution_path");
|
|
|
- double contributionThreshold = rule.getDoubleValue("contribution_threshold");
|
|
|
-
|
|
|
- List<JSONObject> finalPoints = VectorUtils.extractArrayItemsFromJson(dataContent, finalResultPath + "[*]");
|
|
|
- List<String> qualifiedPointNames = new ArrayList<>();
|
|
|
- for (JSONObject fp : finalPoints) {
|
|
|
- if (isConfidenceQualified(fp, confidenceField, confidenceThreshold)) {
|
|
|
- String pointName = fp.getString(pointNameField);
|
|
|
- if (StringUtils.hasText(pointName)) {
|
|
|
- qualifiedPointNames.add(pointName);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (qualifiedPointNames.isEmpty()) return texts;
|
|
|
-
|
|
|
- List<JSONObject> pointDetails = VectorUtils.extractArrayItemsFromJson(dataContent, pointArrayPath + "[*]");
|
|
|
- Map<String, Double> contributionMap = buildContributionMap(dataContent, contributionPath);
|
|
|
-
|
|
|
- for (String pointName : qualifiedPointNames) {
|
|
|
- try {
|
|
|
- JSONObject matchedPoint = null;
|
|
|
- for (JSONObject detail : pointDetails) {
|
|
|
- if (pointName.equals(detail.getString("点"))) {
|
|
|
- matchedPoint = detail;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (matchedPoint == null) continue;
|
|
|
-
|
|
|
- List<String> itemNames = "substance".equals(target)
|
|
|
- ? extractSubstanceNames(matchedPoint)
|
|
|
- : extractFormNames(matchedPoint);
|
|
|
- for (String name : itemNames) {
|
|
|
- Double contribution = contributionMap.get(name);
|
|
|
- if (contribution != null && contribution >= contributionThreshold) {
|
|
|
- texts.add(name);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.debug("extractTextsFromPointDecomposition 单点处理异常 pointName={}: {}", pointName, e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("extractTextsFromPointDecomposition 失败: {}", e.getMessage(), e);
|
|
|
- }
|
|
|
- return texts;
|
|
|
- }
|
|
|
-
|
|
|
- private List<String> extractSubstanceNames(JSONObject point) {
|
|
|
- List<String> names = new ArrayList<>();
|
|
|
- JSONObject substance = point.getJSONObject("实质");
|
|
|
- if (substance == null) return names;
|
|
|
- for (String key : new String[]{"具体元素", "具象概念", "抽象概念"}) {
|
|
|
- try {
|
|
|
- collectNamesFromArray(substance.getJSONArray(key), names);
|
|
|
- } catch (Exception e) {
|
|
|
- log.debug("extractSubstanceNames key={} 异常: {}", key, e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- return names;
|
|
|
- }
|
|
|
-
|
|
|
- private List<String> extractFormNames(JSONObject point) {
|
|
|
- List<String> names = new ArrayList<>();
|
|
|
- JSONObject form = point.getJSONObject("形式");
|
|
|
- if (form == null) return names;
|
|
|
- for (String key : new String[]{"具体元素形式", "具象概念形式", "整体形式"}) {
|
|
|
- try {
|
|
|
- collectNamesFromArray(form.getJSONArray(key), names);
|
|
|
- } catch (Exception e) {
|
|
|
- log.debug("extractFormNames key={} 异常: {}", key, e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- return names;
|
|
|
- }
|
|
|
-
|
|
|
- private void collectNamesFromArray(JSONArray array, List<String> names) {
|
|
|
- if (array == null || array.isEmpty()) return;
|
|
|
- for (int i = 0; i < array.size(); i++) {
|
|
|
- try {
|
|
|
- JSONObject item = array.getJSONObject(i);
|
|
|
- if (item != null) {
|
|
|
- String name = item.getString("名称");
|
|
|
- if (StringUtils.hasText(name)) {
|
|
|
- names.add(name);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.debug("collectNamesFromArray 单元素解析异常: {}", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private Map<String, Double> buildContributionMap(JSONObject dataContent, String contributionPath) {
|
|
|
- Map<String, Double> map = new HashMap<>();
|
|
|
- try {
|
|
|
- List<JSONObject> contributions = VectorUtils.extractArrayItemsFromJson(dataContent, contributionPath + "[*]");
|
|
|
- for (JSONObject c : contributions) {
|
|
|
- try {
|
|
|
- String word = c.getString("词");
|
|
|
- Double contribution = c.getDouble("贡献度");
|
|
|
- if (StringUtils.hasText(word) && contribution != null) {
|
|
|
- map.put(word, contribution);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.debug("buildContributionMap 单元素解析异常: {}", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("构建贡献度查找表失败: {}", e.getMessage());
|
|
|
- }
|
|
|
- return map;
|
|
|
- }
|
|
|
-
|
|
|
- private JSONObject navigateToObject(JSONObject json, String path) {
|
|
|
- if (json == null || !StringUtils.hasText(path) || !path.startsWith("$.")) return null;
|
|
|
- try {
|
|
|
- String pathContent = path.substring(2);
|
|
|
- String[] parts = pathContent.split("\\.");
|
|
|
- Object current = json;
|
|
|
- for (String part : parts) {
|
|
|
- if (current instanceof JSONObject) {
|
|
|
- current = ((JSONObject) current).get(part);
|
|
|
- } else {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
- return current instanceof JSONObject ? (JSONObject) current : null;
|
|
|
- } catch (Exception e) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private boolean isConfidenceQualified(JSONObject item, String confidenceField, double threshold) {
|
|
|
- Object value = item.get(confidenceField);
|
|
|
- if (value == null) return false;
|
|
|
- if (value instanceof String) return "high".equalsIgnoreCase((String) value);
|
|
|
- if (value instanceof Number) return ((Number) value).doubleValue() >= threshold;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // ====================================================================
|
|
|
- // 通用辅助
|
|
|
- // ====================================================================
|
|
|
-
|
|
|
private List<DeconstructVectorConfig> getEnabledConfigsBySourceField(String sourceField) {
|
|
|
DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
|
|
|
example.createCriteria()
|
|
|
@@ -764,43 +534,6 @@ public class MaterialVectorJob {
|
|
|
return vectorConfigMapper.selectByExample(example);
|
|
|
}
|
|
|
|
|
|
- private void awaitAndShutdown(List<Future<?>> futures, ExecutorService executor,
|
|
|
- long timeoutMinutes, String taskDesc) {
|
|
|
- long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(timeoutMinutes);
|
|
|
- int completed = 0;
|
|
|
- for (Future<?> future : futures) {
|
|
|
- long remaining = deadline - System.currentTimeMillis();
|
|
|
- if (remaining <= 0) {
|
|
|
- log.error("{} 整体超时({}分钟),已取消剩余任务 (已完成 {}/{})",
|
|
|
- taskDesc, timeoutMinutes, completed, futures.size());
|
|
|
- for (Future<?> f : futures) {
|
|
|
- f.cancel(true);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- try {
|
|
|
- future.get(remaining, TimeUnit.MILLISECONDS);
|
|
|
- completed++;
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("{} 并发任务等待异常: {}", taskDesc, e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- executor.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 入参 N 解析为 maxMaterialCount
|
|
|
- */
|
|
|
- private Integer parseMaxCount(String param) {
|
|
|
- if (!StringUtils.hasText(param)) return null;
|
|
|
- try {
|
|
|
- int v = Integer.parseInt(param.trim());
|
|
|
- return v > 0 ? v : null;
|
|
|
- } catch (NumberFormatException e) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 归一化 AIGC bizUniqueId 为 materialId 字符串。
|
|
|
* 外部合作素材为文件 MD5(32 位 hex),内部素材通常为数字字符串。
|