浏览代码

优化视频向量化任务中的数据处理流程

wangyunpeng 10 小时之前
父节点
当前提交
623596bfea

+ 55 - 51
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.aliyun.odps.data.Record;
+import com.google.common.collect.Lists;
 import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.DeconstructContentMapper;
 import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.DeconstructVectorConfigMapper;
 import com.tzld.videoVector.model.entity.DeconstructResult;
@@ -88,79 +89,78 @@ public class VideoVectorJob {
             while (true) {
                 // 2. 分页查询 videoId 列表
                 List<Long> videoIds = queryVideoIdsByPage(pageNum, PAGE_SIZE);
-                if (videoIds == null || videoIds.isEmpty()) {
+                if (CollectionUtils.isEmpty(videoIds)) {
                     log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
                     break;
                 }
                 log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
 
-                // 3. 批量查询视频详情(包含 raw_result)
-                Map<Long, String> videoRawResults = batchQueryVideoRawResults(videoIds);
-
-                // 4. 对每个配置进行处理
+                // 3. 对每个配置进行处理
                 for (DeconstructVectorConfig config : configs) {
                     String configCode = config.getConfigCode();
-                    
-                    // 4.1 查询哪些 videoId 在该配置下已有向量
+                    // 3.1 查询哪些 videoId 在该配置下已有向量
                     Set<Long> existingIds = vectorStoreService.existsByIds(configCode, videoIds);
-                    
-                    // 4.2 过滤出需要处理的 videoId
+                    // 3.2 过滤出需要处理的 videoId(排除已有向量的)
                     List<Long> needProcessIds = videoIds.stream()
-                            .filter(id -> !existingIds.contains(id) && videoRawResults.containsKey(id))
+                            .filter(id -> !existingIds.contains(id))
                             .collect(Collectors.toList());
                     
                     if (needProcessIds.isEmpty()) {
                         log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
                         continue;
                     }
-                    
                     log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
 
-                    // 4.3 逐个处理
-                    for (Long videoId : needProcessIds) {
-                        try {
-                            String rawResult = videoRawResults.get(videoId);
-                            if (!StringUtils.hasText(rawResult)) {
-                                log.debug("videoId={} raw_result 为空,跳过", videoId);
-                                totalFailCount++;
-                                continue;
-                            }
+                    // 3.3 批量查询需要处理的视频 raw_result
+                    for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
+                        Map<Long, String> videoRawResults = batchQueryVideoRawResults(partition);
+                        if (videoRawResults.isEmpty()) {
+                            log.warn("配置 {} 未查询到任何 raw_result", configCode);
+                            continue;
+                        }
 
-                            // 根据配置提取文本
-                            List<String> texts = extractTextsFromRawResult(rawResult, config);
-                            if (CollectionUtils.isEmpty(texts)) {
-                                log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
-                                totalFailCount++;
-                                continue;
-                            }
+                        // 3.4 逐个处理
+                        for (Long videoId : partition) {
+                            try {
+                                String rawResult = videoRawResults.get(videoId);
+                                if (!StringUtils.hasText(rawResult)) {
+                                    log.debug("videoId={} raw_result 为空,跳过", videoId);
+                                    totalFailCount++;
+                                    continue;
+                                }
+
+                                // 根据配置提取文本
+                                List<String> texts = extractTextsFromRawResult(rawResult, config);
+                                if (CollectionUtils.isEmpty(texts)) {
+                                    log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
+                                    totalFailCount++;
+                                    continue;
+                                }
+
+                                // 向量化并存储
+                                boolean success = vectorizeAndStore(config, videoId, texts);
+                                if (success) {
+                                    totalSuccessCount++;
+                                } else {
+                                    totalFailCount++;
+                                }
 
-                            // 向量化并存储
-                            boolean success = vectorizeAndStore(config, videoId, texts);
-                            if (success) {
-                                totalSuccessCount++;
-                            } else {
+                            } catch (Exception e) {
+                                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
                                 totalFailCount++;
                             }
-
-                        } catch (Exception e) {
-                            log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
-                            totalFailCount++;
                         }
                     }
                 }
-
                 // 如果查询到的数据少于 PAGE_SIZE,说明已经是最后一页
                 if (videoIds.size() < PAGE_SIZE) {
                     log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), PAGE_SIZE);
                     break;
                 }
-
                 pageNum++;
             }
-
             log.info("视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount, totalFailCount, pageNum + 1);
             return ReturnT.SUCCESS;
-
         } catch (Exception e) {
             log.error("视频向量化任务执行失败: {}", e.getMessage(), e);
             return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
@@ -193,9 +193,9 @@ public class VideoVectorJob {
                 .collect(Collectors.joining(","));
         
         String sql = String.format(
-                "SELECT video_id, raw_result " +
+                "SELECT content_id, raw_result " +
                         "FROM videoods.content_profile " +
-                        "WHERE status = 3 AND is_deleted = 0 AND video_id IN (%s)",
+                        "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);",
                 idsStr);
         
         List<Record> records = OdpsUtil.getOdpsData(sql);
@@ -205,8 +205,8 @@ public class VideoVectorJob {
         
         Map<Long, String> result = new HashMap<>();
         for (Record record : records) {
-            Long videoId = record.getBigint("video_id");
-            String rawResult = record.getString("raw_result");
+            Long videoId = Long.valueOf(record.getString(0));
+            String rawResult = record.getString(1);
             if (videoId != null && rawResult != null) {
                 result.put(videoId, rawResult);
             }
@@ -383,20 +383,24 @@ public class VideoVectorJob {
     private List<Long> queryVideoIdsByPage(int pageNum, int pageSize) {
         int offset = pageNum * pageSize;
         String sql = String.format(
-                "SELECT video_id " +
+                "SELECT content_id " +
                         "FROM videoods.content_profile " +
                         "WHERE status = 3 AND is_deleted = 0 " +
-                        "ORDER BY video_id " +
-                        "LIMIT %d, %d",
+                        "ORDER BY content_id " +
+                        "LIMIT %d, %d;",
                 offset, pageSize);
         List<Record> records = OdpsUtil.getOdpsData(sql);
         if (records == null || records.isEmpty()) {
             return new ArrayList<>();
         }
-        return records.stream()
-                .map(record -> record.getBigint("video_id"))
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
+        List<Long> videoIds = new ArrayList<>();
+        for (Record record : records) {
+            Long contentId = Long.valueOf(record.getString(0));
+            if (contentId != null) {
+                videoIds.add(contentId);
+            }
+        }
+        return videoIds;
     }
 
     /**

+ 2 - 1
core/src/main/java/com/tzld/videoVector/service/impl/RedisVectorStoreServiceImpl.java

@@ -5,6 +5,7 @@ import com.tzld.videoVector.model.entity.VideoMatch;
 import com.tzld.videoVector.service.VectorStoreService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisCallback;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
@@ -86,7 +87,7 @@ public class RedisVectorStoreServiceImpl implements VectorStoreService {
                 .collect(Collectors.toList());
 
         List<Boolean> results = redisTemplate.executePipelined(
-                (org.springframework.data.redis.core.RedisCallback<Object>) conn -> {
+                (RedisCallback<Object>) conn -> {
                     for (String key : keys) {
                         conn.exists(key.getBytes());
                     }