Просмотр исходного кода

优化向量缓存刷新逻辑

wangyunpeng 6 часов назад
Родитель
Сommit
71c5197a53

+ 90 - 8
core/src/main/java/com/tzld/videoVector/service/impl/RedisVectorStoreServiceImpl.java

@@ -3,7 +3,10 @@ package com.tzld.videoVector.service.impl;
 import com.alibaba.fastjson.JSONArray;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.DeconstructVectorConfigMapper;
 import com.tzld.videoVector.model.entity.VideoMatch;
+import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfig;
+import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfigExample;
 import com.tzld.videoVector.service.VectorStoreService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,7 +47,7 @@ public class RedisVectorStoreServiceImpl implements VectorStoreService {
     /** 本地缓存:configCode -> (videoId -> 归一化向量) */
     private final Cache<String, Map<Long, float[]>> vectorCache = CacheBuilder.newBuilder()
             .maximumSize(10)
-            .expireAfterAccess(30, TimeUnit.MINUTES)
+            .expireAfterAccess(180, TimeUnit.MINUTES)
             .build();
 
     /** 缓存读写锁 */
@@ -53,6 +56,9 @@ public class RedisVectorStoreServiceImpl implements VectorStoreService {
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
 
+    @Autowired
+    private DeconstructVectorConfigMapper vectorConfigMapper;
+
     // ---------------------------------------------------------------- CRUD
 
     @Override
@@ -394,18 +400,94 @@ public class RedisVectorStoreServiceImpl implements VectorStoreService {
     }
 
     /**
-     * 定时刷新缓存(每5分钟)
+     * 定时刷新缓存(每10分钟)
+     * 直接覆盖写入,不先清空,避免时间差
+     * 旧数据通过 expireAfterAccess 自动过期
      */
-    @Scheduled(fixedRate = 10 * 60 * 1000)
+    @Scheduled(fixedRate = 60 * 60 * 1000)
     public void refreshCache() {
         log.info("开始定时刷新向量缓存");
-        cacheLock.writeLock().lock();
+
+        // 从数据库获取所有启用的配置
+        Set<String> configCodes = getAllConfigCodes();
+        int totalLoaded = 0;
+
+        for (String configCode : configCodes) {
+            // 直接从 Redis 加载并覆盖写入缓存
+            Map<Long, float[]> vectors = reloadVectorsFromRedis(configCode);
+            if (!vectors.isEmpty()) {
+                cacheLock.writeLock().lock();
+                try {
+                    vectorCache.put(configCode, vectors);
+                } finally {
+                    cacheLock.writeLock().unlock();
+                }
+                totalLoaded += vectors.size();
+                log.info("配置 [{}] 已刷新 {} 条向量数据", configCode, vectors.size());
+            }
+        }
+        log.info("向量缓存刷新完成,共加载 {} 条数据", totalLoaded);
+    }
+
+    /**
+     * 从 Redis 重新加载指定配置的向量数据(不经过本地缓存)
+     */
+    private Map<Long, float[]> reloadVectorsFromRedis(String configCode) {
+        Set<Long> allIds = getAllVideoIds(configCode);
+        if (allIds.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        log.info("从 Redis 加载向量数据,configCode={},数量={}", configCode, allIds.size());
+
+        List<Long> idList = new ArrayList<>(allIds);
+        List<String> keys = idList.stream().map(id -> buildKey(configCode, id)).collect(Collectors.toList());
+
+        // 分批加载
+        int batchSize = 1000;
+        Map<Long, float[]> allVectors = new HashMap<>(idList.size());
+
+        for (int i = 0; i < keys.size(); i += batchSize) {
+            int end = Math.min(i + batchSize, keys.size());
+            List<String> batchKeys = keys.subList(i, end);
+            List<String> values = redisTemplate.opsForValue().multiGet(batchKeys);
+
+            if (values != null) {
+                for (int j = 0; j < batchKeys.size(); j++) {
+                    float[] vector = parseVector(values.get(j));
+                    if (vector != null) {
+                        allVectors.put(idList.get(i + j), vector);
+                    }
+                }
+            }
+        }
+
+        return allVectors;
+    }
+
+    /**
+     * 获取所有配置编码(从数据库查询启用的配置)
+     */
+    private Set<String> getAllConfigCodes() {
+        Set<String> configCodes = new HashSet<>();
         try {
-            vectorCache.invalidateAll();
-            log.info("向量缓存已清空,下次查询时重新加载");
-        } finally {
-            cacheLock.writeLock().unlock();
+            // 从数据库查询所有启用的配置
+            DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
+            example.createCriteria().andEnabledEqualTo((byte) 1);
+            List<DeconstructVectorConfig> configs = vectorConfigMapper.selectByExample(example);
+
+            for (DeconstructVectorConfig config : configs) {
+                if (config.getConfigCode() != null && !config.getConfigCode().isEmpty()) {
+                    configCodes.add(config.getConfigCode());
+                }
+            }
+            log.info("从数据库加载到 {} 个启用的向量化配置", configCodes.size());
+        } catch (Exception e) {
+            log.error("从数据库获取配置编码失败: {}", e.getMessage());
         }
+        // 确保默认配置存在
+        configCodes.add(DEFAULT_CONFIG_CODE);
+        return configCodes;
     }
 
     /**