|
@@ -1,13 +1,20 @@
|
|
|
package com.tzld.videoVector.job;
|
|
package com.tzld.videoVector.job;
|
|
|
|
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.aliyun.odps.data.Record;
|
|
|
import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.DeconstructContentMapper;
|
|
import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.DeconstructContentMapper;
|
|
|
|
|
+import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.DeconstructVectorConfigMapper;
|
|
|
import com.tzld.videoVector.model.entity.DeconstructResult;
|
|
import com.tzld.videoVector.model.entity.DeconstructResult;
|
|
|
import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContent;
|
|
import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContent;
|
|
|
import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContentExample;
|
|
import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContentExample;
|
|
|
-import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContentVector;
|
|
|
|
|
import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfig;
|
|
import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfig;
|
|
|
|
|
+import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfigExample;
|
|
|
import com.tzld.videoVector.service.DeconstructService;
|
|
import com.tzld.videoVector.service.DeconstructService;
|
|
|
-import com.tzld.videoVector.service.VectorizeService;
|
|
|
|
|
|
|
+import com.tzld.videoVector.service.EmbeddingService;
|
|
|
|
|
+import com.tzld.videoVector.service.VectorStoreService;
|
|
|
|
|
+import com.tzld.videoVector.util.OdpsUtil;
|
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -16,10 +23,7 @@ import org.springframework.util.CollectionUtils;
|
|
|
import org.springframework.util.StringUtils;
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
import javax.annotation.Resource;
|
|
|
-import java.util.Arrays;
|
|
|
|
|
-import java.util.Date;
|
|
|
|
|
-import java.util.List;
|
|
|
|
|
-import java.util.Set;
|
|
|
|
|
|
|
+import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
@@ -30,11 +34,17 @@ public class VideoVectorJob {
|
|
|
@Resource
|
|
@Resource
|
|
|
private DeconstructContentMapper deconstructContentMapper;
|
|
private DeconstructContentMapper deconstructContentMapper;
|
|
|
|
|
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private DeconstructVectorConfigMapper vectorConfigMapper;
|
|
|
|
|
+
|
|
|
@Resource
|
|
@Resource
|
|
|
private DeconstructService deconstructService;
|
|
private DeconstructService deconstructService;
|
|
|
|
|
|
|
|
@Resource
|
|
@Resource
|
|
|
- private VectorizeService vectorizeService;
|
|
|
|
|
|
|
+ private VectorStoreService vectorStoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private EmbeddingService embeddingService;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 每页查询数量
|
|
* 每页查询数量
|
|
@@ -46,6 +56,11 @@ public class VideoVectorJob {
|
|
|
*/
|
|
*/
|
|
|
private static final long TIMEOUT_MS = 60 * 60 * 1000L;
|
|
private static final long TIMEOUT_MS = 60 * 60 * 1000L;
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 内容类型:视频
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final byte CONTENT_TYPE_VIDEO = 3;
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 视频向量化
|
|
* 视频向量化
|
|
|
* 根据配置对解构内容进行向量化
|
|
* 根据配置对解构内容进行向量化
|
|
@@ -57,88 +72,93 @@ public class VideoVectorJob {
|
|
|
public ReturnT<String> vectorVideoJob(String param) {
|
|
public ReturnT<String> vectorVideoJob(String param) {
|
|
|
log.info("开始执行视频向量化任务, param: {}", param);
|
|
log.info("开始执行视频向量化任务, param: {}", param);
|
|
|
|
|
|
|
|
- int totalSuccessCount = 0;
|
|
|
|
|
- int totalFailCount = 0;
|
|
|
|
|
- int totalSkipCount = 0;
|
|
|
|
|
- int pageNum = 0;
|
|
|
|
|
-
|
|
|
|
|
try {
|
|
try {
|
|
|
- // 1. 获取向量配置(视频类型 content_type=3)
|
|
|
|
|
- List<DeconstructVectorConfig> configs = vectorizeService.getVectorConfigs(null, 3);
|
|
|
|
|
|
|
+ // 1. 获取所有启用的向量化配置(content_type=3 视频)
|
|
|
|
|
+ List<DeconstructVectorConfig> configs = getEnabledConfigs(CONTENT_TYPE_VIDEO);
|
|
|
if (CollectionUtils.isEmpty(configs)) {
|
|
if (CollectionUtils.isEmpty(configs)) {
|
|
|
- log.warn("未找到视频类型的向量配置");
|
|
|
|
|
|
|
+ log.warn("未找到启用的向量化配置");
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
}
|
|
|
- log.info("加载 {} 个向量配置", configs.size());
|
|
|
|
|
|
|
+ log.info("加载 {} 个向量化配置", configs.size());
|
|
|
|
|
+
|
|
|
|
|
+ int totalSuccessCount = 0;
|
|
|
|
|
+ int totalFailCount = 0;
|
|
|
|
|
+ int pageNum = 0;
|
|
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
|
- // 2. 分页查询解构成功的内容
|
|
|
|
|
- List<DeconstructContent> contents = querySuccessContentsByPage(pageNum, PAGE_SIZE, (byte) 3);
|
|
|
|
|
- if (CollectionUtils.isEmpty(contents)) {
|
|
|
|
|
|
|
+ // 2. 分页查询 videoId 列表
|
|
|
|
|
+ List<Long> videoIds = queryVideoIdsByPage(pageNum, PAGE_SIZE);
|
|
|
|
|
+ if (videoIds == null || videoIds.isEmpty()) {
|
|
|
log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
|
|
log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
- log.info("第 {} 页查询到 {} 条解构内容", pageNum, contents.size());
|
|
|
|
|
-
|
|
|
|
|
- // 3. 逐个处理内容
|
|
|
|
|
- for (DeconstructContent content : contents) {
|
|
|
|
|
- try {
|
|
|
|
|
- // 3.1 查询已有向量
|
|
|
|
|
- List<DeconstructContentVector> existingVectors = vectorizeService.getVectorsByContentId(content.getId());
|
|
|
|
|
- Set<String> existingFields = existingVectors.stream()
|
|
|
|
|
- .map(DeconstructContentVector::getSourceField)
|
|
|
|
|
- .collect(Collectors.toSet());
|
|
|
|
|
-
|
|
|
|
|
- // 3.2 遍历配置,对缺失的向量进行补充
|
|
|
|
|
- boolean hasNewVector = false;
|
|
|
|
|
- for (DeconstructVectorConfig config : configs) {
|
|
|
|
|
- String sourceField = config.getSourceField();
|
|
|
|
|
-
|
|
|
|
|
- // 检查是否已有该字段的向量
|
|
|
|
|
- if (existingFields.contains(sourceField)) {
|
|
|
|
|
- log.debug("contentId={} 已有 {} 字段向量,跳过", content.getId(), sourceField);
|
|
|
|
|
|
|
+ log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 批量查询视频详情(包含 raw_result)
|
|
|
|
|
+ Map<Long, String> videoRawResults = batchQueryVideoRawResults(videoIds);
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 对每个配置进行处理
|
|
|
|
|
+ for (DeconstructVectorConfig config : configs) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+
|
|
|
|
|
+ // 4.1 查询哪些 videoId 在该配置下已有向量
|
|
|
|
|
+ Set<Long> existingIds = vectorStoreService.existsByIds(configCode, videoIds);
|
|
|
|
|
+
|
|
|
|
|
+ // 4.2 过滤出需要处理的 videoId
|
|
|
|
|
+ List<Long> needProcessIds = videoIds.stream()
|
|
|
|
|
+ .filter(id -> !existingIds.contains(id) && videoRawResults.containsKey(id))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+
|
|
|
|
|
+ if (needProcessIds.isEmpty()) {
|
|
|
|
|
+ log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 4.3 逐个处理
|
|
|
|
|
+ for (Long videoId : needProcessIds) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String rawResult = videoRawResults.get(videoId);
|
|
|
|
|
+ if (!StringUtils.hasText(rawResult)) {
|
|
|
|
|
+ log.debug("videoId={} raw_result 为空,跳过", videoId);
|
|
|
|
|
+ totalFailCount++;
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 检查原始数据是否有该字段内容
|
|
|
|
|
- if (!hasSourceContent(content, config)) {
|
|
|
|
|
- log.debug("contentId={} 无 {} 字段原始数据,跳过", content.getId(), sourceField);
|
|
|
|
|
|
|
+ // 根据配置提取文本
|
|
|
|
|
+ List<String> texts = extractTextsFromRawResult(rawResult, config);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
|
|
|
|
|
+ totalFailCount++;
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 执行向量化
|
|
|
|
|
- log.info("contentId={} 开始向量化字段 {}", content.getId(), sourceField);
|
|
|
|
|
- List<DeconstructContentVector> newVectors = vectorizeService.vectorizeByConfig(content, config);
|
|
|
|
|
- if (!CollectionUtils.isEmpty(newVectors)) {
|
|
|
|
|
- vectorizeService.batchSaveVectors(newVectors);
|
|
|
|
|
- hasNewVector = true;
|
|
|
|
|
|
|
+ // 向量化并存储
|
|
|
|
|
+ boolean success = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
+ if (success) {
|
|
|
totalSuccessCount++;
|
|
totalSuccessCount++;
|
|
|
- log.info("contentId={} 字段 {} 向量化完成,生成 {} 条向量",
|
|
|
|
|
- content.getId(), sourceField, newVectors.size());
|
|
|
|
|
|
|
+ } else {
|
|
|
|
|
+ totalFailCount++;
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- if (!hasNewVector) {
|
|
|
|
|
- totalSkipCount++;
|
|
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
+ totalFailCount++;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("处理 contentId={} 时发生异常: {}", content.getId(), e.getMessage(), e);
|
|
|
|
|
- totalFailCount++;
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
|
|
// 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
|
|
|
- if (contents.size() < PAGE_SIZE) {
|
|
|
|
|
- log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, contents.size(), PAGE_SIZE);
|
|
|
|
|
|
|
+ if (videoIds.size() < PAGE_SIZE) {
|
|
|
|
|
+ log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), PAGE_SIZE);
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pageNum++;
|
|
pageNum++;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.info("视频向量化任务完成,总成功: {}, 总失败: {}, 总跳过: {}, 总页数: {}",
|
|
|
|
|
- totalSuccessCount, totalFailCount, totalSkipCount, pageNum + 1);
|
|
|
|
|
|
|
+ log.info("视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount, totalFailCount, pageNum + 1);
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -148,45 +168,235 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 分页查询解构成功的内容
|
|
|
|
|
- *
|
|
|
|
|
- * @param pageNum 页码
|
|
|
|
|
- * @param pageSize 每页数量
|
|
|
|
|
- * @param contentType 内容类型
|
|
|
|
|
- * @return 内容列表
|
|
|
|
|
|
|
+ * 获取启用的向量化配置
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<DeconstructVectorConfig> getEnabledConfigs(byte contentType) {
|
|
|
|
|
+ DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
|
|
|
|
|
+ example.createCriteria()
|
|
|
|
|
+ .andEnabledEqualTo((byte) 1)
|
|
|
|
|
+ .andContentTypeEqualTo(contentType);
|
|
|
|
|
+ example.setOrderByClause("priority ASC");
|
|
|
|
|
+ return vectorConfigMapper.selectByExample(example);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量查询视频的 raw_result
|
|
|
*/
|
|
*/
|
|
|
- private List<DeconstructContent> querySuccessContentsByPage(int pageNum, int pageSize, Byte contentType) {
|
|
|
|
|
- DeconstructContentExample example = new DeconstructContentExample();
|
|
|
|
|
- DeconstructContentExample.Criteria criteria = example.createCriteria();
|
|
|
|
|
- criteria.andStatusEqualTo((byte) 2); // SUCCESS
|
|
|
|
|
- if (contentType != null) {
|
|
|
|
|
- criteria.andContentTypeEqualTo(contentType);
|
|
|
|
|
|
|
+ private Map<Long, String> batchQueryVideoRawResults(List<Long> videoIds) {
|
|
|
|
|
+ if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
}
|
|
}
|
|
|
- example.setOrderByClause("id ASC LIMIT " + (pageNum * pageSize) + ", " + pageSize);
|
|
|
|
|
- return deconstructContentMapper.selectByExampleWithBLOBs(example);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 构建IN查询
|
|
|
|
|
+ String idsStr = videoIds.stream()
|
|
|
|
|
+ .map(String::valueOf)
|
|
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
|
|
+
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT video_id, raw_result " +
|
|
|
|
|
+ "FROM videoods.content_profile " +
|
|
|
|
|
+ "WHERE status = 3 AND is_deleted = 0 AND video_id IN (%s)",
|
|
|
|
|
+ idsStr);
|
|
|
|
|
+
|
|
|
|
|
+ List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
+ if (records == null || records.isEmpty()) {
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Map<Long, String> result = new HashMap<>();
|
|
|
|
|
+ for (Record record : records) {
|
|
|
|
|
+ Long videoId = record.getBigint("video_id");
|
|
|
|
|
+ String rawResult = record.getString("raw_result");
|
|
|
|
|
+ if (videoId != null && rawResult != null) {
|
|
|
|
|
+ result.put(videoId, rawResult);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return result;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 检查内容是否有配置所需的原始数据
|
|
|
|
|
- *
|
|
|
|
|
- * @param content 内容
|
|
|
|
|
- * @param config 配置
|
|
|
|
|
- * @return 是否有原始数据
|
|
|
|
|
|
|
+ * 根据配置从 raw_result 中提取文本
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<String> extractTextsFromRawResult(String rawResult, DeconstructVectorConfig config) {
|
|
|
|
|
+ List<String> texts = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject json = JSON.parseObject(rawResult);
|
|
|
|
|
+ if (json == null) {
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sourcePath = config.getSourcePath();
|
|
|
|
|
+ if (!StringUtils.hasText(sourcePath)) {
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 解析路径并提取
|
|
|
|
|
+ texts.addAll(extractFromJson(json, sourcePath));
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("解析 raw_result 失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从JSON中提取文本
|
|
|
|
|
+ * 支持路径格式:$.final_normalization_rebuild.topic_fusion_result.最终选题.选题
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<String> extractFromJson(JSONObject json, String path) {
|
|
|
|
|
+ List<String> results = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ if (json == null || !StringUtils.hasText(path)) {
|
|
|
|
|
+ return results;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 路径处理
|
|
|
|
|
+ if (path.startsWith("$.")) {
|
|
|
|
|
+ String pathContent = path.substring(2);
|
|
|
|
|
+ List<String> parts = parseJsonPath(pathContent);
|
|
|
|
|
+ Object current = json;
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < parts.size(); i++) {
|
|
|
|
|
+ String part = parts.get(i);
|
|
|
|
|
+ if (current == null) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 处理数组路径(如 topics[*])
|
|
|
|
|
+ if (part.endsWith("[*]")) {
|
|
|
|
|
+ String arrayKey = part.substring(0, part.length() - 3);
|
|
|
|
|
+ if (current instanceof JSONObject) {
|
|
|
|
|
+ JSONArray array = ((JSONObject) current).getJSONArray(arrayKey);
|
|
|
|
|
+ if (array != null) {
|
|
|
|
|
+ List<String> remainingParts = parts.subList(i + 1, parts.size());
|
|
|
|
|
+ String remainingPath = String.join(".", remainingParts);
|
|
|
|
|
+ for (int j = 0; j < array.size(); j++) {
|
|
|
|
|
+ Object item = array.get(j);
|
|
|
|
|
+ if (remainingParts.isEmpty()) {
|
|
|
|
|
+ if (item instanceof String) {
|
|
|
|
|
+ results.add((String) item);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ results.addAll(extractFromJson(
|
|
|
|
|
+ JSON.parseObject(JSON.toJSONString(item)), "$." + remainingPath));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return results;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ if (current instanceof JSONObject) {
|
|
|
|
|
+ current = ((JSONObject) current).get(part);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (current instanceof String) {
|
|
|
|
|
+ results.add((String) current);
|
|
|
|
|
+ } else if (current instanceof JSONArray) {
|
|
|
|
|
+ JSONArray array = (JSONArray) current;
|
|
|
|
|
+ for (int i = 0; i < array.size(); i++) {
|
|
|
|
|
+ Object item = array.get(i);
|
|
|
|
|
+ if (item instanceof String) {
|
|
|
|
|
+ results.add((String) item);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("JSON提取失败,path={}, error={}", path, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return results;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 解析 JSONPath 路径
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<String> parseJsonPath(String pathContent) {
|
|
|
|
|
+ List<String> parts = new ArrayList<>();
|
|
|
|
|
+ StringBuilder current = new StringBuilder();
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < pathContent.length(); i++) {
|
|
|
|
|
+ char c = pathContent.charAt(i);
|
|
|
|
|
+ if (c == '.') {
|
|
|
|
|
+ if (current.length() > 0) {
|
|
|
|
|
+ parts.add(current.toString());
|
|
|
|
|
+ current = new StringBuilder();
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ current.append(c);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (current.length() > 0) {
|
|
|
|
|
+ parts.add(current.toString());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return parts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 向量化并存储
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean vectorizeAndStore(DeconstructVectorConfig config, Long videoId, List<String> texts) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ Integer maxLength = config.getMaxLength();
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < texts.size(); i++) {
|
|
|
|
|
+ String text = texts.get(i);
|
|
|
|
|
+ if (!StringUtils.hasText(text)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 文本截断
|
|
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 向量化
|
|
|
|
|
+ List<Float> vector = embeddingService.embed(text);
|
|
|
|
|
+ if (vector == null || vector.isEmpty()) {
|
|
|
|
|
+ log.warn("videoId={} 配置 {} 文本向量化失败", videoId, configCode);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 存储(如果有多个分段,使用 configCode:segmentIndex 作为key的一部分)
|
|
|
|
|
+ String storeConfigCode = texts.size() > 1 ? configCode + ":" + i : configCode;
|
|
|
|
|
+ vectorStoreService.save(storeConfigCode, videoId, vector);
|
|
|
|
|
+ log.debug("videoId={} 配置 {} 向量化存储成功", videoId, storeConfigCode);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 分页查询 videoId 列表
|
|
|
|
|
+ * @param pageNum 页码(从0开始)
|
|
|
|
|
+ * @param pageSize 每页数量
|
|
|
|
|
+ * @return videoId 列表
|
|
|
*/
|
|
*/
|
|
|
- private boolean hasSourceContent(DeconstructContent content, DeconstructVectorConfig config) {
|
|
|
|
|
- String sourceField = config.getSourceField();
|
|
|
|
|
-
|
|
|
|
|
- switch (sourceField) {
|
|
|
|
|
- case "title":
|
|
|
|
|
- return StringUtils.hasText(content.getTitle());
|
|
|
|
|
- case "body_text":
|
|
|
|
|
- return StringUtils.hasText(content.getBodyText());
|
|
|
|
|
- case "result_json":
|
|
|
|
|
- return StringUtils.hasText(content.getResultJson()) && StringUtils.hasText(config.getSourcePath());
|
|
|
|
|
- default:
|
|
|
|
|
- // 其他字段从 result_json 中提取
|
|
|
|
|
- return StringUtils.hasText(content.getResultJson()) && StringUtils.hasText(config.getSourcePath());
|
|
|
|
|
|
|
+ private List<Long> queryVideoIdsByPage(int pageNum, int pageSize) {
|
|
|
|
|
+ int offset = pageNum * pageSize;
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT video_id " +
|
|
|
|
|
+ "FROM videoods.content_profile " +
|
|
|
|
|
+ "WHERE status = 3 AND is_deleted = 0 " +
|
|
|
|
|
+ "ORDER BY video_id " +
|
|
|
|
|
+ "LIMIT %d, %d",
|
|
|
|
|
+ offset, pageSize);
|
|
|
|
|
+ List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
+ if (records == null || records.isEmpty()) {
|
|
|
|
|
+ return new ArrayList<>();
|
|
|
}
|
|
}
|
|
|
|
|
+ return records.stream()
|
|
|
|
|
+ .map(record -> record.getBigint("video_id"))
|
|
|
|
|
+ .filter(Objects::nonNull)
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|