Sfoglia il codice sorgente

增加视频标题向量

wangyunpeng 8 ore fa
parent
commit
e63c48731a

+ 3 - 0
core/src/main/java/com/tzld/videoVector/common/constant/VectorConstants.java

@@ -10,6 +10,9 @@ public interface VectorConstants {
     /** 默认配置编码(选题) */
     String DEFAULT_CONFIG_CODE = "VIDEO_TOPIC";
 
+    /** 视频标题配置编码 */
+    String VIDEO_TITLE_CONFIG_CODE = "VIDEO_TITLE";
+
     /** configCode 传 "ALL" 表示搜索所有启用的向量化配置 */
     String ALL_CONFIG_CODE = "ALL";
 

+ 5 - 0
core/src/main/java/com/tzld/videoVector/dao/mapper/pgVector/ext/VideoVectorMapperExt.java

@@ -114,4 +114,9 @@ public interface VideoVectorMapperExt {
      * 查询所有不重复的 video_id(跨所有 configCode)
      */
     List<Long> selectAllDistinctVideoIds();
+
+    /**
+     * 查询所有不重复的 video_id,排除指定 configCode
+     */
+    List<Long> selectDistinctVideoIdsExcludeConfigCode(@Param("excludeConfigCode") String excludeConfigCode);
 }

+ 239 - 0
core/src/main/java/com/tzld/videoVector/job/VideoTitleVectorJob.java

@@ -0,0 +1,239 @@
+package com.tzld.videoVector.job;
+
+import com.tzld.videoVector.api.VideoApiService;
+import com.tzld.videoVector.common.constant.VectorConstants;
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
+import com.tzld.videoVector.dao.mapper.pgVector.ext.VideoVectorMapperExt;
+import com.tzld.videoVector.model.entity.VideoDetail;
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
+import com.tzld.videoVector.service.EmbeddingService;
+import com.tzld.videoVector.service.VectorStoreService;
+import com.tzld.videoVector.util.Md5Util;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * 视频标题向量化定时任务
+ * <p>
+ * 1. 查询 video_vectors 表中排除 VIDEO_TITLE 本身的所有不重复 video_id(即其他 configCode 下的视频)
+ * 2. 对比已有 VIDEO_TITLE 向量的 video_id,删除不再存在于其他 configCode 中的历史标题向量
+ * 3. 对未向量化的 video_id 批量获取标题并向量化存储
+ */
+@Slf4j
+@Component
+public class VideoTitleVectorJob {
+
+    @Resource
+    private VideoVectorMapperExt videoVectorMapperExt;
+
+    @Resource
+    private DeconstructVectorConfigMapper vectorConfigMapper;
+
+    @Resource
+    private VideoApiService videoApiService;
+
+    @Resource
+    private EmbeddingService embeddingService;
+
+    @Resource
+    private VectorStoreService vectorStoreService;
+
+    /** 每批获取视频详情的数量 */
+    private static final int DETAIL_BATCH_SIZE = 100;
+
+    /**
+     * 视频标题向量化任务
+     * 读取 video_vectors 中所有 video_id,获取视频标题,进行向量化写入
+     *
+     * @param param 参数(暂未使用)
+     * @return 执行结果
+     */
+    @XxlJob("videoTitleVectorJob")
+    public ReturnT<String> videoTitleVectorJob(String param) {
+        log.info("开始执行视频标题向量化任务, param: {}", param);
+
+        try {
+            // 1. 获取 VIDEO_TITLE 配置
+            DeconstructVectorConfig config = getVideoTitleConfig();
+            if (config == null) {
+                log.error("未找到 VIDEO_TITLE 向量化配置,任务终止");
+                return new ReturnT<>(ReturnT.FAIL_CODE, "未找到 VIDEO_TITLE 配置");
+            }
+            log.info("加载 VIDEO_TITLE 配置成功: model={}, dimension={}, maxLength={}",
+                    config.getEmbeddingModel(), config.getDimension(), config.getMaxLength());
+
+            // 2. 查询其他 configCode 下的所有 video_id(排除 VIDEO_TITLE 自身)
+            List<Long> otherCodeVideoIds = videoVectorMapperExt.selectDistinctVideoIdsExcludeConfigCode(
+                    VectorConstants.VIDEO_TITLE_CONFIG_CODE);
+            if (CollectionUtils.isEmpty(otherCodeVideoIds)) {
+                log.info("其他 configCode 下无视频数据,跳过");
+                return ReturnT.SUCCESS;
+            }
+            Set<Long> otherCodeVideoIdSet = new HashSet<>(otherCodeVideoIds);
+            log.info("其他 configCode 下的 video_id 数量: {}", otherCodeVideoIdSet.size());
+
+            // 3. 查询已有 VIDEO_TITLE 向量的 video_id
+            Set<Long> existingTitleIds = getExistingVideoTitleIds();
+            log.info("已有 VIDEO_TITLE 向量的 video_id 数量: {}", existingTitleIds.size());
+
+            // 4. 清理:删除已有 VIDEO_TITLE 向量但不再存在于其他 configCode 中的记录
+            List<Long> toDeleteIds = existingTitleIds.stream()
+                    .filter(id -> !otherCodeVideoIdSet.contains(id))
+                    .collect(Collectors.toList());
+            if (!toDeleteIds.isEmpty()) {
+                log.info("需要清理的 VIDEO_TITLE 向量数量: {}", toDeleteIds.size());
+                // 分批删除
+                for (int i = 0; i < toDeleteIds.size(); i += 1000) {
+                    int end = Math.min(i + 1000, toDeleteIds.size());
+                    List<Long> batch = toDeleteIds.subList(i, end);
+                    videoVectorMapperExt.deleteBatchByVideoIdsAndConfigCode(batch, VectorConstants.VIDEO_TITLE_CONFIG_CODE);
+                }
+                log.info("清理完成,删除 {} 条 VIDEO_TITLE 向量", toDeleteIds.size());
+            }
+
+            // 5. 过滤出需要新增向量化的 video_id(在其他 code 中存在但无 VIDEO_TITLE 向量)
+            List<Long> needProcessIds = otherCodeVideoIds.stream()
+                    .filter(id -> !existingTitleIds.contains(id))
+                    .collect(Collectors.toList());
+            if (needProcessIds.isEmpty()) {
+                log.info("所有 video_id 已完成标题向量化,无需处理");
+                return ReturnT.SUCCESS;
+            }
+            log.info("需要新增标题向量化的 video_id 数量: {}", needProcessIds.size());
+
+            // 6. 分批获取视频详情并向量化
+            AtomicInteger totalSuccess = new AtomicInteger(0);
+            AtomicInteger totalFail = new AtomicInteger(0);
+            AtomicInteger totalSkip = new AtomicInteger(0);
+
+            for (int i = 0; i < needProcessIds.size(); i += DETAIL_BATCH_SIZE) {
+                int end = Math.min(i + DETAIL_BATCH_SIZE, needProcessIds.size());
+                List<Long> batchIds = needProcessIds.subList(i, end);
+
+                try {
+                    processBatch(batchIds, config, totalSuccess, totalFail, totalSkip);
+                } catch (Exception e) {
+                    log.error("批次 {}-{} 处理异常: {}", i, end, e.getMessage(), e);
+                    totalFail.addAndGet(batchIds.size());
+                }
+
+                if ((i / DETAIL_BATCH_SIZE + 1) % 10 == 0) {
+                    log.info("进度: 已处理 {}/{}, 成功: {}, 失败: {}, 跳过: {}",
+                            end, needProcessIds.size(), totalSuccess.get(), totalFail.get(), totalSkip.get());
+                }
+            }
+
+            log.info("视频标题向量化任务完成,新增成功: {}, 失败: {}, 跳过: {}, 清理: {}",
+                    totalSuccess.get(), totalFail.get(), totalSkip.get(), toDeleteIds.size());
+            return ReturnT.SUCCESS;
+
+        } catch (Exception e) {
+            log.error("视频标题向量化任务执行失败: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 处理一批视频的标题向量化
+     */
+    private void processBatch(List<Long> videoIds, DeconstructVectorConfig config,
+                              AtomicInteger totalSuccess, AtomicInteger totalFail, AtomicInteger totalSkip) {
+        // 批量获取视频详情
+        Map<Long, VideoDetail> detailMap = videoApiService.getVideoDetail(new HashSet<>(videoIds));
+        if (detailMap == null || detailMap.isEmpty()) {
+            log.warn("批量获取视频详情返回空,videoIds数量: {}", videoIds.size());
+            totalSkip.addAndGet(videoIds.size());
+            return;
+        }
+
+        String configCode = config.getConfigCode();
+        Integer maxLength = config.getMaxLength();
+
+        for (Long videoId : videoIds) {
+            try {
+                VideoDetail detail = detailMap.get(videoId);
+                if (detail == null || !StringUtils.hasText(detail.getTitle())) {
+                    log.debug("videoId={} 无视频详情或标题为空,跳过", videoId);
+                    totalSkip.incrementAndGet();
+                    continue;
+                }
+
+                // 审核过滤:未通过审核的视频跳过
+                if (!detail.isAuditPassed()) {
+                    log.debug("videoId={} 审核未通过,跳过", videoId);
+                    totalSkip.incrementAndGet();
+                    continue;
+                }
+
+                String title = detail.getTitle();
+                // 截断标题
+                if (maxLength != null && maxLength > 0 && title.length() > maxLength) {
+                    title = title.substring(0, maxLength);
+                }
+
+                // 优先通过 text_hash 复用已有 embedding
+                List<Float> vector = getOrEmbed(title, config);
+                if (vector == null || vector.isEmpty()) {
+                    log.warn("videoId={} 标题向量化失败,title={}", videoId, title);
+                    totalFail.incrementAndGet();
+                    continue;
+                }
+
+                // 存储向量
+                vectorStoreService.save(configCode, videoId, vector, title);
+                totalSuccess.incrementAndGet();
+                log.debug("videoId={} 标题向量化存储成功", videoId);
+
+            } catch (Exception e) {
+                log.error("videoId={} 标题向量化异常: {}", videoId, e.getMessage(), e);
+                totalFail.incrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * 优先通过 text_hash 复用已有 embedding,未命中则调用 embedding API
+     */
+    private List<Float> getOrEmbed(String text, DeconstructVectorConfig config) {
+        String configCode = config.getConfigCode();
+        String textHash = Md5Util.encoderByMd5(text);
+        if (StringUtils.hasText(textHash)) {
+            List<Float> cached = vectorStoreService.getVectorByTextHash(textHash, configCode);
+            if (cached != null && !cached.isEmpty()) {
+                log.debug("命中 text_hash 缓存,复用 embedding,hash={}", textHash);
+                return cached;
+            }
+        }
+        return embeddingService.embed(text, config);
+    }
+
+    /**
+     * 获取 VIDEO_TITLE 向量化配置
+     */
+    private DeconstructVectorConfig getVideoTitleConfig() {
+        DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
+        example.createCriteria()
+                .andEnabledEqualTo((short) 1)
+                .andConfigCodeEqualTo(VectorConstants.VIDEO_TITLE_CONFIG_CODE);
+        List<DeconstructVectorConfig> configs = vectorConfigMapper.selectByExample(example);
+        return CollectionUtils.isEmpty(configs) ? null : configs.get(0);
+    }
+
+    /**
+     * 查询已有 VIDEO_TITLE 向量的 video_id 集合
+     */
+    private Set<Long> getExistingVideoTitleIds() {
+        List<Long> ids = videoVectorMapperExt.selectAllVideoIdsByConfigCode(VectorConstants.VIDEO_TITLE_CONFIG_CODE);
+        return ids != null ? new HashSet<>(ids) : Collections.emptySet();
+    }
+}

+ 18 - 0
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -60,6 +60,9 @@ public class VideoVectorJob {
     @Resource
     private AigcApiService aigcApiService;
 
+    @Resource
+    private VideoTitleVectorJob videoTitleVectorJob;
+
 
     @ApolloJsonValue("${aigc.deconstruct.task.ids:[46, 57, 58]}")
     private List<Integer> aigcDeconstructTaskIds;
@@ -1213,6 +1216,21 @@ public class VideoVectorJob {
             log.error("子任务 resultLogVideoVectorJob 执行异常: {}", e.getMessage(), e);
         }
 
+        // 4. 执行 videoTitleVectorJob
+        try {
+            log.info("===== 开始执行子任务: videoTitleVectorJob =====");
+            ReturnT<String> result = videoTitleVectorJob.videoTitleVectorJob(param);
+            if (result.getCode() != ReturnT.SUCCESS_CODE) {
+                hasFailure = true;
+                failMessages.append("videoTitleVectorJob失败: ").append(result.getMsg()).append("; ");
+            }
+            log.info("===== 子任务 videoTitleVectorJob 执行完成, code={} =====", result.getCode());
+        } catch (Exception e) {
+            hasFailure = true;
+            failMessages.append("videoTitleVectorJob异常: ").append(e.getMessage()).append("; ");
+            log.error("子任务 videoTitleVectorJob 执行异常: {}", e.getMessage(), e);
+        }
+
         if (hasFailure) {
             log.warn("聚合视频向量化任务完成,部分子任务失败: {}", failMessages);
             return new ReturnT<>(ReturnT.FAIL_CODE, "部分子任务失败: " + failMessages);

+ 39 - 0
core/src/main/java/com/tzld/videoVector/job/VideoVectorTextBackfillJob.java

@@ -6,8 +6,10 @@ import com.alibaba.fastjson.JSONObject;
 import com.aliyun.odps.data.Record;
 import com.google.common.collect.Lists;
 import com.tzld.videoVector.api.AigcApiService;
+import com.tzld.videoVector.api.VideoApiService;
 import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
 import com.tzld.videoVector.dao.mapper.pgVector.ext.VideoVectorMapperExt;
+import com.tzld.videoVector.model.entity.VideoDetail;
 import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
 import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
 import com.tzld.videoVector.model.po.pgVector.VideoVector;
@@ -47,6 +49,9 @@ public class VideoVectorTextBackfillJob {
     @Resource
     private AigcApiService aigcApiService;
 
+    @Resource
+    private VideoApiService videoApiService;
+
     /** 每批处理数量 */
     private static final int BATCH_SIZE = 500;
 
@@ -170,6 +175,9 @@ public class VideoVectorTextBackfillJob {
             case "aigc_deconstruct":
                 rawDataMap = batchQueryAigcData(videoIds);
                 break;
+            case "video_title":
+                rawDataMap = batchQueryVideoTitles(videoIds);
+                break;
             default:
                 log.warn("不支持的 sourceField: {}", sourceField);
                 return new int[]{0, vectors.size(), 0};
@@ -248,6 +256,16 @@ public class VideoVectorTextBackfillJob {
     private List<String> extractTexts(String rawData, DeconstructVectorConfig config) {
         List<String> texts = new ArrayList<>();
         try {
+            String sourceField = config.getSourceField();
+
+            // video_title 来源:原始数据就是标题文本,无需 JSON 解析
+            if ("video_title".equals(sourceField)) {
+                if (StringUtils.hasText(rawData)) {
+                    texts.add(rawData);
+                }
+                return texts;
+            }
+
             JSONObject json = JSON.parseObject(rawData);
             if (json == null) {
                 return texts;
@@ -475,6 +493,27 @@ public class VideoVectorTextBackfillJob {
         return result;
     }
 
+    /**
+     * 批量查询视频标题
+     * 通过视频详情 API 获取标题
+     */
+    private Map<Long, String> batchQueryVideoTitles(List<Long> videoIds) {
+        Map<Long, String> result = new HashMap<>();
+        try {
+            Map<Long, VideoDetail> detailMap = videoApiService.getVideoDetail(new HashSet<>(videoIds));
+            if (detailMap != null) {
+                for (Map.Entry<Long, VideoDetail> entry : detailMap.entrySet()) {
+                    if (entry.getValue() != null && StringUtils.hasText(entry.getValue().getTitle())) {
+                        result.put(entry.getKey(), entry.getValue().getTitle());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("批量查询视频标题失败: {}", e.getMessage());
+        }
+        return result;
+    }
+
     /**
      * 获取所有启用的向量化配置
      */

+ 6 - 0
core/src/main/resources/mapper/pgVector/ext/VideoVectorMapperExt.xml

@@ -118,6 +118,12 @@
     SELECT DISTINCT video_id FROM video_vectors
   </select>
 
+  <!-- 查询所有不重复的 video_id,排除指定 configCode -->
+  <select id="selectDistinctVideoIdsExcludeConfigCode" resultType="java.lang.Long">
+    SELECT DISTINCT video_id FROM video_vectors
+    WHERE config_code != #{excludeConfigCode}
+  </select>
+
   <!-- 根据 text_hash + configCode 查询已有的 embedding,用于复用向量化结果 -->
   <select id="selectEmbeddingByTextHash" resultType="java.lang.String">
     SELECT embedding::text FROM video_vectors

+ 11 - 0
server/src/main/java/com/tzld/videoVector/controller/XxlJobController.java

@@ -26,6 +26,9 @@ public class XxlJobController {
     @Autowired
     private VideoDetailSyncJob videoDetailSyncJob;
 
+    @Autowired
+    private VideoTitleVectorJob videoTitleVectorJob;
+
     // ==================== 视频向量化任务 ====================
 
     @GetMapping("/vectorVideoJob")
@@ -86,4 +89,12 @@ public class XxlJobController {
         return CommonResponse.success();
     }
 
+    // ==================== 视频标题向量化任务 ====================
+
+    @GetMapping("/videoTitleVectorJob")
+    public CommonResponse<Void> videoTitleVectorJob() {
+        videoTitleVectorJob.videoTitleVectorJob(null);
+        return CommonResponse.success();
+    }
+
 }