|
@@ -18,6 +18,7 @@ import com.tzld.videoVector.service.DeconstructService;
|
|
|
import com.tzld.videoVector.service.EmbeddingService;
|
|
import com.tzld.videoVector.service.EmbeddingService;
|
|
|
import com.tzld.videoVector.service.VectorStoreService;
|
|
import com.tzld.videoVector.service.VectorStoreService;
|
|
|
import com.tzld.videoVector.util.OdpsUtil;
|
|
import com.tzld.videoVector.util.OdpsUtil;
|
|
|
|
|
+import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -55,27 +56,6 @@ public class VideoVectorJob {
|
|
|
@Resource
|
|
@Resource
|
|
|
private AigcApiService aigcApiService;
|
|
private AigcApiService aigcApiService;
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * 每页查询数量
|
|
|
|
|
- */
|
|
|
|
|
- private static final int PAGE_SIZE = 1000;
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
- * 审核状态检查批次大小
|
|
|
|
|
- */
|
|
|
|
|
- private static final int AUDIT_CHECK_BATCH_SIZE = 20;
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
- * 超时时间:1小时(毫秒)
|
|
|
|
|
- */
|
|
|
|
|
- private static final long TIMEOUT_MS = 60 * 60 * 1000L;
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
- * 内容类型:长文
|
|
|
|
|
- * @deprecated 已不再按内容类型过滤,加载所有启用的配置
|
|
|
|
|
- */
|
|
|
|
|
- @Deprecated
|
|
|
|
|
- private static final byte CONTENT_TYPE_LONG_ARTICLE = 1;
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 视频向量化
|
|
* 视频向量化
|
|
@@ -99,7 +79,7 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
// 2. 分页处理前,每次 Job 执行只做一次审核清理
|
|
// 2. 分页处理前,每次 Job 执行只做一次审核清理
|
|
|
for (DeconstructVectorConfig config : configs) {
|
|
for (DeconstructVectorConfig config : configs) {
|
|
|
- checkAndRemoveNotAuditPassedVideos(config.getConfigCode());
|
|
|
|
|
|
|
+ checkAndRemoveNotAuditPassedVideos(config.getConfigCode(), isMultiPointConfig(config));
|
|
|
}
|
|
}
|
|
|
log.info("审核清理完成,开始分页向量化处理");
|
|
log.info("审核清理完成,开始分页向量化处理");
|
|
|
|
|
|
|
@@ -109,7 +89,7 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
|
// 2. 分页查询 videoId 列表
|
|
// 2. 分页查询 videoId 列表
|
|
|
- List<Long> videoIds = queryVideoIdsByPage(pageNum, PAGE_SIZE);
|
|
|
|
|
|
|
+ List<Long> videoIds = queryVideoIdsByPage(pageNum, VectorConstants.PAGE_SIZE);
|
|
|
if (CollectionUtils.isEmpty(videoIds)) {
|
|
if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
|
|
log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
|
|
|
break;
|
|
break;
|
|
@@ -122,10 +102,23 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
// 3.0 审核清理已移至分页外,此处仅进行向量存在性检查
|
|
// 3.0 审核清理已移至分页外,此处仅进行向量存在性检查
|
|
|
// 3.1 查询哪些 videoId 在该配置下已有向量
|
|
// 3.1 查询哪些 videoId 在该配置下已有向量
|
|
|
- Set<Long> existingIds = vectorStoreService.existsByIds(configCode, videoIds);
|
|
|
|
|
|
|
+ boolean multiPoint = isMultiPointConfig(config);
|
|
|
|
|
+ Set<Long> existingVideoIds;
|
|
|
|
|
+ if (multiPoint) {
|
|
|
|
|
+ // 多点模式:将 videoId 转为复合基准ID(videoId*100)检查存在性
|
|
|
|
|
+ List<Long> baseIds = videoIds.stream()
|
|
|
|
|
+ .map(id -> encodeMultiPointId(id, 0))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ Set<Long> existingBaseIds = vectorStoreService.existsByIds(configCode, baseIds);
|
|
|
|
|
+ existingVideoIds = existingBaseIds.stream()
|
|
|
|
|
+ .map(VideoVectorJob::decodeVideoId)
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ existingVideoIds = vectorStoreService.existsByIds(configCode, videoIds);
|
|
|
|
|
+ }
|
|
|
// 3.2 过滤出需要处理的 videoId(排除已有向量的)
|
|
// 3.2 过滤出需要处理的 videoId(排除已有向量的)
|
|
|
List<Long> needProcessIds = videoIds.stream()
|
|
List<Long> needProcessIds = videoIds.stream()
|
|
|
- .filter(id -> !existingIds.contains(id))
|
|
|
|
|
|
|
+ .filter(id -> !existingVideoIds.contains(id))
|
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
|
|
if (needProcessIds.isEmpty()) {
|
|
if (needProcessIds.isEmpty()) {
|
|
@@ -160,7 +153,7 @@ public class VideoVectorJob {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 根据配置提取文本
|
|
|
|
|
|
|
+ // 根据配置提取文本(支持置信度过滤)
|
|
|
List<String> texts = extractTextsFromRawResult(rawResult, config);
|
|
List<String> texts = extractTextsFromRawResult(rawResult, config);
|
|
|
if (CollectionUtils.isEmpty(texts)) {
|
|
if (CollectionUtils.isEmpty(texts)) {
|
|
|
log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
@@ -168,9 +161,9 @@ public class VideoVectorJob {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 向量化并存储
|
|
|
|
|
- boolean success = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
- if (success) {
|
|
|
|
|
|
|
+ // 向量化并存储(多点模式返回成功数>0即为成功)
|
|
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
+ if (storeCount > 0) {
|
|
|
totalSuccessCount++;
|
|
totalSuccessCount++;
|
|
|
} else {
|
|
} else {
|
|
|
totalFailCount++;
|
|
totalFailCount++;
|
|
@@ -184,8 +177,8 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
// 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
|
|
// 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
|
|
|
- if (videoIds.size() < PAGE_SIZE) {
|
|
|
|
|
- log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), PAGE_SIZE);
|
|
|
|
|
|
|
+ if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
|
|
|
+ log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), VectorConstants.PAGE_SIZE);
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
pageNum++;
|
|
pageNum++;
|
|
@@ -245,31 +238,185 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 根据配置从 raw_result 中提取文本
|
|
* 根据配置从 raw_result 中提取文本
|
|
|
|
|
+ * 当配置了 extract_rule 时,启用置信度过滤逻辑
|
|
|
*/
|
|
*/
|
|
|
private List<String> extractTextsFromRawResult(String rawResult, DeconstructVectorConfig config) {
|
|
private List<String> extractTextsFromRawResult(String rawResult, DeconstructVectorConfig config) {
|
|
|
List<String> texts = new ArrayList<>();
|
|
List<String> texts = new ArrayList<>();
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
try {
|
|
try {
|
|
|
JSONObject json = JSON.parseObject(rawResult);
|
|
JSONObject json = JSON.parseObject(rawResult);
|
|
|
if (json == null) {
|
|
if (json == null) {
|
|
|
return texts;
|
|
return texts;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
String sourcePath = config.getSourcePath();
|
|
String sourcePath = config.getSourcePath();
|
|
|
if (!StringUtils.hasText(sourcePath)) {
|
|
if (!StringUtils.hasText(sourcePath)) {
|
|
|
return texts;
|
|
return texts;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // 解析路径并提取
|
|
|
|
|
- texts.addAll(extractFromJson(json, sourcePath));
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ String extractRule = config.getExtractRule();
|
|
|
|
|
+ if (StringUtils.hasText(extractRule)) {
|
|
|
|
|
+ // 多点模式:从数组中提取对象,按置信度过滤后取文本字段
|
|
|
|
|
+ texts.addAll(extractTextsWithConfidence(json, sourcePath, extractRule));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 单点模式:直接提取文本值(向后兼容)
|
|
|
|
|
+ texts.addAll(extractFromJson(json, sourcePath));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("解析 raw_result 失败: {}", e.getMessage());
|
|
log.error("解析 raw_result 失败: {}", e.getMessage());
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 带置信度过滤的文本提取
|
|
|
|
|
+ * 从数组路径中提取满足置信度条件的文本
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param json 原始JSON
|
|
|
|
|
+ * @param sourcePath 数组路径(如 $.final_normalization_rebuild.keypoint_final.最终关键点列表[*])
|
|
|
|
|
+ * @param extractRule 提取规则JSON(如 {"text_field":"关键点","confidence_field":"置信度","confidence_threshold":0.8})
|
|
|
|
|
+ * @return 满足置信度条件的文本列表
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<String> extractTextsWithConfidence(JSONObject json, String sourcePath, String extractRule) {
|
|
|
|
|
+ List<String> texts = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject rule = JSON.parseObject(extractRule);
|
|
|
|
|
+ String textField = rule.getString("text_field");
|
|
|
|
|
+ String confidenceField = rule.getString("confidence_field");
|
|
|
|
|
+ double confidenceThreshold = rule.getDoubleValue("confidence_threshold");
|
|
|
|
|
+
|
|
|
|
|
+ if (!StringUtils.hasText(textField) || !StringUtils.hasText(confidenceField)) {
|
|
|
|
|
+ log.warn("extract_rule 缺少必要字段: text_field={}, confidence_field={}", textField, confidenceField);
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 提取数组项(sourcePath 以 [*] 结尾,提取整个对象列表)
|
|
|
|
|
+ List<JSONObject> items = extractArrayItemsFromJson(json, sourcePath);
|
|
|
|
|
+
|
|
|
|
|
+ for (JSONObject item : items) {
|
|
|
|
|
+ if (isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
|
|
|
|
|
+ String text = item.getString(textField);
|
|
|
|
|
+ if (StringUtils.hasText(text)) {
|
|
|
|
|
+ texts.add(text);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.debug("置信度过滤:路径={}, 总数={}, 满足条件={}", sourcePath, items.size(), texts.size());
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("置信度过滤提取失败: path={}, error={}", sourcePath, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
return texts;
|
|
return texts;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从JSON中提取数组项对象列表
|
|
|
|
|
+ * 支持路径以 [*] 结尾,如 $.a.b.c[*]
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<JSONObject> extractArrayItemsFromJson(JSONObject json, String sourcePath) {
|
|
|
|
|
+ List<JSONObject> items = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ if (json == null || !StringUtils.hasText(sourcePath)) {
|
|
|
|
|
+ return items;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!sourcePath.startsWith("$.")) {
|
|
|
|
|
+ return items;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String pathContent = sourcePath.substring(2);
|
|
|
|
|
+ // 路径应以 [*] 结尾
|
|
|
|
|
+ if (!pathContent.endsWith("[*]")) {
|
|
|
|
|
+ log.warn("多点模式下 source_path 应以 [*] 结尾: {}", sourcePath);
|
|
|
|
|
+ return items;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 去掉末尾的 [*],找到数组所在的父路径和数组字段名
|
|
|
|
|
+ String pathWithoutWildcard = pathContent.substring(0, pathContent.length() - 3);
|
|
|
|
|
+ List<String> parts = parseJsonPath(pathWithoutWildcard);
|
|
|
|
|
+
|
|
|
|
|
+ // 逐层访问到数组字段的父对象
|
|
|
|
|
+ Object current = json;
|
|
|
|
|
+ for (int i = 0; i < parts.size() - 1; i++) {
|
|
|
|
|
+ if (current instanceof JSONObject) {
|
|
|
|
|
+ current = ((JSONObject) current).get(parts.get(i));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return items;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 获取数组
|
|
|
|
|
+ if (current instanceof JSONObject && !parts.isEmpty()) {
|
|
|
|
|
+ String arrayKey = parts.get(parts.size() - 1);
|
|
|
|
|
+ JSONArray array = ((JSONObject) current).getJSONArray(arrayKey);
|
|
|
|
|
+ if (array != null) {
|
|
|
|
|
+ for (int i = 0; i < array.size(); i++) {
|
|
|
|
|
+ JSONObject item = array.getJSONObject(i);
|
|
|
|
|
+ if (item != null) {
|
|
|
|
|
+ items.add(item);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("提取数组项失败: path={}, error={}", sourcePath, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return items;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 判断置信度是否满足条件
|
|
|
|
|
+ * 规则:置信度 == "high"(字符串)或 置信度 > threshold(数值)
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean isConfidenceQualified(JSONObject item, String confidenceField, double threshold) {
|
|
|
|
|
+ Object value = item.get(confidenceField);
|
|
|
|
|
+ if (value == null) {
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (value instanceof String) {
|
|
|
|
|
+ return "high".equalsIgnoreCase((String) value);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (value instanceof Number) {
|
|
|
|
|
+ return ((Number) value).doubleValue() > threshold;
|
|
|
|
|
+ }
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 判断配置是否为多点模式(有 extract_rule 配置)
|
|
|
|
|
+ */
|
|
|
|
|
+ private static boolean isMultiPointConfig(DeconstructVectorConfig config) {
|
|
|
|
|
+ return StringUtils.hasText(config.getExtractRule());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 编码多点复合ID
|
|
|
|
|
+ * @param videoId 原始视频ID
|
|
|
|
|
+ * @param index 点索引(0~99)
|
|
|
|
|
+ * @return 复合ID = videoId * 100 + index
|
|
|
|
|
+ */
|
|
|
|
|
+ private static Long encodeMultiPointId(Long videoId, int index) {
|
|
|
|
|
+ return videoId * VectorConstants.MULTI_POINT_FACTOR + index;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从复合ID解码出原始视频ID
|
|
|
|
|
+ * @param compositeId 复合ID
|
|
|
|
|
+ * @return 原始视频ID = compositeId / 100
|
|
|
|
|
+ */
|
|
|
|
|
+ private static Long decodeVideoId(Long compositeId) {
|
|
|
|
|
+ return compositeId / VectorConstants.MULTI_POINT_FACTOR;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 从JSON中提取文本
|
|
* 从JSON中提取文本
|
|
|
* 支持路径格式:$.final_normalization_rebuild.topic_fusion_result.最终选题.选题
|
|
* 支持路径格式:$.final_normalization_rebuild.topic_fusion_result.最终选题.选题
|
|
@@ -370,43 +517,69 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 向量化并存储
|
|
|
|
|
- * 取第一段有效文本进行向量化,以 configCode 为命名空间存储
|
|
|
|
|
- * 避免多分段时拼接 configCode:i 导致存储键与查询键不一致
|
|
|
|
|
|
|
+ * 向量化并存储(兼容单点和多点模式)
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 多点模式(extract_rule 非空):对 texts 中每个有效文本分别向量化,
|
|
|
|
|
+ * 以复合ID(videoId * 100 + index)存入同一 configCode 命名空间,
|
|
|
|
|
+ * 后续搜索时每个点都可被独立匹配。
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 单点模式(extract_rule 为空):仅取第一段有效文本向量化,
|
|
|
|
|
+ * 直接以 videoId 存储(向后兼容)。
|
|
|
|
|
+ *
|
|
|
|
|
+ * @return 成功向量化的数量(单点模式返回0或1)
|
|
|
*/
|
|
*/
|
|
|
- private boolean vectorizeAndStore(DeconstructVectorConfig config, Long videoId, List<String> texts) {
|
|
|
|
|
|
|
+ private int vectorizeAndStore(DeconstructVectorConfig config, Long videoId, List<String> texts) {
|
|
|
String configCode = config.getConfigCode();
|
|
String configCode = config.getConfigCode();
|
|
|
Integer maxLength = config.getMaxLength();
|
|
Integer maxLength = config.getMaxLength();
|
|
|
-
|
|
|
|
|
- // 取第一段有效文本(VIDEO_TOPIC 等配置通常只有一段)
|
|
|
|
|
- String text = null;
|
|
|
|
|
- for (String t : texts) {
|
|
|
|
|
- if (StringUtils.hasText(t)) {
|
|
|
|
|
- text = t;
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ boolean multiPoint = isMultiPointConfig(config);
|
|
|
|
|
+
|
|
|
|
|
+ if (multiPoint) {
|
|
|
|
|
+ // ---- 多点模式:每个文本独立向量化存储 ----
|
|
|
|
|
+ int successCount = 0;
|
|
|
|
|
+ for (int i = 0; i < texts.size(); i++) {
|
|
|
|
|
+ String text = texts.get(i);
|
|
|
|
|
+ if (!StringUtils.hasText(text)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
|
|
+ }
|
|
|
|
|
+ List<Float> vector = embeddingService.embed(text, config);
|
|
|
|
|
+ if (vector == null || vector.isEmpty()) {
|
|
|
|
|
+ log.warn("videoId={} 配置 {} 第{}个文本向量化失败", videoId, configCode, i);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ Long compositeId = encodeMultiPointId(videoId, i);
|
|
|
|
|
+ vectorStoreService.save(configCode, compositeId, vector);
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 第{}个点向量化存储成功,compositeId={}", videoId, configCode, i, compositeId);
|
|
|
|
|
+ successCount++;
|
|
|
}
|
|
}
|
|
|
|
|
+ return successCount;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // ---- 单点模式:仅取第一段有效文本(向后兼容) ----
|
|
|
|
|
+ String text = null;
|
|
|
|
|
+ for (String t : texts) {
|
|
|
|
|
+ if (StringUtils.hasText(t)) {
|
|
|
|
|
+ text = t;
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (text == null) {
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 无有效文本,跳过", videoId, configCode);
|
|
|
|
|
+ return 0;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
|
|
+ }
|
|
|
|
|
+ List<Float> vector = embeddingService.embed(text, config);
|
|
|
|
|
+ if (vector == null || vector.isEmpty()) {
|
|
|
|
|
+ log.warn("videoId={} 配置 {} 文本向量化失败", videoId, configCode);
|
|
|
|
|
+ return 0;
|
|
|
|
|
+ }
|
|
|
|
|
+ vectorStoreService.save(configCode, videoId, vector);
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 向量化存储成功", videoId, configCode);
|
|
|
|
|
+ return 1;
|
|
|
}
|
|
}
|
|
|
- if (text == null) {
|
|
|
|
|
- log.debug("videoId={} 配置 {} 无有效文本,跳过", videoId, configCode);
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 文本截断
|
|
|
|
|
- if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
|
|
- text = text.substring(0, maxLength);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 向量化
|
|
|
|
|
- List<Float> vector = embeddingService.embed(text, config);
|
|
|
|
|
- if (vector == null || vector.isEmpty()) {
|
|
|
|
|
- log.warn("videoId={} 配置 {} 文本向量化失败", videoId, configCode);
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 以 configCode 为命名空间存储
|
|
|
|
|
- vectorStoreService.save(configCode, videoId, vector);
|
|
|
|
|
- log.debug("videoId={} 配置 {} 向量化存储成功", videoId, configCode);
|
|
|
|
|
- return true;
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -490,9 +663,21 @@ public class VideoVectorJob {
|
|
|
String configCode = config.getConfigCode();
|
|
String configCode = config.getConfigCode();
|
|
|
|
|
|
|
|
// 4.1 查询该配置下已有向量的 videoId,排除已处理过的
|
|
// 4.1 查询该配置下已有向量的 videoId,排除已处理过的
|
|
|
- Set<Long> existingIds = vectorStoreService.existsByIds(configCode, allVideoIds);
|
|
|
|
|
|
|
+ boolean multiPoint = isMultiPointConfig(config);
|
|
|
|
|
+ Set<Long> existingVideoIds;
|
|
|
|
|
+ if (multiPoint) {
|
|
|
|
|
+ List<Long> baseIds = allVideoIds.stream()
|
|
|
|
|
+ .map(id -> encodeMultiPointId(id, 0))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ Set<Long> existingBaseIds = vectorStoreService.existsByIds(configCode, baseIds);
|
|
|
|
|
+ existingVideoIds = existingBaseIds.stream()
|
|
|
|
|
+ .map(VideoVectorJob::decodeVideoId)
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ existingVideoIds = vectorStoreService.existsByIds(configCode, allVideoIds);
|
|
|
|
|
+ }
|
|
|
List<Long> needProcessIds = allVideoIds.stream()
|
|
List<Long> needProcessIds = allVideoIds.stream()
|
|
|
- .filter(id -> !existingIds.contains(id))
|
|
|
|
|
|
|
+ .filter(id -> !existingVideoIds.contains(id))
|
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
if (needProcessIds.isEmpty()) {
|
|
if (needProcessIds.isEmpty()) {
|
|
|
log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
@@ -526,17 +711,17 @@ public class VideoVectorJob {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 从 dataContent 中提取选题文本
|
|
|
|
|
- List<String> texts = extractTopicFromDataContent(dataContent);
|
|
|
|
|
|
|
+ // 从 dataContent 中提取文本(支持置信度过滤)
|
|
|
|
|
+ List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
if (CollectionUtils.isEmpty(texts)) {
|
|
if (CollectionUtils.isEmpty(texts)) {
|
|
|
log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
totalFailCount++;
|
|
totalFailCount++;
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 向量化并写入 Redis
|
|
|
|
|
- boolean success = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
- if (success) {
|
|
|
|
|
|
|
+ // 向量化并写入 Redis(多点模式返回成功数>0即为成功)
|
|
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
+ if (storeCount > 0) {
|
|
|
totalSuccessCount++;
|
|
totalSuccessCount++;
|
|
|
} else {
|
|
} else {
|
|
|
totalFailCount++;
|
|
totalFailCount++;
|
|
@@ -557,12 +742,34 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 从 dataContent 中提取选题文本
|
|
|
|
|
- * 默认复用配置的 sourcePath 提取逻辑
|
|
|
|
|
|
|
+ * 从 dataContent 中提取文本
|
|
|
|
|
+ * 根据配置的 extract_rule 决定是否进行置信度过滤
|
|
|
*
|
|
*
|
|
|
* @param dataContent dataContent 解析后的 JSONObject
|
|
* @param dataContent dataContent 解析后的 JSONObject
|
|
|
|
|
+ * @param config 向量化配置
|
|
|
* @return 提取的文本列表
|
|
* @return 提取的文本列表
|
|
|
*/
|
|
*/
|
|
|
|
|
+ private List<String> extractTextsFromDataContent(JSONObject dataContent, DeconstructVectorConfig config) {
|
|
|
|
|
+ if (dataContent == null) {
|
|
|
|
|
+ return Collections.emptyList();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String extractRule = config.getExtractRule();
|
|
|
|
|
+ if (StringUtils.hasText(extractRule)) {
|
|
|
|
|
+ // 多点模式:使用配置的 sourcePath 中相对路径 + 置信度过滤
|
|
|
|
|
+ // AIGC dataContent 的结构与 raw_result 中 final_normalization_rebuild 下的子结构一致
|
|
|
|
|
+ return extractTextsWithConfidence(dataContent, config.getSourcePath(), extractRule);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 单点模式:直接提取
|
|
|
|
|
+ return extractFromJson(dataContent, config.getSourcePath());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从 dataContent 中提取选题文本(向后兼容)
|
|
|
|
|
+ * @deprecated 请使用 extractTextsFromDataContent(dataContent, config)
|
|
|
|
|
+ */
|
|
|
|
|
+ @Deprecated
|
|
|
private List<String> extractTopicFromDataContent(JSONObject dataContent) {
|
|
private List<String> extractTopicFromDataContent(JSONObject dataContent) {
|
|
|
if (dataContent == null) {
|
|
if (dataContent == null) {
|
|
|
return Collections.emptyList();
|
|
return Collections.emptyList();
|
|
@@ -588,7 +795,7 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
// 1. 查询超时的任务(创建时间超过1小时,状态为PENDING或RUNNING)
|
|
// 1. 查询超时的任务(创建时间超过1小时,状态为PENDING或RUNNING)
|
|
|
- Date timeoutThreshold = new Date(System.currentTimeMillis() - TIMEOUT_MS);
|
|
|
|
|
|
|
+ Date timeoutThreshold = new Date(System.currentTimeMillis() - VectorConstants.TIMEOUT_MS);
|
|
|
|
|
|
|
|
DeconstructContentExample example = new DeconstructContentExample();
|
|
DeconstructContentExample example = new DeconstructContentExample();
|
|
|
example.createCriteria().andStatusIn(Arrays.asList((byte) 0, (byte) 1)) // PENDING=0, RUNNING=1
|
|
example.createCriteria().andStatusIn(Arrays.asList((byte) 0, (byte) 1)) // PENDING=0, RUNNING=1
|
|
@@ -666,41 +873,62 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 检查并移除审核状态不通过的视频向量
|
|
* 检查并移除审核状态不通过的视频向量
|
|
|
- * 每批次最多检查20个videoId
|
|
|
|
|
*
|
|
*
|
|
|
* @param configCode 配置编码
|
|
* @param configCode 配置编码
|
|
|
|
|
+ * @param multiPoint 是否为多点模式
|
|
|
*/
|
|
*/
|
|
|
- private void checkAndRemoveNotAuditPassedVideos(String configCode) {
|
|
|
|
|
|
|
+ private void checkAndRemoveNotAuditPassedVideos(String configCode, boolean multiPoint) {
|
|
|
try {
|
|
try {
|
|
|
- // 获取该配置下所有已有的视频ID
|
|
|
|
|
- Set<Long> allVideoIds = vectorStoreService.getAllVideoIds(configCode);
|
|
|
|
|
- if (allVideoIds == null || allVideoIds.isEmpty()) {
|
|
|
|
|
|
|
+ // 获取该配置下所有已有的存储ID
|
|
|
|
|
+ Set<Long> allStoredIds = vectorStoreService.getAllVideoIds(configCode);
|
|
|
|
|
+ if (allStoredIds == null || allStoredIds.isEmpty()) {
|
|
|
log.debug("配置 {} 下没有已存储的向量,跳过审核检查", configCode);
|
|
log.debug("配置 {} 下没有已存储的向量,跳过审核检查", configCode);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.info("配置 {} 开始检查审核状态,共 {} 个视频", configCode, allVideoIds.size());
|
|
|
|
|
|
|
+ // 多点模式:存储ID是复合ID,需要还原为真实videoId
|
|
|
|
|
+ Set<Long> realVideoIds;
|
|
|
|
|
+ if (multiPoint) {
|
|
|
|
|
+ realVideoIds = allStoredIds.stream()
|
|
|
|
|
+ .map(VideoVectorJob::decodeVideoId)
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ realVideoIds = allStoredIds;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("配置 {} 开始检查审核状态,共 {} 个视频(存储条目 {} 个)",
|
|
|
|
|
+ configCode, realVideoIds.size(), allStoredIds.size());
|
|
|
|
|
|
|
|
// 分批检查审核状态
|
|
// 分批检查审核状态
|
|
|
- List<Long> videoIdList = new ArrayList<>(allVideoIds);
|
|
|
|
|
|
|
+ List<Long> videoIdList = new ArrayList<>(realVideoIds);
|
|
|
int totalRemoved = 0;
|
|
int totalRemoved = 0;
|
|
|
|
|
|
|
|
- for (int i = 0; i < videoIdList.size(); i += AUDIT_CHECK_BATCH_SIZE) {
|
|
|
|
|
- int end = Math.min(i + AUDIT_CHECK_BATCH_SIZE, videoIdList.size());
|
|
|
|
|
|
|
+ for (int i = 0; i < videoIdList.size(); i += VectorConstants.AUDIT_CHECK_BATCH_SIZE) {
|
|
|
|
|
+ int end = Math.min(i + VectorConstants.AUDIT_CHECK_BATCH_SIZE, videoIdList.size());
|
|
|
Set<Long> batchIds = new HashSet<>(videoIdList.subList(i, end));
|
|
Set<Long> batchIds = new HashSet<>(videoIdList.subList(i, end));
|
|
|
|
|
|
|
|
// 获取审核未通过的视频ID
|
|
// 获取审核未通过的视频ID
|
|
|
Set<Long> notPassedIds = videoApiService.getNotAuditPassedVideoIds(batchIds);
|
|
Set<Long> notPassedIds = videoApiService.getNotAuditPassedVideoIds(batchIds);
|
|
|
|
|
|
|
|
if (!notPassedIds.isEmpty()) {
|
|
if (!notPassedIds.isEmpty()) {
|
|
|
- // 批量删除审核不通过的视频向量
|
|
|
|
|
- vectorStoreService.deleteBatch(configCode, notPassedIds);
|
|
|
|
|
- totalRemoved += notPassedIds.size();
|
|
|
|
|
- log.info("配置 {} 移除审核不通过的视频 {} 个: {}", configCode, notPassedIds.size(), notPassedIds);
|
|
|
|
|
|
|
+ if (multiPoint) {
|
|
|
|
|
+ // 多点模式:找出所有属于未通过视频的复合ID进行删除
|
|
|
|
|
+ Set<Long> compositeIdsToDelete = allStoredIds.stream()
|
|
|
|
|
+ .filter(storedId -> notPassedIds.contains(decodeVideoId(storedId)))
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+ vectorStoreService.deleteBatch(configCode, compositeIdsToDelete);
|
|
|
|
|
+ totalRemoved += compositeIdsToDelete.size();
|
|
|
|
|
+ log.info("配置 {} 移除审核不通过的视频 {} 个(向量条目 {} 个): {}",
|
|
|
|
|
+ configCode, notPassedIds.size(), compositeIdsToDelete.size(), notPassedIds);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ vectorStoreService.deleteBatch(configCode, notPassedIds);
|
|
|
|
|
+ totalRemoved += notPassedIds.size();
|
|
|
|
|
+ log.info("配置 {} 移除审核不通过的视频 {} 个: {}", configCode, notPassedIds.size(), notPassedIds);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.info("配置 {} 审核检查完成,共移除 {} 个视频向量", configCode, totalRemoved);
|
|
|
|
|
|
|
+ log.info("配置 {} 审核检查完成,共移除 {} 个向量条目", configCode, totalRemoved);
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("配置 {} 检查审核状态失败: {}", configCode, e.getMessage(), e);
|
|
log.error("配置 {} 检查审核状态失败: {}", configCode, e.getMessage(), e);
|
|
@@ -719,7 +947,7 @@ public class VideoVectorJob {
|
|
|
return Collections.emptyList();
|
|
return Collections.emptyList();
|
|
|
}
|
|
}
|
|
|
Set<Long> notPassedIds = new HashSet<>();
|
|
Set<Long> notPassedIds = new HashSet<>();
|
|
|
- for (List<Long> batch : Lists.partition(videoIds, AUDIT_CHECK_BATCH_SIZE)) {
|
|
|
|
|
|
|
+ for (List<Long> batch : Lists.partition(videoIds, VectorConstants.AUDIT_CHECK_BATCH_SIZE)) {
|
|
|
try {
|
|
try {
|
|
|
Set<Long> batchNotPassed = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
|
|
Set<Long> batchNotPassed = videoApiService.getNotAuditPassedVideoIds(new HashSet<>(batch));
|
|
|
notPassedIds.addAll(batchNotPassed);
|
|
notPassedIds.addAll(batchNotPassed);
|