Browse Source

素材解构 多点模式

wangyunpeng 1 week ago
parent
commit
07102548fe

+ 157 - 0
core/src/main/java/com/tzld/videoVector/job/MaterialDeconstructCheckJob.java

@@ -0,0 +1,157 @@
+package com.tzld.videoVector.job;
+
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
+import com.tzld.videoVector.model.entity.DeconstructResult;
+import com.tzld.videoVector.model.po.pgVector.DeconstructContent;
+import com.tzld.videoVector.model.po.pgVector.DeconstructContentExample;
+import com.tzld.videoVector.service.DeconstructService;
+import com.tzld.videoVector.service.VectorizeService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Scheduled job that polls the results of material deconstruction tasks.
+ * <p>
+ * Scans deconstruct_content for unfinished rows (status=0 PENDING or status=1 RUNNING),
+ * queries the deconstruct API for each of them and persists any status change.
+ * When a task has succeeded (status=2) vectorization is triggered through VectorizeService.
+ */
+@Slf4j
+@Component
+public class MaterialDeconstructCheckJob {
+
+    @Resource
+    private DeconstructContentMapper deconstructContentMapper;
+
+    @Resource
+    private DeconstructService deconstructService;
+
+    @Resource
+    private VectorizeService vectorizeService;
+
+    /**
+     * Polls unfinished deconstruction records, refreshes their status from the API and
+     * vectorizes the records that have just completed.
+     * <p>
+     * Paging is driven by an id cursor rather than OFFSET: rows whose status is updated here
+     * leave the "status IN (0, 1)" result set, so an OFFSET-based scan would jump over the rows
+     * that slide into the vacated positions.
+     *
+     * @param param job parameter passed by the scheduler; it is only logged
+     * @return {@code ReturnT.SUCCESS} when the scan finished, otherwise a FAIL_CODE result
+     *         carrying the exception message
+     */
+    @XxlJob("checkMaterialDeconstructJob")
+    public ReturnT<String> checkMaterialDeconstructJob(String param) {
+        log.info("开始执行素材解构结果轮询任务, param: {}", param);
+
+        try {
+            int totalChecked = 0;
+            int totalCompleted = 0;
+            int totalVectorized = 0;
+            int totalFailed = 0;
+            int pageNum = 0;
+            int pageSize = 50;
+            // Id of the last row handled so far; null means "start from the beginning".
+            Long lastId = null;
+
+            while (true) {
+                // Fetch the next batch of unfinished records (status=0 or status=1)
+                List<DeconstructContent> contents = queryPendingContents(lastId, pageSize);
+                if (CollectionUtils.isEmpty(contents)) {
+                    log.info("第 {} 页无待查询数据,轮询结束", pageNum);
+                    break;
+                }
+                log.info("第 {} 页查询到 {} 条未完成的解构记录", pageNum, contents.size());
+
+                for (DeconstructContent content : contents) {
+                    totalChecked++;
+                    try {
+                        String taskId = content.getTaskId();
+                        if (!StringUtils.hasText(taskId)) {
+                            log.warn("contentId={} 的 taskId 为空,跳过", content.getId());
+                            continue;
+                        }
+
+                        // Ask the deconstruct API for the current result
+                        DeconstructResult result = deconstructService.getDeconstructResult(taskId);
+                        if (result == null) {
+                            log.warn("查询解构结果返回空,taskId={}", taskId);
+                            continue;
+                        }
+
+                        // Keep the previous status for the comparison and for the log line below.
+                        // An if-statement is used instead of a short/Short ternary, which would
+                        // unbox the stored status and fail when it is null.
+                        Short oldStatus = content.getStatus();
+                        Short newStatus = oldStatus;
+                        if (result.getStatus() != null) {
+                            newStatus = result.getStatus().shortValue();
+                        }
+
+                        // Nothing to persist when the status is unknown or unchanged
+                        if (newStatus == null || newStatus.equals(oldStatus)) {
+                            log.debug("解构状态未变化,taskId={}, status={}", taskId, newStatus);
+                            continue;
+                        }
+
+                        content.setStatus(newStatus);
+                        content.setResultJson(result.getResult());
+                        content.setFailureReason(result.getReason());
+                        content.setPointUrl(result.getPointUrl());
+                        content.setWeightUrl(result.getWeightUrl());
+                        content.setPatternUrl(result.getPatternUrl());
+                        content.setUpdateTime(new Date());
+                        deconstructContentMapper.updateByPrimaryKeySelective(content);
+
+                        log.info("更新解构状态,contentId={}, taskId={}, status={} -> {}",
+                                content.getId(), taskId, oldStatus, newStatus);
+
+                        if (newStatus == 2) {
+                            // Deconstruction succeeded: vectorize immediately. A failure here is
+                            // only logged so the remaining records are still processed.
+                            totalCompleted++;
+                            try {
+                                vectorizeService.vectorizeContent(content);
+                                totalVectorized++;
+                                log.info("解构完成并向量化成功,contentId={}, taskId={}",
+                                        content.getId(), taskId);
+                            } catch (Exception e) {
+                                log.error("向量化失败,contentId={}, taskId={}, error={}",
+                                        content.getId(), taskId, e.getMessage(), e);
+                            }
+                        } else if (newStatus == 3) {
+                            totalFailed++;
+                            log.warn("解构失败,contentId={}, taskId={}, reason={}",
+                                    content.getId(), taskId, result.getReason());
+                        }
+
+                    } catch (Exception e) {
+                        // One bad record must not abort the whole scan
+                        log.error("处理解构结果失败,contentId={}, error={}",
+                                content.getId(), e.getMessage(), e);
+                    }
+                }
+
+                if (contents.size() < pageSize) {
+                    log.info("第 {} 页数据量 {} 小于 pageSize {},轮询结束",
+                            pageNum, contents.size(), pageSize);
+                    break;
+                }
+
+                // Advance the cursor; stop if it cannot move forward to avoid re-reading the same page
+                lastId = contents.get(contents.size() - 1).getId();
+                if (lastId == null) {
+                    log.warn("第 {} 页末条记录 id 为空,无法继续翻页,轮询结束", pageNum);
+                    break;
+                }
+                pageNum++;
+            }
+
+            log.info("素材解构结果轮询完成,检查: {}, 完成: {}, 向量化: {}, 失败: {}",
+                    totalChecked, totalCompleted, totalVectorized, totalFailed);
+            return ReturnT.SUCCESS;
+
+        } catch (Exception e) {
+            log.error("素材解构结果轮询任务异常: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务异常: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Loads the next batch of unfinished records (status=0 PENDING or status=1 RUNNING),
+     * ordered by id ascending.
+     *
+     * @param afterId  only rows with an id greater than this value are returned; null starts
+     *                 from the lowest id
+     * @param pageSize maximum number of rows to return
+     * @return the matching rows as returned by the mapper
+     */
+    private List<DeconstructContent> queryPendingContents(Long afterId, int pageSize) {
+        DeconstructContentExample example = new DeconstructContentExample();
+        DeconstructContentExample.Criteria criteria = example.createCriteria();
+        // status IN (0, 1)
+        List<Short> pendingStatuses = java.util.Arrays.asList((short) 0, (short) 1);
+        criteria.andStatusIn(pendingStatuses);
+        if (afterId != null) {
+            criteria.andIdGreaterThan(afterId);
+        }
+        // The LIMIT is appended to the ORDER BY clause; pageSize is an int, so nothing untrusted is concatenated
+        example.setOrderByClause("id ASC LIMIT " + pageSize);
+        return deconstructContentMapper.selectByExample(example);
+    }
+}

+ 96 - 0
core/src/main/java/com/tzld/videoVector/job/MaterialVectorJob.java

@@ -0,0 +1,96 @@
+package com.tzld.videoVector.job;
+
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
+import com.tzld.videoVector.model.po.pgVector.DeconstructContent;
+import com.tzld.videoVector.model.po.pgVector.DeconstructContentExample;
+import com.tzld.videoVector.service.VectorizeService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * Scheduled job that vectorizes materials in bulk.
+ * <p>
+ * Scans deconstruct_content for rows whose deconstruction has completed (status=2) and calls
+ * VectorizeService.vectorizeContent() for each of them (documented by the original author as
+ * idempotent internally).
+ * <p>
+ * Reuses the generic configuration in deconstruct_vector_config with biz_type=NULL, shared with
+ * the video deconstruction configuration.
+ */
+@Slf4j
+@Component
+public class MaterialVectorJob {
+
+    @Resource
+    private DeconstructContentMapper deconstructContentMapper;
+
+    @Resource
+    private VectorizeService vectorizeService;
+
+    /**
+     * Walks all completed deconstruction records page by page and vectorizes them one at a time.
+     * A failure on a single record is logged and counted; it does not stop the scan.
+     *
+     * @param param job parameter passed by the scheduler; it is only logged
+     * @return {@code ReturnT.SUCCESS} when the scan finished, otherwise a FAIL_CODE result
+     *         carrying the exception message
+     */
+    @XxlJob("vectorMaterialJob")
+    public ReturnT<String> vectorMaterialJob(String param) {
+        log.info("开始执行素材批量向量化任务, param: {}", param);
+
+        try {
+            int totalSuccess = 0;
+            int totalFail = 0;
+            int pageNum = 0;
+            int pageSize = 100;
+            // Counts only pages that actually contained rows, so the summary stays accurate
+            // when the scan ends on an empty page.
+            int pagesProcessed = 0;
+
+            while (true) {
+                // Page through the completed records (status=2)
+                List<DeconstructContent> contents = queryDeconstructedContents(pageNum, pageSize);
+                if (CollectionUtils.isEmpty(contents)) {
+                    log.info("第 {} 页无数据,分页结束", pageNum);
+                    break;
+                }
+                pagesProcessed++;
+                log.info("第 {} 页查询到 {} 条解构完成的内容", pageNum, contents.size());
+
+                for (DeconstructContent content : contents) {
+                    try {
+                        // Per the original note, VectorizeService skips configCodes that already have vectors
+                        vectorizeService.vectorizeContent(content);
+                        totalSuccess++;
+                    } catch (Exception e) {
+                        log.error("素材向量化失败,contentId={}, error={}",
+                                content.getId(), e.getMessage(), e);
+                        totalFail++;
+                    }
+                }
+
+                if (contents.size() < pageSize) {
+                    log.info("第 {} 页数据量 {} 小于 pageSize {},分页结束",
+                            pageNum, contents.size(), pageSize);
+                    break;
+                }
+                pageNum++;
+            }
+
+            log.info("素材批量向量化完成,成功: {}, 失败: {}, 总页数: {}",
+                    totalSuccess, totalFail, pagesProcessed);
+            return ReturnT.SUCCESS;
+
+        } catch (Exception e) {
+            log.error("素材批量向量化任务异常: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务异常: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Loads one page of completed records (status=2), ordered by id ascending.
+     *
+     * @param pageNum  zero-based page index
+     * @param pageSize number of rows per page
+     * @return the rows of the requested page as returned by the mapper
+     */
+    private List<DeconstructContent> queryDeconstructedContents(int pageNum, int pageSize) {
+        DeconstructContentExample example = new DeconstructContentExample();
+        example.createCriteria().andStatusEqualTo((short) 2);
+        // LIMIT/OFFSET are appended to the ORDER BY clause; both values are ints
+        example.setOrderByClause("id ASC LIMIT " + pageSize + " OFFSET " + (pageNum * pageSize));
+        return deconstructContentMapper.selectByExample(example);
+    }
+}

+ 27 - 0
core/src/main/java/com/tzld/videoVector/model/param/MaterialMatchParam.java

@@ -0,0 +1,27 @@
+package com.tzld.videoVector.model.param;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Request parameters for material similarity search.
+ */
+@Data
+public class MaterialMatchParam {
+
+    /** Query text; it is turned into a vector and then used for retrieval. */
+    private String queryText;
+
+    /** Query vector passed in directly (alternative to queryText; this field takes precedence). */
+    private List<Float> queryVector;
+
+    /** Business content ID, used to reuse a previously stored vector (optional). */
+    private String channelContentId;
+
+    /** Configuration code (defaults to a shared configuration such as VIDEO_TOPIC; pass ALL to search every enabled configuration). */
+    private String configCode;
+
+    /** Number of Top-N results to return; defaults to 10. */
+    private Integer topN;
+}

+ 36 - 0
core/src/main/java/com/tzld/videoVector/model/param/MaterialSubmitParam.java

@@ -0,0 +1,36 @@
+package com.tzld.videoVector.model.param;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Request parameters for submitting a material for ingestion.
+ */
+@Data
+public class MaterialSubmitParam {
+
+    /** Business ID of the material (stored in deconstruct_content.channel_content_id). */
+    private String channelContentId;
+
+    /** Content type: 1 = long article, 2 = image-text, 3 = video. */
+    private Integer contentType;
+
+    /** Material title. */
+    private String title;
+
+    /** Material body text / description. */
+    private String bodyText;
+
+    /** Video URL (optional). */
+    private String videoUrl;
+
+    /** Image list (optional). */
+    private List<String> imageList;
+
+    /** Author ID (optional). */
+    private String channelAccountId;
+
+    /** Author name (optional). */
+    private String channelAccountName;
+}

+ 41 - 0
core/src/main/java/com/tzld/videoVector/model/vo/MaterialMatchResult.java

@@ -0,0 +1,41 @@
+package com.tzld.videoVector.model.vo;
+
+import lombok.Data;
+
+/**
+ * A single material match returned by the similarity search.
+ */
+@Data
+public class MaterialMatchResult {
+
+    /** Code of the configuration that produced the hit. */
+    private String configCode;
+
+    /** deconstruct_content.id */
+    private Long contentId;
+
+    /** Business ID of the material (deconstruct_content.channel_content_id). */
+    private String channelContentId;
+
+    /** Cosine similarity score. */
+    private Double score;
+
+    /** Material title. */
+    private String title;
+
+    /** Source text of the vector that was hit. */
+    private String sourceText;
+
+    /** No-argument constructor. */
+    public MaterialMatchResult() {
+    }
+
+    /**
+     * Creates a fully populated result; each argument is stored in the field of the same name.
+     */
+    public MaterialMatchResult(String configCode, Long contentId, String channelContentId,
+                               Double score, String title, String sourceText) {
+        this.configCode = configCode;
+        this.contentId = contentId;
+        this.channelContentId = channelContentId;
+        this.score = score;
+        this.title = title;
+        this.sourceText = sourceText;
+    }
+}

+ 32 - 0
core/src/main/java/com/tzld/videoVector/service/MaterialSearchService.java

@@ -0,0 +1,32 @@
+package com.tzld.videoVector.service;
+
+import com.tzld.videoVector.model.param.MaterialMatchParam;
+import com.tzld.videoVector.model.param.MaterialSubmitParam;
+import com.tzld.videoVector.model.vo.MaterialMatchResult;
+
+import java.util.List;
+
+/**
+ * Material search service.
+ * <p>
+ * Reuses the existing deconstruct_content / content_vectors / deconstruct_vector_config tables.
+ * Materials and videos are told apart by biz_type, and the vectorization configuration is fully
+ * shared with video deconstruction (biz_type=NULL acts as a wildcard).
+ */
+public interface MaterialSearchService {
+
+    /**
+     * Submits a material for ingestion (deconstruction + asynchronous vectorization).
+     *
+     * @param param material submission parameters
+     * @return taskId of the deconstruction task
+     */
+    String submitMaterial(MaterialSubmitParam param);
+
+    /**
+     * Top-N similarity search over materials.
+     *
+     * @param param search parameters
+     * @return list of matches
+     */
+    List<MaterialMatchResult> matchTopNMaterial(MaterialMatchParam param);
+}

+ 469 - 0
core/src/main/java/com/tzld/videoVector/service/impl/MaterialSearchServiceImpl.java

@@ -0,0 +1,469 @@
+package com.tzld.videoVector.service.impl;
+
+import com.alibaba.fastjson.JSONObject;
+import com.tzld.videoVector.common.constant.VectorConstants;
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
+import com.tzld.videoVector.dao.mapper.pgVector.ext.ContentVectorMapperExt;
+import com.tzld.videoVector.model.param.MaterialMatchParam;
+import com.tzld.videoVector.model.param.MaterialSubmitParam;
+import com.tzld.videoVector.model.po.pgVector.ContentVector;
+import com.tzld.videoVector.model.po.pgVector.DeconstructContent;
+import com.tzld.videoVector.model.po.pgVector.DeconstructContentExample;
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
+import com.tzld.videoVector.model.vo.MaterialMatchResult;
+import com.tzld.videoVector.service.DeconstructService;
+import com.tzld.videoVector.service.EmbeddingService;
+import com.tzld.videoVector.service.MaterialSearchService;
+import com.tzld.videoVector.service.VectorizeService;
+import com.tzld.videoVector.util.Md5Util;
+import com.tzld.videoVector.util.VectorUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static com.tzld.videoVector.common.constant.VectorConstants.*;
+
+/**
+ * 素材搜索服务实现
+ * <p>
+ * 完全复用 deconstruct_content / content_vectors / deconstruct_vector_config 表,
+ * 配置复用视频解构配置(biz_type=NULL 通配所有业务类型)。
+ */
+@Slf4j
+@Service
+public class MaterialSearchServiceImpl implements MaterialSearchService {
+
+    @Resource
+    private DeconstructService deconstructService;
+
+    @Resource
+    private DeconstructContentMapper deconstructContentMapper;
+
+    @Resource
+    private DeconstructVectorConfigMapper deconstructVectorConfigMapper;
+
+    @Resource
+    private VectorizeService vectorizeService;
+
+    @Resource
+    private EmbeddingService embeddingService;
+
+    @Resource
+    private ContentVectorMapperExt contentVectorMapper;
+
+    // ================================================================ 入库
+    @Override
+    public String submitMaterial(MaterialSubmitParam param) {
+        if (param == null) {
+            log.error("submitMaterial 参数为空");
+            return null;
+        }
+
+        log.info("素材入库,channelContentId={}, title={}, contentType={}",
+                param.getChannelContentId(), param.getTitle(), param.getContentType());
+
+        // 幂等检查:channelContentId 是否已存在
+        String channelContentId = param.getChannelContentId();
+        DeconstructContent failedContent = null;
+        if (StringUtils.hasText(channelContentId)) {
+            DeconstructContent existing = getDeconstructContentByChannelContentId(channelContentId);
+            if (existing != null) {
+                Short status = existing.getStatus();
+                if (status != null && status != 3) {
+                    log.info("素材已存在,channelContentId={}, taskId={}, status={}, 不重复提交",
+                            channelContentId, existing.getTaskId(), status);
+                    return existing.getTaskId();
+                }
+                log.info("素材已存在但失败,channelContentId={}, 允许重新提交", channelContentId);
+                failedContent = existing;
+            }
+        }
+
+        Integer contentType = param.getContentType() != null ? param.getContentType() : 2;
+
+        // 调用解构服务
+        String taskId = deconstructService.deconstruct(
+                0,
+                contentType,
+                param.getChannelContentId(),
+                param.getVideoUrl(),
+                param.getImageList(),
+                param.getBodyText(),
+                param.getTitle(),
+                param.getChannelAccountId(),
+                param.getChannelAccountName()
+        );
+
+        if (!StringUtils.hasText(taskId)) {
+            log.error("素材解构任务提交失败");
+            return null;
+        }
+
+        log.info("素材解构任务提交成功,taskId={}", taskId);
+
+        // 保存到 deconstruct_content
+        try {
+            if (failedContent != null) {
+                failedContent.setTaskId(taskId);
+                failedContent.setBizType((short) 0);
+                failedContent.setContentType(contentType.shortValue());
+                failedContent.setTitle(param.getTitle());
+                failedContent.setBodyText(param.getBodyText());
+                failedContent.setVideoUrl(param.getVideoUrl());
+                failedContent.setChannelAccountId(param.getChannelAccountId());
+                failedContent.setChannelAccountName(param.getChannelAccountName());
+                failedContent.setStatus((short) 0);
+                failedContent.setResultJson("");
+                failedContent.setFailureReason("");
+                failedContent.setPointUrl("");
+                failedContent.setWeightUrl("");
+                failedContent.setPatternUrl("");
+                failedContent.setUpdateTime(new Date());
+                if (param.getImageList() != null && !param.getImageList().isEmpty()) {
+                    failedContent.setImages(JSONObject.toJSONString(param.getImageList()));
+                }
+                deconstructContentMapper.updateByPrimaryKeySelective(failedContent);
+                log.info("素材失败记录已更新,contentId={}, taskId={}", failedContent.getId(), taskId);
+            } else {
+                DeconstructContent content = new DeconstructContent();
+                content.setTaskId(taskId);
+                content.setBizType((short) 0);
+                content.setContentType(contentType.shortValue());
+                content.setChannelContentId(param.getChannelContentId());
+                content.setTitle(param.getTitle());
+                content.setBodyText(param.getBodyText());
+                content.setVideoUrl(param.getVideoUrl());
+                content.setChannelAccountId(param.getChannelAccountId());
+                content.setChannelAccountName(param.getChannelAccountName());
+                content.setStatus((short) 0);
+                content.setCreateTime(new Date());
+                content.setUpdateTime(new Date());
+                if (param.getImageList() != null && !param.getImageList().isEmpty()) {
+                    content.setImages(JSONObject.toJSONString(param.getImageList()));
+                }
+                deconstructContentMapper.insertSelective(content);
+                log.info("素材记录已保存,contentId={}, taskId={}", content.getId(), taskId);
+            }
+        } catch (Exception e) {
+            log.error("保存素材记录失败,taskId={}, error={}", taskId, e.getMessage(), e);
+        }
+
+        return taskId;
+    }
+
+    // ================================================================ 搜索
+    /**
+     * Searches the most similar materials for the given request.
+     * <p>
+     * For every configuration in scope (a single configCode, or all enabled ones when the code
+     * is ALL) a query vector is resolved, candidates are fetched by cosine similarity,
+     * de-duplicated per contentId and the first topN are enriched with data from
+     * deconstruct_content. Results of all configurations are appended in configuration order.
+     * A failure in one configuration is logged and does not affect the others.
+     *
+     * @param param search parameters
+     * @return the matches; an empty list when param is null or no configuration is found
+     */
+    @Override
+    public List<MaterialMatchResult> matchTopNMaterial(MaterialMatchParam param) {
+        if (param == null) {
+            log.error("matchTopNMaterial 参数为空");
+            return Collections.emptyList();
+        }
+
+        int topN = param.getTopN() != null && param.getTopN() > 0 ? param.getTopN() : 10;
+        String configCode = param.getConfigCode();
+        if (!StringUtils.hasText(configCode)) {
+            configCode = DEFAULT_CONFIG_CODE;
+        }
+
+        // When a channelContentId is given, make sure its vector exists for this configuration
+        if (StringUtils.hasText(param.getChannelContentId())
+                && !ALL_CONFIG_CODE.equalsIgnoreCase(configCode)) {
+            triggerVectorizeIfNeeded(param.getChannelContentId(), configCode);
+        }
+
+        // Decide which configurations to search
+        List<DeconstructVectorConfig> searchConfigs;
+        if (ALL_CONFIG_CODE.equalsIgnoreCase(configCode)) {
+            searchConfigs = getEnabledConfigs();
+            if (searchConfigs.isEmpty()) {
+                log.warn("素材搜索:未找到任何启用的向量化配置");
+                return Collections.emptyList();
+            }
+            log.info("素材搜索 ALL 模式,加载 {} 个配置", searchConfigs.size());
+        } else {
+            DeconstructVectorConfig singleConfig = getVectorConfigByCode(configCode);
+            if (singleConfig == null) {
+                log.warn("素材搜索:未找到 configCode={} 的配置", configCode);
+                return Collections.emptyList();
+            }
+            searchConfigs = Collections.singletonList(singleConfig);
+        }
+
+        List<MaterialMatchResult> result = new ArrayList<>();
+        // Over-fetch so that de-duplication still leaves topN rows; computed in long to avoid int overflow
+        int candidateSize = (int) Math.min((long) topN * 3, Integer.MAX_VALUE);
+        Map<String, List<Float>> embeddingCache = new HashMap<>();
+
+        for (DeconstructVectorConfig config : searchConfigs) {
+            String cfgCode = config.getConfigCode();
+            try {
+                // Resolve the query vector for this configuration
+                List<Float> queryVector = resolveQueryVector(param, config, embeddingCache);
+                if (queryVector == null || queryVector.isEmpty()) {
+                    log.warn("配置 {} 无法获取查询向量,跳过", cfgCode);
+                    continue;
+                }
+
+                log.info("素材搜索 配置 {} 开始 Top-{},向量维度={}", cfgCode, candidateSize, queryVector.size());
+
+                // Search content_vectors
+                String queryVectorStr = queryVector.toString();
+                List<ContentVector> matches = contentVectorMapper.searchTopNByCosine(
+                        cfgCode, queryVectorStr, candidateSize);
+
+                if (CollectionUtils.isEmpty(matches)) {
+                    log.debug("配置 {} 无匹配结果", cfgCode);
+                    continue;
+                }
+
+                // Keep one row per contentId, preferring the higher score. A null score never
+                // replaces an existing row, and any non-null score replaces a null one.
+                // NOTE(review): limit(topN) below relies on the mapper returning rows best-first — confirm.
+                Map<Long, ContentVector> deduped = new LinkedHashMap<>();
+                for (ContentVector cv : matches) {
+                    Long contentId = cv.getContentId();
+                    ContentVector existing = deduped.get(contentId);
+                    boolean better = existing == null
+                            || (cv.getScore() != null
+                            && (existing.getScore() == null || cv.getScore() > existing.getScore()));
+                    if (better) {
+                        deduped.put(contentId, cv);
+                    }
+                }
+
+                // Take the first topN rows
+                List<ContentVector> topMatches = deduped.values().stream()
+                        .limit(topN)
+                        .collect(Collectors.toList());
+
+                // Load deconstruct_content in one query to fill channelContentId / title
+                Set<Long> contentIds = topMatches.stream()
+                        .map(ContentVector::getContentId)
+                        .collect(Collectors.toSet());
+                Map<Long, DeconstructContent> contentMap = batchGetDeconstructContent(contentIds);
+
+                for (ContentVector cv : topMatches) {
+                    DeconstructContent dc = contentMap.get(cv.getContentId());
+                    result.add(new MaterialMatchResult(
+                            cfgCode,
+                            cv.getContentId(),
+                            dc != null ? dc.getChannelContentId() : null,
+                            cv.getScore(),
+                            dc != null ? dc.getTitle() : null,
+                            cv.getSourceText()
+                    ));
+                }
+
+                log.info("配置 {} 搜索完成,返回 {} 条", cfgCode, topMatches.size());
+
+            } catch (Exception e) {
+                // Best effort per configuration: log and continue with the next one
+                log.error("配置 {} 搜索失败: {}", cfgCode, e.getMessage(), e);
+            }
+        }
+
+        log.info("素材搜索完成,共返回 {} 条结果", result.size());
+        return result;
+    }
+
+    // ================================================================ 私有方法
+
+    /**
+     * 解析查询向量
+     * 优先级:queryVector > channelContentId 历史向量 > queryText embedding
+     */
+    private List<Float> resolveQueryVector(MaterialMatchParam param,
+                                           DeconstructVectorConfig config,
+                                           Map<String, List<Float>> embeddingCache) {
+        // 1. 直接传入的 queryVector
+        if (param.getQueryVector() != null && !param.getQueryVector().isEmpty()) {
+            return param.getQueryVector();
+        }
+
+        // 2. channelContentId 历史向量
+        if (StringUtils.hasText(param.getChannelContentId())) {
+            List<Float> cached = getVectorByChannelContentId(
+                    param.getChannelContentId(), config.getConfigCode());
+            if (cached != null && !cached.isEmpty()) {
+                log.info("配置 {} 命中 channelContentId 历史向量", config.getConfigCode());
+                return cached;
+            }
+            log.info("配置 {} channelContentId={} 未命中,降级到 queryText",
+                    config.getConfigCode(), param.getChannelContentId());
+        }
+
+        // 3. queryText embedding(按 embeddingModel 缓存)
+        if (StringUtils.hasText(param.getQueryText())) {
+            String embeddingModel = config.getEmbeddingModel();
+            String cacheKey = embeddingModel != null ? embeddingModel : "__default__";
+
+            if (embeddingCache.containsKey(cacheKey)) {
+                return embeddingCache.get(cacheKey);
+            }
+
+            // text_hash 缓存查询
+            String textHash = Md5Util.encoderByMd5(param.getQueryText());
+            if (StringUtils.hasText(textHash)) {
+                List<Float> hashCached = getVectorByTextHash(textHash, config.getConfigCode());
+                if (hashCached != null && !hashCached.isEmpty()) {
+                    embeddingCache.put(cacheKey, hashCached);
+                    return hashCached;
+                }
+            }
+
+            // 调用 embedding API
+            List<Float> vector = embeddingService.embed(param.getQueryText(), config);
+            if (vector != null && !vector.isEmpty()) {
+                embeddingCache.put(cacheKey, vector);
+                return vector;
+            }
+            log.warn("配置 {} embedding 失败", config.getConfigCode());
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks up a previously stored vector through the content's channelContentId.
+     * <p>
+     * The first vector returned by the mapper is used; with a blank configCode the lookup is
+     * not restricted to a configuration. Any exception is logged and turned into null.
+     *
+     * @return the parsed vector, or null when the content or a usable vector is missing
+     */
+    private List<Float> getVectorByChannelContentId(String channelContentId, String configCode) {
+        try {
+            DeconstructContent content = getDeconstructContentByChannelContentId(channelContentId);
+            if (content == null || content.getId() == null) {
+                return null;
+            }
+
+            List<ContentVector> stored = StringUtils.hasText(configCode)
+                    ? contentVectorMapper.selectByContentIdAndConfigCode(content.getId(), configCode)
+                    : contentVectorMapper.selectByContentId(content.getId());
+            if (stored == null || stored.isEmpty()) {
+                return null;
+            }
+
+            ContentVector first = stored.get(0);
+            if (!StringUtils.hasText(first.getEmbedding())) {
+                return null;
+            }
+
+            List<Float> parsed = VectorUtils.parseVectorString(first.getEmbedding());
+            int dimension = parsed == null ? 0 : parsed.size();
+            log.info("复用历史向量,channelContentId={}, contentId={}, configCode={}, 维度={}",
+                    channelContentId, content.getId(), configCode,
+                    dimension);
+            return parsed;
+        } catch (Exception e) {
+            log.error("获取历史向量失败,channelContentId={}, error={}", channelContentId, e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a stored vector by text_hash, restricted to configCode when it is not blank.
+     * Any exception is logged and turned into null.
+     *
+     * @return the parsed vector, or null when nothing usable is stored
+     */
+    private List<Float> getVectorByTextHash(String textHash, String configCode) {
+        try {
+            ContentVector hit = StringUtils.hasText(configCode)
+                    ? contentVectorMapper.selectByTextHashAndConfigCode(textHash, configCode)
+                    : contentVectorMapper.selectByTextHash(textHash);
+            if (hit == null || !StringUtils.hasText(hit.getEmbedding())) {
+                return null;
+            }
+            return VectorUtils.parseVectorString(hit.getEmbedding());
+        } catch (Exception e) {
+            log.error("text_hash 查询向量失败,hash={}, error={}", textHash, e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Triggers vectorization when the content is deconstructed (status 2) but has no vector
+     * for the given configCode yet. Any exception is logged and swallowed so the search that
+     * called this method can continue.
+     */
+    private void triggerVectorizeIfNeeded(String channelContentId, String configCode) {
+        try {
+            DeconstructContent content = getDeconstructContentByChannelContentId(channelContentId);
+            boolean deconstructed = content != null
+                    && content.getStatus() != null
+                    && content.getStatus() == 2;
+            if (!deconstructed) {
+                return;
+            }
+
+            List<ContentVector> existingVectors =
+                    vectorizeService.getVectorsByContentId(content.getId(), configCode);
+            boolean alreadyVectorized = existingVectors != null && !existingVectors.isEmpty();
+            if (alreadyVectorized) {
+                return;
+            }
+
+            log.info("素材已解构但缺少向量,触发向量化,channelContentId={}, contentId={}, configCode={}",
+                    channelContentId, content.getId(), configCode);
+            vectorizeService.vectorizeContent(content);
+        } catch (Exception e) {
+            log.error("触发向量化失败,channelContentId={}, error={}", channelContentId, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the newest deconstruct_content row (highest id) for the given channelContentId.
+     * <p>
+     * Only one row is requested from the database instead of loading every match.
+     *
+     * @param channelContentId business ID to look up
+     * @return the newest matching row, or null when there is none
+     */
+    private DeconstructContent getDeconstructContentByChannelContentId(String channelContentId) {
+        DeconstructContentExample example = new DeconstructContentExample();
+        example.createCriteria().andChannelContentIdEqualTo(channelContentId);
+        // The LIMIT is appended to the ORDER BY clause because only the first row is used
+        example.setOrderByClause("id DESC LIMIT 1");
+        List<DeconstructContent> list = deconstructContentMapper.selectByExample(example);
+        return CollectionUtils.isEmpty(list) ? null : list.get(0);
+    }
+
+    /**
+     * Loads several deconstruct_content rows in one query and indexes them by id.
+     * A query failure is logged and whatever was collected so far is returned.
+     *
+     * @param contentIds ids to load; null or empty yields an empty map
+     * @return a mutable map from id to row
+     */
+    private Map<Long, DeconstructContent> batchGetDeconstructContent(Set<Long> contentIds) {
+        Map<Long, DeconstructContent> byId = new HashMap<>();
+        if (CollectionUtils.isEmpty(contentIds)) {
+            return byId;
+        }
+        try {
+            DeconstructContentExample example = new DeconstructContentExample();
+            example.createCriteria().andIdIn(new ArrayList<>(contentIds));
+            List<DeconstructContent> rows = deconstructContentMapper.selectByExample(example);
+            if (rows == null) {
+                return byId;
+            }
+            for (DeconstructContent row : rows) {
+                byId.put(row.getId(), row);
+            }
+        } catch (Exception e) {
+            log.error("批量查询 deconstruct_content 失败,ids={}, error={}",
+                    contentIds, e.getMessage(), e);
+        }
+        return byId;
+    }
+
+    /**
+     * Loads all vectorization configurations with enabled = 1, ordered by priority ascending.
+     *
+     * @return the enabled configurations; an empty list when there are none or the query fails
+     */
+    private List<DeconstructVectorConfig> getEnabledConfigs() {
+        try {
+            DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
+            example.createCriteria().andEnabledEqualTo((short) 1);
+            example.setOrderByClause("priority ASC");
+            List<DeconstructVectorConfig> enabled = deconstructVectorConfigMapper.selectByExample(example);
+            if (enabled == null) {
+                return Collections.emptyList();
+            }
+            return enabled;
+        } catch (Exception e) {
+            log.error("查询启用配置失败: {}", e.getMessage(), e);
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * Loads the vectorization configuration with the given configCode.
+     *
+     * @return the first matching configuration, or null when configCode is blank, nothing
+     *         matches or the query fails
+     */
+    private DeconstructVectorConfig getVectorConfigByCode(String configCode) {
+        if (!StringUtils.hasText(configCode)) {
+            return null;
+        }
+        try {
+            DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
+            example.createCriteria().andConfigCodeEqualTo(configCode);
+            List<DeconstructVectorConfig> found = deconstructVectorConfigMapper.selectByExample(example);
+            if (found == null || found.isEmpty()) {
+                return null;
+            }
+            return found.get(0);
+        } catch (Exception e) {
+            log.error("查询向量配置失败,configCode={}, error={}", configCode, e.getMessage(), e);
+            return null;
+        }
+    }
+}

+ 1 - 84
core/src/main/java/com/tzld/videoVector/service/impl/VideoSearchServiceImpl.java

@@ -27,7 +27,6 @@ import org.springframework.util.StringUtils;
 
 import javax.annotation.Resource;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 import static com.tzld.videoVector.common.constant.VectorConstants.*;
@@ -51,28 +50,12 @@ public class VideoSearchServiceImpl implements VideoSearchService {
     @Resource
     private ContentVectorMapperExt pgContentVectorMapper;
 
-    @Resource
-    private VectorizeService vectorizeService;
-
     @Resource
     private VideoApiService videoApiService;
 
     @Resource
     private DeconstructVectorConfigMapper deconstructVectorConfigMapper;
 
-    // 异步向量化线程池,避免裸创建 Thread
-    private static final ExecutorService VECTORIZE_EXECUTOR = new ThreadPoolExecutor(
-            2, 10, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(100),
-            r -> {
-                Thread t = new Thread(r);
-                t.setName("vectorize-async-" + t.getId());
-                t.setDaemon(true);
-                return t;
-            },
-            new ThreadPoolExecutor.CallerRunsPolicy()
-    );
-
     @Override
     public String deconstruct(DeconstructParam param) {
         if (param == null) {
@@ -226,13 +209,6 @@ public class VideoSearchServiceImpl implements VideoSearchService {
             content = updateContentFromResult(content, result, taskId, param);
         }
 
-        // 如果任务完成且成功,触发向量化处理
-        // vectorizeContent 内部已按 configCode 做幂等检查,新增配置也会被自动补充向量化
-        if (content != null && content.getStatus() != null && content.getStatus() == 2) {
-            vectorizeContentAsync(content);
-            log.info("触发向量化处理,contentId={}, taskId={}", content.getId(), taskId);
-        }
-
         // 构建返回结果
         if (result != null) {
             return buildResultFromApiResult(result);
@@ -397,61 +373,7 @@ public class VideoSearchServiceImpl implements VideoSearchService {
     }
 
     /**
-     * 若内容已解构完成(status==2)但向量表缺少对应 configCode 的向量,则同步触发向量化
-     */
-    private void triggerVectorizeIfNeeded(String channelContentId, String configCode) {
-        try {
-            // 查询解构内容(含 BLOB 字段,vectorizeContent 需要 resultJson/bodyText 等)
-            DeconstructContentExample example = new DeconstructContentExample();
-            example.createCriteria().andChannelContentIdEqualTo(channelContentId);
-            example.setOrderByClause("id DESC");
-            List<DeconstructContent> list = deconstructContentMapper.selectByExample(example);
-            if (list == null || list.isEmpty()) {
-                log.debug("triggerVectorizeIfNeeded: 未找到 channelContentId={} 的解构记录", channelContentId);
-                return;
-            }
-            DeconstructContent content = list.get(0);
-            // 仅对已解构完成(status == 2)的内容触发向量化
-            if (content.getStatus() == null || content.getStatus() != 2) {
-                log.debug("triggerVectorizeIfNeeded: channelContentId={} 解构未完成,status={}",
-                        channelContentId, content.getStatus());
-                return;
-            }
-            // 检查向量表是否已有该 configCode 的记录
-            List<ContentVector> vectors =
-                    vectorizeService.getVectorsByContentId(content.getId(), configCode);
-            if (vectors != null && !vectors.isEmpty()) {
-                log.debug("triggerVectorizeIfNeeded: channelContentId={} 已有 configCode={} 向量,无需触发",
-                        channelContentId, configCode);
-                return;
-            }
-            log.info("内容已解构完成但向量表无对应 configCode 向量,触发向量化,channelContentId={}, contentId={}, configCode={}",
-                    channelContentId, content.getId(), configCode);
-            vectorizeService.vectorizeContent(content);
-        } catch (Exception e) {
-            log.error("检查并触发向量化失败,channelContentId={}, configCode={}, error={}",
-                    channelContentId, configCode, e.getMessage(), e);
-        }
-    }
-
-    /**
-     * 异步向量化内容,使用线程池执行,避免阻塞接口响应
-     */
-    private void vectorizeContentAsync(DeconstructContent content) {
-        VECTORIZE_EXECUTOR.submit(() -> {
-            try {
-                log.info("开始异步向量化处理,contentId={}", content.getId());
-                vectorizeService.vectorizeContent(content);
-                log.info("异步向量化处理完成,contentId={}", content.getId());
-            } catch (Exception e) {
-                log.error("异步向量化处理失败,contentId={}, error={}",
-                        content.getId(), e.getMessage(), e);
-            }
-        });
-    }
-
-    /**
-     * 异步向量化内容,使用线程池执行,避免阻塞接口响应
+     * 匹配TopN视频
      */
     @Override
     public List<VideoMatchResult> matchTopNVideo(MatchTopNVideoParam param) {
@@ -466,11 +388,6 @@ public class VideoSearchServiceImpl implements VideoSearchService {
             configCode = DEFAULT_CONFIG_CODE;
         }
 
-        // 若提供了 channelContentId,且内容已解构完成但向量表缺少对应 configCode 的向量,则异步触发向量化
-        if (StringUtils.hasText(param.getChannelContentId()) && !VectorConstants.ALL_CONFIG_CODE.equalsIgnoreCase(configCode)) {
-            triggerVectorizeIfNeeded(param.getChannelContentId(), configCode);
-        }
-
         // 确定要搜索的配置列表
         List<DeconstructVectorConfig> searchConfigs;
         if (VectorConstants.ALL_CONFIG_CODE.equalsIgnoreCase(configCode)) {

+ 2 - 1
core/src/main/resources/mapper/pgVector/ext/ContentVectorMapperExt.xml

@@ -25,6 +25,7 @@
     <result column="id" jdbcType="BIGINT" property="id" />
     <result column="content_id" jdbcType="BIGINT" property="contentId" />
     <result column="config_code" jdbcType="VARCHAR" property="configCode" />
+    <result column="source_text" jdbcType="VARCHAR" property="sourceText" />
     <result column="score" jdbcType="DOUBLE" property="score" />
   </resultMap>
 
@@ -98,7 +99,7 @@
 
   <!-- 余弦相似度搜索 Top-N -->
   <select id="searchTopNByCosine" resultMap="SearchResultMap">
-    SELECT id, content_id, config_code, 1 - (embedding &lt;=&gt; #{queryVector}::vector) AS score
+    SELECT id, content_id, config_code, source_text, 1 - (embedding &lt;=&gt; #{queryVector}::vector) AS score
     FROM content_vectors
     WHERE config_code = #{configCode}
     ORDER BY embedding &lt;=&gt; #{queryVector}::vector

+ 41 - 0
server/src/main/java/com/tzld/videoVector/controller/MaterialController.java

@@ -0,0 +1,41 @@
+package com.tzld.videoVector.controller;
+
+import com.tzld.videoVector.common.base.CommonResponse;
+import com.tzld.videoVector.model.param.MaterialMatchParam;
+import com.tzld.videoVector.model.param.MaterialSubmitParam;
+import com.tzld.videoVector.model.vo.MaterialMatchResult;
+import com.tzld.videoVector.service.MaterialSearchService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * 素材向量化搜索接口
+ */
+@RestController
+@RequestMapping("/material")
+public class MaterialController {
+
+    @Autowired
+    private MaterialSearchService materialSearchService;
+
+    /**
+     * 素材入库(解构 + 异步向量化)
+     */
+    @PostMapping("/submit")
+    public CommonResponse<String> submitMaterial(@RequestBody MaterialSubmitParam param) {
+        return CommonResponse.success(materialSearchService.submitMaterial(param));
+    }
+
+    /**
+     * 素材相似搜索 Top-N
+     */
+    @PostMapping("/matchTopN")
+    public CommonResponse<List<MaterialMatchResult>> matchTopN(@RequestBody MaterialMatchParam param) {
+        return CommonResponse.success(materialSearchService.matchTopNMaterial(param));
+    }
+}