Преглед изворни кода

视频解构向量化 执行重复判断

wangyunpeng пре 10 часа
родитељ
комит
4f0da13c31

+ 17 - 1
core/src/main/java/com/tzld/videoVector/dao/mapper/pgVector/ext/VideoVectorMapperExt.java

@@ -17,12 +17,14 @@ public interface VideoVectorMapperExt {
      * @param pointIndex 向量点索引(单点模式传0)
      * @param embedding  向量字符串,格式: "[0.1,0.2,...]"
      * @param text       向量化的原始文本内容
+     * @param textHash   文本内容的MD5哈希值
      */
     int upsertVector(@Param("videoId") Long videoId,
                      @Param("configCode") String configCode,
                      @Param("pointIndex") int pointIndex,
                      @Param("embedding") String embedding,
-                     @Param("text") String text);
+                     @Param("text") String text,
+                     @Param("textHash") String textHash);
 
     /**
      * 判断指定 videoId + configCode 的向量是否存在(任意 pointIndex)
@@ -94,6 +96,20 @@ public interface VideoVectorMapperExt {
      */
     int updateTextById(@Param("id") Long id, @Param("text") String text);
 
+    /**
+     * Looks up an already-stored embedding by text_hash and configCode, taking the first matching row.
+     * Lets identical text reuse an earlier vectorization result instead of calling the embedding API again.
+     *
+     * @param textHash   hash of the text content to match against the text_hash column
+     * @param configCode configuration code the matching row must belong to
+     * @return the stored embedding as a string; NOTE(review): presumably null when no row matches
+     *         (usual MyBatis single-result behavior) - confirm against the mapper XML
+     */
+    String selectEmbeddingByTextHash(@Param("textHash") String textHash,
+                                     @Param("configCode") String configCode);
+
+    /**
+     * Batch query for the de-duplicated videoIds under the given configCode whose text is empty.
+     * Used to detect records that already have a vector but are missing their text, so they can be
+     * deleted and vectorized again.
+     *
+     * @param videoIds   candidate video IDs to check
+     * @param configCode configuration code to restrict the lookup to
+     * @return the videoIds among {@code videoIds} that have a row with empty text
+     */
+    List<Long> selectVideoIdsWithEmptyText(@Param("videoIds") List<Long> videoIds,
+                                            @Param("configCode") String configCode);
+
     /**
      * 查询所有不重复的 video_id(跨所有 configCode)
      */

+ 1 - 1
core/src/main/java/com/tzld/videoVector/job/DataMigrationJob.java

