Browse Source

新增素材解构信息

luojunhui 1 tuần trước
mục cha
commit
2979b971e0

+ 5 - 0
core/src/main/java/com/tzld/videoVector/dao/mapper/pgVector/ext/MaterialVectorMapperExt.java

@@ -44,4 +44,9 @@ public interface MaterialVectorMapperExt {
 
     int deleteBatchByMaterialIds(@Param("materialIds") List<String> materialIds,
                                  @Param("configCode") String configCode);
+
+    /** 删除指定素材在 configCode 下 point_index >= minPointIndex 的旧向量(多点模式清理用) */
+    int deleteAbovePointIndex(@Param("materialId") String materialId,
+                              @Param("configCode") String configCode,
+                              @Param("minPointIndex") int minPointIndex);
 }

+ 88 - 48
core/src/main/java/com/tzld/videoVector/job/MaterialVectorJob.java

@@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
 import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -138,11 +139,13 @@ public class MaterialVectorJob {
         log.info("aigc.material.task.source.map: {}", aigcMaterialTaskSourceMap);
 
         // 1. 收集 (materialId, taskInstanceId) 和 (materialId, sourceType) 映射
-        // 同一 materialId 在多个 task 中出现,后处理 task 的 sourceType 会覆盖前者
+        // 按 taskId 自然序遍历保证确定性;同一 materialId 在多个 task 中出现,taskId 较大的覆盖前者
         Map<String, Long> materialIdToTaskInstanceId = new HashMap<>();
         Map<String, Short> materialIdToSourceType = new HashMap<>();
 
-        for (Map.Entry<String, Short> entry : aigcMaterialTaskSourceMap.entrySet()) {
+        List<Map.Entry<String, Short>> sortedEntries = new ArrayList<>(aigcMaterialTaskSourceMap.entrySet());
+        sortedEntries.sort(Comparator.comparingInt(e -> Integer.parseInt(e.getKey())));
+        for (Map.Entry<String, Short> entry : sortedEntries) {
             Integer taskId;
             try {
                 taskId = Integer.parseInt(entry.getKey());
@@ -163,7 +166,11 @@ public class MaterialVectorJob {
                     continue;
                 }
                 materialIdToTaskInstanceId.put(materialId, input.getTaskInstanceId());
-                materialIdToSourceType.put(materialId, sourceType);
+                Short prevSourceType = materialIdToSourceType.put(materialId, sourceType);
+                if (prevSourceType != null && !prevSourceType.equals(sourceType)) {
+                    log.warn("materialId={} 在 taskId={}(sourceType={}) 中 sourceType 被覆盖,原值={}",
+                            materialId, taskId, sourceType, prevSourceType);
+                }
             }
             log.info("taskId={} sourceType={} 拉到 {} 条素材", taskId, sourceType, taskInputList.size());
         }
@@ -191,34 +198,38 @@ public class MaterialVectorJob {
 
             // 并发调 detail 接口
             ExecutorService executor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
-            List<Future<?>> futures = new ArrayList<>();
-            List<MaterialDeconstructResult> batch = Collections.synchronizedList(new ArrayList<>());
-
-            for (String materialId : needSyncIds) {
-                futures.add(executor.submit(() -> {
-                    try {
-                        Long taskInstanceId = materialIdToTaskInstanceId.get(materialId);
-                        if (taskInstanceId == null) return;
-                        JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
-                        if (dataContent != null) {
-                            MaterialDeconstructResult r = new MaterialDeconstructResult();
-                            r.setMaterialId(materialId);
-                            r.setSource(SOURCE_AIGC);
-                            r.setResult(dataContent.toJSONString());
-                            r.setSourceType(materialIdToSourceType.getOrDefault(materialId, defaultSourceType));
-                            batch.add(r);
+            try {
+                List<Future<?>> futures = new ArrayList<>();
+                List<MaterialDeconstructResult> batch = Collections.synchronizedList(new ArrayList<>());
+
+                for (String materialId : needSyncIds) {
+                    futures.add(executor.submit(() -> {
+                        try {
+                            Long taskInstanceId = materialIdToTaskInstanceId.get(materialId);
+                            if (taskInstanceId == null) return;
+                            JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
+                            if (dataContent != null) {
+                                MaterialDeconstructResult r = new MaterialDeconstructResult();
+                                r.setMaterialId(materialId);
+                                r.setSource(SOURCE_AIGC);
+                                r.setResult(dataContent.toJSONString());
+                                r.setSourceType(materialIdToSourceType.getOrDefault(materialId, defaultSourceType));
+                                batch.add(r);
+                            }
+                        } catch (Exception e) {
+                            log.error("同步 materialId={} 失败: {}", materialId, e.getMessage());
                         }
-                    } catch (Exception e) {
-                        log.error("同步 materialId={} 失败: {}", materialId, e.getMessage());
-                    }
-                }));
-            }
-            awaitAndShutdown(futures, executor, 30, "素材同步");
+                    }));
+                }
+                awaitAndShutdown(futures, executor, 30, "素材同步");
 
-            if (!batch.isEmpty()) {
-                for (List<MaterialDeconstructResult> subBatch : Lists.partition(batch, 200)) {
-                    insertCount.addAndGet(materialDeconstructResultMapperExt.batchInsertIgnore(subBatch));
+                if (!batch.isEmpty()) {
+                    for (List<MaterialDeconstructResult> subBatch : Lists.partition(batch, 200)) {
+                        insertCount.addAndGet(materialDeconstructResultMapperExt.batchInsertIgnore(subBatch));
+                    }
                 }
+            } finally {
+                executor.shutdownNow();
             }
         }
     }
@@ -282,13 +293,17 @@ public class MaterialVectorJob {
 
                 // 3. 对每个配置并发处理
                 ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
-                List<Future<?>> configFutures = new ArrayList<>();
-                for (DeconstructVectorConfig config : configs) {
-                    configFutures.add(configExecutor.submit(() ->
-                            processConfigForMaterial(config, materialIds, parsedById, totalSuccessCount, totalFailCount)
-                    ));
+                try {
+                    List<Future<?>> configFutures = new ArrayList<>();
+                    for (DeconstructVectorConfig config : configs) {
+                        configFutures.add(configExecutor.submit(() ->
+                                processConfigForMaterial(config, materialIds, parsedById, totalSuccessCount, totalFailCount)
+                        ));
+                    }
+                    awaitAndShutdown(configFutures, configExecutor, 30, "素材向量化配置并发");
+                } finally {
+                    configExecutor.shutdownNow();
                 }
-                awaitAndShutdown(configFutures, configExecutor, 30, "素材向量化配置并发");
 
                 totalProcessed.addAndGet(materialIds.size());
 
@@ -320,7 +335,7 @@ public class MaterialVectorJob {
     private Map<String, ParsedMaterial> loadParsedMaterials(List<String> materialIds) {
         List<MaterialDeconstructResult> results = materialDeconstructResultMapperExt
                 .selectResultsByMaterialIds(SOURCE_AIGC, materialIds);
-        Map<String, ParsedMaterial> map = new HashMap<>(results.size() * 2);
+        Map<String, ParsedMaterial> map = new HashMap<>(materialIds.size());
         for (MaterialDeconstructResult r : results) {
             if (r == null || !StringUtils.hasText(r.getResult())) continue;
             JSONObject dataContent;
@@ -428,9 +443,14 @@ public class MaterialVectorJob {
                 truncated.add(text);
             }
             for (int i = 0; i < vectors.size(); i++) {
-                materialVectorStoreService.save(configCode, materialId, i, vectors.get(i), truncated.get(i), sourceType);
+                if (!materialVectorStoreService.save(configCode, materialId, i, vectors.get(i), truncated.get(i), sourceType)) {
+                    log.error("materialId={} 配置 {} 第{}个点 save 返回 false", materialId, configCode, i);
+                    return false;
+                }
             }
-            log.info("materialId={} 配置 {} 多点向量化存储成功,共 {} 个点", materialId, configCode, vectors.size());
+            // 清理不再需要的旧点(例如上次 5 个点,本次只有 3 个)
+            materialVectorStoreService.deleteAbovePointIndex(configCode, materialId, vectors.size());
+            log.debug("materialId={} 配置 {} 多点向量化存储成功,共 {} 个点", materialId, configCode, vectors.size());
             return true;
         } else {
             String text = null;
@@ -452,8 +472,11 @@ public class MaterialVectorJob {
                 log.error("materialId={} 配置 {} 文本向量化失败", materialId, configCode);
                 return false;
             }
-            materialVectorStoreService.save(configCode, materialId, vector, text, sourceType);
-            log.info("materialId={} 配置 {} 向量化存储成功", materialId, configCode);
+            if (!materialVectorStoreService.save(configCode, materialId, vector, text, sourceType)) {
+                log.error("materialId={} 配置 {} save 返回 false", materialId, configCode);
+                return false;
+            }
+            log.debug("materialId={} 配置 {} 向量化存储成功", materialId, configCode);
             return true;
         }
     }
@@ -468,7 +491,7 @@ public class MaterialVectorJob {
         if (StringUtils.hasText(textHash)) {
             List<Float> cached = materialVectorStoreService.getVectorByTextHash(textHash, configCode);
             if (cached != null && !cached.isEmpty()) {
-                log.info("命中 text_hash 缓存(material),hash={}, configCode={}", textHash, configCode);
+                log.debug("命中 text_hash 缓存(material),hash={}, configCode={}", textHash, configCode);
                 return cached;
             }
         }
@@ -513,7 +536,7 @@ public class MaterialVectorJob {
     }
 
     // ====================================================================
-    // 复用 VideoVectorJob 的私有方法(直接 copy 一份,未来再统一抽工具类)
+    // TODO: 与 VideoVectorJob 的提取逻辑统一抽取到 VectorUtils / ExtractionUtils,避免两边各自维护
     // ====================================================================
 
     /**
@@ -625,7 +648,8 @@ public class MaterialVectorJob {
                             texts.add(name);
                         }
                     }
-                } catch (Exception ignored) {
+                } catch (Exception e) {
+                    log.debug("extractTextsFromPointDecomposition 单点处理异常 pointName={}: {}", pointName, e.getMessage());
                 }
             }
         } catch (Exception e) {
@@ -641,7 +665,8 @@ public class MaterialVectorJob {
         for (String key : new String[]{"具体元素", "具象概念", "抽象概念"}) {
             try {
                 collectNamesFromArray(substance.getJSONArray(key), names);
-            } catch (Exception ignored) {
+            } catch (Exception e) {
+                log.debug("extractSubstanceNames key={} 异常: {}", key, e.getMessage());
             }
         }
         return names;
@@ -654,7 +679,8 @@ public class MaterialVectorJob {
         for (String key : new String[]{"具体元素形式", "具象概念形式", "整体形式"}) {
             try {
                 collectNamesFromArray(form.getJSONArray(key), names);
-            } catch (Exception ignored) {
+            } catch (Exception e) {
+                log.debug("extractFormNames key={} 异常: {}", key, e.getMessage());
             }
         }
         return names;
@@ -671,7 +697,8 @@ public class MaterialVectorJob {
                         names.add(name);
                     }
                 }
-            } catch (Exception ignored) {
+            } catch (Exception e) {
+                log.debug("collectNamesFromArray 单元素解析异常: {}", e.getMessage());
             }
         }
     }
@@ -687,7 +714,8 @@ public class MaterialVectorJob {
                     if (StringUtils.hasText(word) && contribution != null) {
                         map.put(word, contribution);
                     }
-                } catch (Exception ignored) {
+                } catch (Exception e) {
+                    log.debug("buildContributionMap 单元素解析异常: {}", e.getMessage());
                 }
             }
         } catch (Exception e) {
@@ -738,11 +766,23 @@ public class MaterialVectorJob {
 
     private void awaitAndShutdown(List<Future<?>> futures, ExecutorService executor,
                                   long timeoutMinutes, String taskDesc) {
+        long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(timeoutMinutes);
+        int completed = 0;
         for (Future<?> future : futures) {
+            long remaining = deadline - System.currentTimeMillis();
+            if (remaining <= 0) {
+                log.error("{} 整体超时({}分钟),已取消剩余任务 (已完成 {}/{})",
+                        taskDesc, timeoutMinutes, completed, futures.size());
+                for (Future<?> f : futures) {
+                    f.cancel(true);
+                }
+                break;
+            }
             try {
-                future.get(timeoutMinutes, TimeUnit.MINUTES);
+                future.get(remaining, TimeUnit.MILLISECONDS);
+                completed++;
             } catch (Exception e) {
-                log.error("{} 并发任务等待异常: {}", taskDesc, e.getMessage(), e);
+                log.error("{} 并发任务等待异常: {}", taskDesc, e.getMessage());
             }
         }
         executor.shutdown();

+ 13 - 2
core/src/main/java/com/tzld/videoVector/service/MaterialVectorStoreService.java

@@ -13,16 +13,24 @@ import java.util.Set;
  */
 public interface MaterialVectorStoreService {
 
-    void save(String configCode, String materialId, List<Float> vector, String text, Short sourceType);
+    /**
+     * @return true 写入成功,false 参数校验失败
+     */
+    boolean save(String configCode, String materialId, List<Float> vector, String text, Short sourceType);
 
-    void save(String configCode, String materialId, int pointIndex, List<Float> vector, String text, Short sourceType);
+    /**
+     * @return true 写入成功,false 参数校验失败
+     */
+    boolean save(String configCode, String materialId, int pointIndex, List<Float> vector, String text, Short sourceType);
 
     boolean exists(String configCode, String materialId);
 
     Set<String> existsByIds(String configCode, Collection<String> materialIds);
 
+    /** @deprecated 不支持单条精确查询,请使用 searchTopN */
     List<Float> getVector(String configCode, String materialId);
 
+    /** @deprecated 不支持批量精确查询,请使用 searchTopN */
     Map<String, List<Float>> getVectors(String configCode, Collection<String> materialIds);
 
     Set<String> getAllMaterialIds(String configCode);
@@ -31,6 +39,9 @@ public interface MaterialVectorStoreService {
 
     void deleteBatch(String configCode, Collection<String> materialIds);
 
+    /** 删除多点模式下 point_index >= minPointIndex 的旧向量 */
+    void deleteAbovePointIndex(String configCode, String materialId, int minPointIndex);
+
     List<Float> getVectorByTextHash(String textHash, String configCode);
 
     /**

+ 20 - 20
core/src/main/java/com/tzld/videoVector/service/impl/PgMaterialVectorStoreServiceImpl.java

@@ -1,5 +1,6 @@
 package com.tzld.videoVector.service.impl;
 
+import com.tzld.videoVector.common.constant.VectorConstants;
 import com.tzld.videoVector.dao.mapper.pgVector.ext.MaterialVectorMapperExt;
 import com.tzld.videoVector.model.entity.MaterialMatch;
 import com.tzld.videoVector.model.po.pgVector.MaterialVector;
@@ -29,26 +30,27 @@ public class PgMaterialVectorStoreServiceImpl implements MaterialVectorStoreServ
     private MaterialVectorMapperExt materialVectorMapperExt;
 
     @Override
-    public void save(String configCode, String materialId, List<Float> vector, String text, Short sourceType) {
-        save(configCode, materialId, 0, vector, text, sourceType);
+    public boolean save(String configCode, String materialId, List<Float> vector, String text, Short sourceType) {
+        return save(configCode, materialId, 0, vector, text, sourceType);
     }
 
     @Override
-    public void save(String configCode, String materialId, int pointIndex, List<Float> vector, String text, Short sourceType) {
+    public boolean save(String configCode, String materialId, int pointIndex, List<Float> vector, String text, Short sourceType) {
         if (!StringUtils.hasText(materialId) || vector == null || vector.isEmpty()) {
             log.error("save 参数非法,configCode={}, materialId={}", configCode, materialId);
-            return;
+            return false;
         }
         if (configCode == null || configCode.isEmpty()) {
             log.error("save configCode 不能为空");
-            return;
+            return false;
         }
 
         String embedding = vectorToString(vector);
         String textHash = (text != null && !text.isEmpty()) ? Md5Util.encoderByMd5(text) : null;
         materialVectorMapperExt.upsertVector(materialId, configCode, pointIndex, embedding, text, textHash, sourceType);
-        log.info("保存素材向量成功,configCode={}, materialId={}, pointIndex={}, sourceType={}, 维度={}",
+        log.debug("保存素材向量成功,configCode={}, materialId={}, pointIndex={}, sourceType={}, 维度={}",
                 configCode, materialId, pointIndex, sourceType, vector.size());
+        return true;
     }
 
     @Override
@@ -65,9 +67,8 @@ public class PgMaterialVectorStoreServiceImpl implements MaterialVectorStoreServ
 
         List<String> idList = new ArrayList<>(materialIds);
         Set<String> existing = new HashSet<>();
-        int batchSize = 1000;
-        for (int i = 0; i < idList.size(); i += batchSize) {
-            int end = Math.min(i + batchSize, idList.size());
+        for (int i = 0; i < idList.size(); i += VectorConstants.ODPS_IN_BATCH_SIZE) {
+            int end = Math.min(i + VectorConstants.ODPS_IN_BATCH_SIZE, idList.size());
             List<String> batch = idList.subList(i, end);
             List<String> found = materialVectorMapperExt.selectExistingMaterialIds(batch, configCode);
             if (found != null) {
@@ -79,18 +80,12 @@ public class PgMaterialVectorStoreServiceImpl implements MaterialVectorStoreServ
 
     @Override
     public List<Float> getVector(String configCode, String materialId) {
-        if (!StringUtils.hasText(materialId) || configCode == null || configCode.isEmpty()) return null;
-        log.info("getVector 暂不支持单条精确查询,请通过 searchTopN 获取,configCode={}, materialId={}", configCode, materialId);
-        return null;
+        throw new UnsupportedOperationException("getVector 暂不支持,请使用 searchTopN");
     }
 
     @Override
     public Map<String, List<Float>> getVectors(String configCode, Collection<String> materialIds) {
-        if (materialIds == null || materialIds.isEmpty() || configCode == null || configCode.isEmpty()) {
-            return Collections.emptyMap();
-        }
-        log.info("getVectors 暂不支持批量精确查询,请通过 searchTopN 获取");
-        return Collections.emptyMap();
+        throw new UnsupportedOperationException("getVectors 暂不支持,请使用 searchTopN");
     }
 
     @Override
@@ -115,15 +110,20 @@ public class PgMaterialVectorStoreServiceImpl implements MaterialVectorStoreServ
         if (materialIds == null || materialIds.isEmpty() || configCode == null || configCode.isEmpty()) return;
 
         List<String> idList = new ArrayList<>(materialIds);
-        int batchSize = 1000;
-        for (int i = 0; i < idList.size(); i += batchSize) {
-            int end = Math.min(i + batchSize, idList.size());
+        for (int i = 0; i < idList.size(); i += VectorConstants.ODPS_IN_BATCH_SIZE) {
+            int end = Math.min(i + VectorConstants.ODPS_IN_BATCH_SIZE, idList.size());
             List<String> batch = idList.subList(i, end);
             materialVectorMapperExt.deleteBatchByMaterialIds(batch, configCode);
         }
         log.info("批量删除素材向量成功,configCode={}, 数量={}", configCode, materialIds.size());
     }
 
+    @Override
+    public void deleteAbovePointIndex(String configCode, String materialId, int minPointIndex) {
+        if (!StringUtils.hasText(materialId) || configCode == null || configCode.isEmpty()) return;
+        materialVectorMapperExt.deleteAbovePointIndex(materialId, configCode, minPointIndex);
+    }
+
     @Override
     public List<Float> getVectorByTextHash(String textHash, String configCode) {
         if (textHash == null || textHash.isEmpty() || configCode == null || configCode.isEmpty()) return null;

+ 17 - 4
core/src/main/java/com/tzld/videoVector/service/recall/impl/VectorRecallTestServiceImpl.java

@@ -35,6 +35,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
+import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -95,6 +96,19 @@ public class VectorRecallTestServiceImpl implements VectorRecallTestService {
             new LinkedBlockingQueue<>(128),
             new ThreadPoolExecutor.CallerRunsPolicy());
 
+    @PreDestroy
+    public void shutdownRecallExecutor() {
+        RECALL_EXECUTOR.shutdown();
+        try {
+            if (!RECALL_EXECUTOR.awaitTermination(10, TimeUnit.SECONDS)) {
+                RECALL_EXECUTOR.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            RECALL_EXECUTOR.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private static final String PLACEHOLDER = "--";
 
     /**
@@ -499,7 +513,7 @@ public class VectorRecallTestServiceImpl implements VectorRecallTestService {
         try {
             JSONObject decoded = videoSearchService.parseDecodeResult(raw);
             JSONObject base = decoded != null ? decoded : raw;
-            Map<String, Object> flat = buildFlatDeconstruct(base.toJSONString());
+            Map<String, Object> flat = buildFlatDeconstruct(base);
             return (flat != null && !flat.isEmpty()) ? flat : null;
         } catch (Exception e) {
             log.info("解析素材解构失败: {}", e.getMessage());
@@ -595,8 +609,7 @@ public class VectorRecallTestServiceImpl implements VectorRecallTestService {
      * 把解构 JSON 转成扁平结构 (topic + 灵感点/关键点/目的点 及其实质)
      * 与 VideoSearchServiceImpl#buildFlatDeconstruct 输出一致
      */
-    private Map<String, Object> buildFlatDeconstruct(String json) {
-        JSONObject src = JSONObject.parseObject(json);
+    private Map<String, Object> buildFlatDeconstruct(JSONObject src) {
         if (src == null) {
             return null;
         }
@@ -825,7 +838,7 @@ public class VectorRecallTestServiceImpl implements VectorRecallTestService {
         }
         try {
             String normalized = imagesJson.trim();
-            // 兼容双重 JSON 编码: "\"[\\\"url\\\"]\""
+            // 兼容双重 JSON 编码,临时兜底;根本修复应在数据写入 material_deconstruct_result.result 时做一次 JSON 规范化
             if (normalized.startsWith("\"") && normalized.endsWith("\"")) {
                 Object unquoted = JSON.parse(normalized);
                 if (unquoted instanceof String) {

+ 8 - 0
core/src/main/resources/mapper/pgVector/ext/MaterialVectorMapperExt.xml

@@ -142,4 +142,12 @@
         </foreach>
     </delete>
 
+    <!-- 删除多点模式下 point_index >= minPointIndex 的旧向量 -->
+    <delete id="deleteAbovePointIndex">
+        DELETE FROM material_vectors
+        WHERE material_id = #{materialId}
+          AND config_code = #{configCode}
+          AND point_index >= #{minPointIndex}
+    </delete>
+
 </mapper>