@@ -301,7 +301,7 @@ public class DataMigrationJob {
 
                             // Redis 中存储的是归一化后的 JSON 数组 "[0.1,0.2,...]"
                             // pgvector 接受此格式
-                            videoVectorMapperExt.upsertVector(videoId, configCode, 0, vectorJson, null);
+                            videoVectorMapperExt.upsertVector(videoId, configCode, 0, vectorJson, null, null);
                             totalSuccess++;
                         } catch (NumberFormatException e) {
                             log.warn("非法 videoId: {}", idStr);

+ 60 - 2
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -18,6 +18,7 @@ import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
 import com.tzld.videoVector.service.DeconstructService;
 import com.tzld.videoVector.service.EmbeddingService;
 import com.tzld.videoVector.service.VectorStoreService;
+import com.tzld.videoVector.util.Md5Util;
 import com.tzld.videoVector.util.OdpsUtil;
 import com.tzld.videoVector.util.VectorUtils;
 import com.xxl.job.core.biz.model.ReturnT;
@@ -123,6 +124,17 @@ public class VideoVectorJob {
                         try {
                             // 4.1 查询哪些 videoId 在该配置下已有向量(数据库层已做 DISTINCT video_id)
                             Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+
+                            // 4.1.1 检查已存在记录中 text 为空的,删除后重新向量化
+                            if (!existingVideoIds.isEmpty()) {
+                                Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
+                                if (!emptyTextIds.isEmpty()) {
+                                    log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
+                                    vectorStoreService.deleteBatch(configCode, emptyTextIds);
+                                    existingVideoIds.removeAll(emptyTextIds);
+                                }
+                            }
+
                             // 4.2 过滤出需要处理的 videoId(排除已有向量的)
                             List<Long> needProcessIds = auditPassedIds.stream()
                                     .filter(id -> !existingVideoIds.contains(id))
@@ -394,7 +406,8 @@ public class VideoVectorJob {
                 if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
                     text = text.substring(0, maxLength);
                 }
-                List<Float> vector = embeddingService.embed(text, config);
+                // 优先通过 text_hash 复用已有 embedding,避免重复调用 API
+                List<Float> vector = getOrEmbed(text, config);
                 if (vector == null || vector.isEmpty()) {
                     log.warn("videoId={} 配置 {} 第{}个文本向量化失败", videoId, configCode, i);
                     continue;
@@ -420,7 +433,8 @@ public class VideoVectorJob {
             if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
                 text = text.substring(0, maxLength);
             }
-            List<Float> vector = embeddingService.embed(text, config);
+            // 优先通过 text_hash 复用已有 embedding,避免重复调用 API
+            List<Float> vector = getOrEmbed(text, config);
             if (vector == null || vector.isEmpty()) {
                 log.warn("videoId={} 配置 {} 文本向量化失败", videoId, configCode);
                 return 0;
@@ -431,6 +445,28 @@ public class VideoVectorJob {
         }
     }
 
+    /**
+     * 优先通过 text_hash 复用已有 embedding,未命中则调用 embedding API
+     *
+     * @param text   文本内容
+     * @param config 向量化配置
+     * @return 向量数据
+     */
+    private List<Float> getOrEmbed(String text, DeconstructVectorConfig config) {
+        String configCode = config.getConfigCode();
+        // 计算 text_hash,查询是否已有相同文本的 embedding
+        String textHash = Md5Util.encoderByMd5(text);
+        if (StringUtils.hasText(textHash)) {
+            List<Float> cached = vectorStoreService.getVectorByTextHash(textHash, configCode);
+            if (cached != null && !cached.isEmpty()) {
+                log.debug("命中 text_hash 缓存,复用 embedding,hash={}, configCode={}", textHash, configCode);
+                return cached;
+            }
+        }
+        // 未命中缓存,调用 embedding API
+        return embeddingService.embed(text, config);
+    }
+
     /**
      * 分页查询 videoId 列表
      * @param pageNum 页码(从0开始)
@@ -526,6 +562,17 @@ public class VideoVectorJob {
                     try {
                         // 5.1 查询该配置下已有向量的 videoId,排除已处理过的(数据库层已做 DISTINCT video_id)
                         Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+
+                        // 5.1.1 检查已存在记录中 text 为空的,删除后重新向量化
+                        if (!existingVideoIds.isEmpty()) {
+                            Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
+                            if (!emptyTextIds.isEmpty()) {
+                                log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
+                                vectorStoreService.deleteBatch(configCode, emptyTextIds);
+                                existingVideoIds.removeAll(emptyTextIds);
+                            }
+                        }
+
                         List<Long> needProcessIds = auditPassedIds.stream()
                                 .filter(id -> !existingVideoIds.contains(id))
                                 .collect(Collectors.toList());
@@ -904,6 +951,17 @@ public class VideoVectorJob {
                         try {
                             // 5.1 已向量化过滤
                             Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, auditPassedIds);
+
+                            // 5.1.1 检查已存在记录中 text 为空的,删除后重新向量化
+                            if (!existingVideoIds.isEmpty()) {
+                                Set<Long> emptyTextIds = vectorStoreService.findVideoIdsWithEmptyText(configCode, existingVideoIds);
+                                if (!emptyTextIds.isEmpty()) {
+                                    log.info("配置 {} 下发现 {} 个 text 为空的记录,删除后重新向量化: {}", configCode, emptyTextIds.size(), emptyTextIds);
+                                    vectorStoreService.deleteBatch(configCode, emptyTextIds);
+                                    existingVideoIds.removeAll(emptyTextIds);
+                                }
+                            }
+
                             List<Long> needProcessIds = auditPassedIds.stream()
                                     .filter(id -> !existingVideoIds.contains(id))
                                     .collect(Collectors.toList());

+ 34 - 0
core/src/main/java/com/tzld/videoVector/model/po/pgVector/VideoVector.java

@@ -98,6 +98,15 @@ public class VideoVector {
      */
     private String text;
 
+    /**
+     *
+     * This field was generated by MyBatis Generator.
+     * This field corresponds to the database column video_vectors.text_hash
+     *
+     * @mbg.generated
+     */
+    private String textHash;
+
     /**
      * This method was generated by MyBatis Generator.
      * This method returns the value of the database column video_vectors.id
@@ -290,6 +299,30 @@ public class VideoVector {
         this.text = text;
     }
 
+    /**
+     * This method was generated by MyBatis Generator.
+     * This method returns the value of the database column video_vectors.text_hash
+     *
+     * @return the value of video_vectors.text_hash
+     *
+     * @mbg.generated
+     */
+    public String getTextHash() {
+        return textHash;
+    }
+
+    /**
+     * This method was generated by MyBatis Generator.
+     * This method sets the value of the database column video_vectors.text_hash
+     *
+     * @param textHash the value for video_vectors.text_hash
+     *
+     * @mbg.generated
+     */
+    public void setTextHash(String textHash) {
+        this.textHash = textHash;
+    }
+
     /**
      * This method was generated by MyBatis Generator.
      * This method corresponds to the database table video_vectors
@@ -310,6 +343,7 @@ public class VideoVector {
         sb.append(", updatedAt=").append(updatedAt);
         sb.append(", pointIndex=").append(pointIndex);
         sb.append(", text=").append(text);
+        sb.append(", textHash=").append(textHash);
         sb.append("]");
         return sb.toString();
     }

+ 70 - 0
core/src/main/java/com/tzld/videoVector/model/po/pgVector/VideoVectorExample.java

@@ -694,6 +694,76 @@ public class VideoVectorExample {
             addCriterion("\"text\" not between", value1, value2, "text");
             return (Criteria) this;
         }
+
+        public Criteria andTextHashIsNull() {
+            addCriterion("text_hash is null");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashIsNotNull() {
+            addCriterion("text_hash is not null");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashEqualTo(String value) {
+            addCriterion("text_hash =", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashNotEqualTo(String value) {
+            addCriterion("text_hash <>", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashGreaterThan(String value) {
+            addCriterion("text_hash >", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashGreaterThanOrEqualTo(String value) {
+            addCriterion("text_hash >=", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashLessThan(String value) {
+            addCriterion("text_hash <", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashLessThanOrEqualTo(String value) {
+            addCriterion("text_hash <=", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashLike(String value) {
+            addCriterion("text_hash like", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashNotLike(String value) {
+            addCriterion("text_hash not like", value, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashIn(List<String> values) {
+            addCriterion("text_hash in", values, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashNotIn(List<String> values) {
+            addCriterion("text_hash not in", values, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashBetween(String value1, String value2) {
+            addCriterion("text_hash between", value1, value2, "textHash");
+            return (Criteria) this;
+        }
+
+        public Criteria andTextHashNotBetween(String value1, String value2) {
+            addCriterion("text_hash not between", value1, value2, "textHash");
+            return (Criteria) this;
+        }
     }
 
     /**

+ 18 - 0
core/src/main/java/com/tzld/videoVector/service/VectorStoreService.java

@@ -143,6 +143,24 @@ public interface VectorStoreService {
      */
     void deleteBatch(String configCode, Collection<Long> videoIds);
 
+    /**
+     * Batch query for the videoIds under the given configuration whose text is empty.
+     * Used to detect records that already have a vector but are missing their text, so they can be
+     * deleted and vectorized again.
+     * @param configCode configuration code
+     * @param videoIds   video IDs to check
+     * @return the set of videoIds whose text is empty
+     */
+    Set<Long> findVideoIdsWithEmptyText(String configCode, Collection<Long> videoIds);
+
+    /**
+     * Looks up an existing vectorization result by text_hash so the embedding can be reused.
+     * Identical text under the same configuration yields the same embedding, so reusing it avoids
+     * calling the embedding API again.
+     * @param textHash   MD5 hash of the text
+     * @param configCode configuration code
+     * @return the vector data, or null when none is found
+     */
+    List<Float> getVectorByTextHash(String textHash, String configCode);
+
     /**
      * 在所有向量中搜索 Top-N 最相似的视频(默认配置)
      * @param queryVector 查询向量

+ 39 - 1
core/src/main/java/com/tzld/videoVector/service/impl/PgVectorStoreServiceImpl.java

@@ -5,6 +5,7 @@ import com.tzld.videoVector.model.entity.VideoMatch;
 import com.tzld.videoVector.model.po.pgVector.VideoVector;
 import com.tzld.videoVector.model.vo.VideoVectorSearchResult;
 import com.tzld.videoVector.service.VectorStoreService;
+import com.tzld.videoVector.util.Md5Util;
 import com.tzld.videoVector.util.VectorUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -56,7 +57,8 @@ public class PgVectorStoreServiceImpl implements VectorStoreService {
         }
 
         String embedding = vectorToString(vector);
-        videoVectorMapperExt.upsertVector(videoId, configCode, pointIndex, embedding, text);
+        String textHash = (text != null && !text.isEmpty()) ? Md5Util.encoderByMd5(text) : null;
+        videoVectorMapperExt.upsertVector(videoId, configCode, pointIndex, embedding, text, textHash);
         log.debug("保存向量成功,configCode={}, videoId={}, pointIndex={}, 维度={}", configCode, videoId, pointIndex, vector.size());
     }
 
@@ -201,6 +203,42 @@ public class PgVectorStoreServiceImpl implements VectorStoreService {
         log.info("批量删除向量成功,configCode={}, 数量={}", configCode, videoIds.size());
     }
 
+    /**
+     * Collects the videoIds among {@code videoIds} that have a row with empty text under the
+     * given configuration, querying the mapper in chunks of 1000 IDs.
+     *
+     * @param configCode configuration code; null or empty falls back to DEFAULT_CONFIG_CODE
+     * @param videoIds   video IDs to check; null or empty yields an empty set
+     * @return the set of videoIds reported by the mapper as having empty text
+     */
+    @Override
+    public Set<Long> findVideoIdsWithEmptyText(String configCode, Collection<Long> videoIds) {
+        if (videoIds == null || videoIds.isEmpty()) {
+            return Collections.emptySet();
+        }
+        final String effectiveCode = (configCode == null || configCode.isEmpty()) ? DEFAULT_CONFIG_CODE : configCode;
+
+        final int chunkSize = 1000;
+        final List<Long> ordered = new ArrayList<>(videoIds);
+        final int total = ordered.size();
+        final Set<Long> result = new HashSet<>();
+
+        int from = 0;
+        while (from < total) {
+            final int to = Math.min(from + chunkSize, total);
+            final List<Long> hits = videoVectorMapperExt.selectVideoIdsWithEmptyText(ordered.subList(from, to), effectiveCode);
+            if (hits != null) {
+                result.addAll(hits);
+            }
+            from = to;
+        }
+        return result;
+    }
+
+    /**
+     * Looks up a stored embedding by text hash so identical text can reuse it.
+     *
+     * @param textHash   hash of the text; null or empty returns null without querying
+     * @param configCode configuration code; null or empty falls back to DEFAULT_CONFIG_CODE
+     * @return the parsed vector, or null when no embedding is stored or the lookup fails
+     */
+    @Override
+    public List<Float> getVectorByTextHash(String textHash, String configCode) {
+        if (textHash == null || textHash.isEmpty()) {
+            return null;
+        }
+        if (configCode == null || configCode.isEmpty()) {
+            configCode = DEFAULT_CONFIG_CODE;
+        }
+        try {
+            String embedding = videoVectorMapperExt.selectEmbeddingByTextHash(textHash, configCode);
+            // A miss is the common case: return early instead of relying on how the parser
+            // treats null/empty input.
+            if (embedding == null || embedding.isEmpty()) {
+                return null;
+            }
+            return VectorUtils.parseVectorString(embedding);
+        } catch (Exception e) {
+            // Best-effort lookup: the caller falls back to the embedding API on null.
+            // The exception is passed as the trailing argument so the stack trace is logged too.
+            log.error("根据 text_hash 查询向量失败,hash={}, configCode={}, error={}", textHash, configCode, e.getMessage(), e);
+            return null;
+        }
+    }
+
     // ---------------------------------------------------------------- 搜索
 
     @Override

+ 12 - 0
core/src/main/java/com/tzld/videoVector/service/impl/RedisVectorStoreServiceImpl.java

@@ -320,6 +320,18 @@ public class RedisVectorStoreServiceImpl implements VectorStoreService {
         log.info("批量删除向量成功,configCode={}, 数量={}", configCode, videoIds.size());
     }
 
+    /**
+     * Always returns an empty set for this store.
+     */
+    @Override
+    public Set<Long> findVideoIdsWithEmptyText(String configCode, Collection<Long> videoIds) {
+        // The Redis implementation does not store the text field, so return an empty set
+        return Collections.emptySet();
+    }
+
+    /**
+     * Always returns null for this store, i.e. never yields a reusable embedding.
+     */
+    @Override
+    public List<Float> getVectorByTextHash(String textHash, String configCode) {
+        // The Redis implementation does not store text_hash, so nothing can be reused
+        return null;
+    }
+
     // ---------------------------------------------------------------- 搜索(优化版)
 
     @Override

+ 21 - 5
core/src/main/resources/mapper/pgVector/VideoVectorMapper.xml

@@ -14,6 +14,7 @@
     <result column="updated_at" jdbcType="TIMESTAMP" property="updatedAt" />
     <result column="point_index" jdbcType="INTEGER" property="pointIndex" />
     <result column="text" jdbcType="VARCHAR" property="text" />
+    <result column="text_hash" jdbcType="VARCHAR" property="textHash" />
   </resultMap>
   <sql id="Example_Where_Clause">
     <!--
@@ -86,7 +87,8 @@
       WARNING - @mbg.generated
       This element is automatically generated by MyBatis Generator, do not modify.
     -->
-    id, video_id, config_code, embedding, created_at, updated_at, point_index, "text"
+    id, video_id, config_code, embedding, created_at, updated_at, point_index, "text", 
+    text_hash
   </sql>
   <select id="selectByExample" parameterType="com.tzld.videoVector.model.po.pgVector.VideoVectorExample" resultMap="BaseResultMap">
     <!--
@@ -141,10 +143,10 @@
     -->
     insert into video_vectors (video_id, config_code, embedding, 
       created_at, updated_at, point_index, 
-      "text")
+      "text", text_hash)
     values (#{videoId,jdbcType=BIGINT}, #{configCode,jdbcType=VARCHAR}, #{embedding,jdbcType=VARCHAR}, 
       #{createdAt,jdbcType=TIMESTAMP}, #{updatedAt,jdbcType=TIMESTAMP}, #{pointIndex,jdbcType=INTEGER}, 
-      #{text,jdbcType=VARCHAR})
+      #{text,jdbcType=VARCHAR}, #{textHash,jdbcType=VARCHAR})
   </insert>
   <insert id="insertSelective" keyColumn="id" keyProperty="id" parameterType="com.tzld.videoVector.model.po.pgVector.VideoVector" useGeneratedKeys="true">
     <!--
@@ -174,6 +176,9 @@
       <if test="text != null">
         "text",
       </if>
+      <if test="textHash != null">
+        text_hash,
+      </if>
     </trim>
     <trim prefix="values (" suffix=")" suffixOverrides=",">
       <if test="videoId != null">
@@ -197,6 +202,9 @@
       <if test="text != null">
         #{text,jdbcType=VARCHAR},
       </if>
+      <if test="textHash != null">
+        #{textHash,jdbcType=VARCHAR},
+      </if>
     </trim>
   </insert>
   <select id="countByExample" parameterType="com.tzld.videoVector.model.po.pgVector.VideoVectorExample" resultType="java.lang.Long">
@@ -240,6 +248,9 @@
       <if test="record.text != null">
         "text" = #{record.text,jdbcType=VARCHAR},
       </if>
+      <if test="record.textHash != null">
+        text_hash = #{record.textHash,jdbcType=VARCHAR},
+      </if>
     </set>
     <if test="_parameter != null">
       <include refid="Update_By_Example_Where_Clause" />
@@ -258,7 +269,8 @@
       created_at = #{record.createdAt,jdbcType=TIMESTAMP},
       updated_at = #{record.updatedAt,jdbcType=TIMESTAMP},
       point_index = #{record.pointIndex,jdbcType=INTEGER},
-      "text" = #{record.text,jdbcType=VARCHAR}
+      "text" = #{record.text,jdbcType=VARCHAR},
+      text_hash = #{record.textHash,jdbcType=VARCHAR}
     <if test="_parameter != null">
       <include refid="Update_By_Example_Where_Clause" />
     </if>
@@ -291,6 +303,9 @@
       <if test="text != null">
         "text" = #{text,jdbcType=VARCHAR},
       </if>
+      <if test="textHash != null">
+        text_hash = #{textHash,jdbcType=VARCHAR},
+      </if>
     </set>
     where id = #{id,jdbcType=BIGINT}
   </update>
@@ -306,7 +321,8 @@
       created_at = #{createdAt,jdbcType=TIMESTAMP},
       updated_at = #{updatedAt,jdbcType=TIMESTAMP},
       point_index = #{pointIndex,jdbcType=INTEGER},
-      "text" = #{text,jdbcType=VARCHAR}
+      "text" = #{text,jdbcType=VARCHAR},
+      text_hash = #{textHash,jdbcType=VARCHAR}
     where id = #{id,jdbcType=BIGINT}
   </update>
 </mapper>

+ 21 - 3
core/src/main/resources/mapper/pgVector/ext/VideoVectorMapperExt.xml

@@ -26,10 +26,10 @@
 
   <!-- Upsert: 插入或更新向量(支持多点模式) -->
   <insert id="upsertVector">
-    INSERT INTO video_vectors (video_id, config_code, point_index, embedding, text, created_at, updated_at)
-    VALUES (#{videoId}, #{configCode}, #{pointIndex}, #{embedding}::vector, #{text}, NOW(), NOW())
+    INSERT INTO video_vectors (video_id, config_code, point_index, embedding, "text", text_hash, created_at, updated_at)
+    VALUES (#{videoId}, #{configCode}, #{pointIndex}, #{embedding}::vector, #{text}, #{textHash}, NOW(), NOW())
     ON CONFLICT (config_code, video_id, point_index)
-    DO UPDATE SET embedding = EXCLUDED.embedding, text = EXCLUDED.text, updated_at = NOW()
+    DO UPDATE SET embedding = EXCLUDED.embedding, "text" = EXCLUDED."text", text_hash = EXCLUDED.text_hash, updated_at = NOW()
   </insert>
 
   <!-- 判断是否存在(任意 pointIndex) -->
@@ -118,4 +118,22 @@
     SELECT DISTINCT video_id FROM video_vectors
   </select>
 
+  <!-- Fetch one existing embedding (cast to text) matching text_hash + configCode, so a vectorization result can be reused.
+       There is no ORDER BY, so which matching row LIMIT 1 returns is unspecified. -->
+  <select id="selectEmbeddingByTextHash" resultType="java.lang.String">
+    SELECT embedding::text FROM video_vectors
+    WHERE text_hash = #{textHash} AND config_code = #{configCode}
+    LIMIT 1
+  </select>
+
+  <!-- Batch query: distinct video_ids under the given configCode whose text is NULL or an empty string,
+       restricted to the supplied videoIds list. -->
+  <select id="selectVideoIdsWithEmptyText" resultType="java.lang.Long">
+    SELECT DISTINCT video_id FROM video_vectors
+    WHERE config_code = #{configCode}
+    AND ("text" IS NULL OR "text" = '')
+    AND video_id IN
+    <foreach collection="videoIds" item="vid" open="(" separator="," close=")">
+      #{vid}
+    </foreach>
+  </select>
+
 </mapper>