wangyunpeng пре 1 недеља
родитељ
комит
727d4a2d2d

+ 301 - 26
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -69,8 +69,8 @@ public class VideoVectorJob {
         log.info("开始执行视频向量化任务, param: {}", param);
 
         try {
-            // 1. 获取所有启用的向量化配置(不限制内容类型)
-            List<DeconstructVectorConfig> configs = getEnabledConfigs();
+            // 1. 获取 result_json 来源的向量化配置
+            List<DeconstructVectorConfig> configs = getEnabledConfigsBySourceField("result_json");
             if (CollectionUtils.isEmpty(configs)) {
                 log.warn("未找到启用的向量化配置");
                 return ReturnT.SUCCESS;
@@ -178,15 +178,6 @@ public class VideoVectorJob {
         }
     }
 
-    /**
-     * 获取启用的向量化配置(不限制内容类型)
-     */
-    private List<DeconstructVectorConfig> getEnabledConfigs() {
-        DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
-        example.createCriteria().andEnabledEqualTo((short) 1);
-        example.setOrderByClause("priority ASC");
-        return vectorConfigMapper.selectByExample(example);
-    }
 
     /**
      * 批量查询视频的 raw_result
@@ -259,10 +250,12 @@ public class VideoVectorJob {
 
     /**
      * 带置信度过滤的文本提取
-     * 从数组路径中提取满足置信度条件的文本
+     * 支持两种路径模式:
+     * - 数组路径(以 [*] 结尾):从数组中提取满足置信度条件的文本
+     * - 单对象路径(不以 [*] 结尾):对单个对象进行置信度检查后提取文本
      *
      * @param json       原始JSON
-     * @param sourcePath 数组路径(如 $.final_normalization_rebuild.keypoint_final.最终关键点列表[*])
+     * @param sourcePath 路径(如 $.keypoint_final.最终关键点列表[*] 或 $.最终选题
      * @param extractRule 提取规则JSON(如 {"text_field":"关键点","confidence_field":"置信度","confidence_threshold":0.8})
      * @return 满足置信度条件的文本列表
      */
@@ -280,20 +273,34 @@ public class VideoVectorJob {
                 return texts;
             }
 
-            // 提取数组项(sourcePath 以 [*] 结尾,提取整个对象列表)
-            List<JSONObject> items = VectorUtils.extractArrayItemsFromJson(json, sourcePath);
-
-            for (JSONObject item : items) {
-                if (isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
-                    String text = item.getString(textField);
-                    if (StringUtils.hasText(text)) {
-                        texts.add(text);
+            if (sourcePath.endsWith("[*]")) {
+                // 数组模式:提取数组项,逐个检查置信度
+                List<JSONObject> items = VectorUtils.extractArrayItemsFromJson(json, sourcePath);
+                for (JSONObject item : items) {
+                    if (isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
+                        String text = item.getString(textField);
+                        if (StringUtils.hasText(text)) {
+                            texts.add(text);
+                        }
+                    }
+                }
+                log.debug("置信度过滤(数组):路径={}, 总数={}, 满足条件={}", sourcePath, items.size(), texts.size());
+            } else {
+                // 单对象模式:导航到目标对象,检查置信度后提取文本
+                List<String> pathValues = VectorUtils.extractFromJson(json, sourcePath);
+                if (!pathValues.isEmpty()) {
+                    // sourcePath 指向父对象,需要获取父对象进行置信度检查
+                    JSONObject targetObj = navigateToObject(json, sourcePath);
+                    if (targetObj != null && isConfidenceQualified(targetObj, confidenceField, confidenceThreshold)) {
+                        String text = targetObj.getString(textField);
+                        if (StringUtils.hasText(text)) {
+                            texts.add(text);
+                        }
                     }
+                    log.debug("置信度过滤(单对象):路径={}, 通过={}", sourcePath, !texts.isEmpty());
                 }
             }
 
-            log.debug("置信度过滤:路径={}, 总数={}, 满足条件={}", sourcePath, items.size(), texts.size());
-
         } catch (Exception e) {
             log.error("置信度过滤提取失败: path={}, error={}", sourcePath, e.getMessage());
         }
@@ -301,6 +308,35 @@ public class VideoVectorJob {
         return texts;
     }
 
+    /**
+     * 根据 JSON 路径导航到目标对象
+     * 支持 $.key1.key2 格式的嵌套路径
+     *
+     * @param json 原始JSON
+     * @param path 路径(如 $.最终选题)
+     * @return 目标 JSONObject,找不到返回 null
+     */
+    private JSONObject navigateToObject(JSONObject json, String path) {
+        if (json == null || !StringUtils.hasText(path) || !path.startsWith("$.")) {
+            return null;
+        }
+        try {
+            String pathContent = path.substring(2);
+            String[] parts = pathContent.split("\\.");
+            Object current = json;
+            for (String part : parts) {
+                if (current instanceof JSONObject) {
+                    current = ((JSONObject) current).get(part);
+                } else {
+                    return null;
+                }
+            }
+            return current instanceof JSONObject ? (JSONObject) current : null;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     /**
      * 判断置信度是否满足条件
      * 规则:置信度 == "high"(字符串)或 置信度 > threshold(数值)
@@ -427,10 +463,10 @@ public class VideoVectorJob {
     public ReturnT<String> aigcVideoVectorJob(String param) {
         log.info("开始执行 AIGC 来源视频向量化任务, param: {}", param);
         try {
-            // 1. 获取所有启用的向量化配置
-            List<DeconstructVectorConfig> configs = getEnabledConfigs();
+            // 1. 获取 aigc_deconstruct 专用的向量化配置
+            List<DeconstructVectorConfig> configs = getEnabledConfigsBySourceField("aigc_deconstruct");
             if (CollectionUtils.isEmpty(configs)) {
-                log.warn("未找到启用的向量化配置");
+                log.warn("未找到 aigc_deconstruct 来源的向量化配置");
                 return ReturnT.SUCCESS;
             }
 
@@ -735,6 +771,245 @@ public class VideoVectorJob {
         return passed;
     }
 
+    /**
+     * result_log 来源视频向量化任务
+     * 从 ODPS 查询播放量>10000 且有 result_log 的视频,
+     * 经过审核过滤、已向量化过滤后,查询解构内容并提取文本进行向量化
+     *
+     * @param param 参数
+     * @return 执行结果
+     */
+    @XxlJob("resultLogVideoVectorJob")
+    public ReturnT<String> resultLogVideoVectorJob(String param) {
+        log.info("开始执行 result_log 来源视频向量化任务, param: {}", param);
+
+        try {
+            // 1. 获取 source_field = 'result_log' 的启用配置
+            List<DeconstructVectorConfig> configs = getEnabledConfigsBySourceField("result_log");
+            if (CollectionUtils.isEmpty(configs)) {
+                log.warn("未找到 result_log 来源的向量化配置");
+                return ReturnT.SUCCESS;
+            }
+            log.info("加载 {} 个 result_log 向量化配置", configs.size());
+
+            // 2. 审核清理:每次 Job 执行只做一次
+            for (DeconstructVectorConfig config : configs) {
+                checkAndRemoveNotAuditPassedVideos(config.getConfigCode());
+            }
+            log.info("审核清理完成,开始分页向量化处理");
+
+            int totalSuccessCount = 0;
+            int totalFailCount = 0;
+            int pageNum = 0;
+
+            while (true) {
+                // 3. 分页查询 result_log 视频池中的 videoId
+                List<Long> videoIds = queryResultLogVideoIdsByPage(pageNum, VectorConstants.PAGE_SIZE);
+                if (CollectionUtils.isEmpty(videoIds)) {
+                    log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
+                    break;
+                }
+                log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
+
+                // 4. 对每个配置进行处理
+                for (DeconstructVectorConfig config : configs) {
+                    String configCode = config.getConfigCode();
+
+                    // 4.1 已向量化过滤
+                    Set<Long> existingVideoIds = vectorStoreService.existsByIds(configCode, videoIds);
+                    List<Long> needProcessIds = videoIds.stream()
+                            .filter(id -> !existingVideoIds.contains(id))
+                            .collect(Collectors.toList());
+                    if (needProcessIds.isEmpty()) {
+                        log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
+                        continue;
+                    }
+                    log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
+
+                    // 4.2 审核状态过滤
+                    needProcessIds = filterAuditPassedIds(needProcessIds);
+                    if (needProcessIds.isEmpty()) {
+                        log.info("配置 {} 待处理视频均未通过审核,跳过", configCode);
+                        continue;
+                    }
+                    log.info("配置 {} 审核通过后需处理 {} 个视频", configCode, needProcessIds.size());
+
+                    // 4.3 分批查询 result_log 的 data 字段并向量化
+                    for (List<Long> partition : Lists.partition(needProcessIds, 50)) {
+                        Map<Long, String> videoDataMap = batchQueryResultLogData(partition);
+                        if (videoDataMap.isEmpty()) {
+                            log.warn("配置 {} 未查询到任何 result_log data", configCode);
+                            continue;
+                        }
+
+                        for (Long videoId : partition) {
+                            try {
+                                String data = videoDataMap.get(videoId);
+                                if (!StringUtils.hasText(data)) {
+                                    log.debug("videoId={} result_log data 为空,跳过", videoId);
+                                    totalFailCount++;
+                                    continue;
+                                }
+
+                                // 从 data JSON 中根据配置提取文本
+                                List<String> texts = extractTextsFromResultLogData(data, config);
+                                if (CollectionUtils.isEmpty(texts)) {
+                                    log.debug("videoId={} 配置 {} 未提取到文本,跳过", videoId, configCode);
+                                    totalFailCount++;
+                                    continue;
+                                }
+
+                                // 向量化并存储
+                                int storeCount = vectorizeAndStore(config, videoId, texts);
+                                if (storeCount > 0) {
+                                    totalSuccessCount++;
+                                } else {
+                                    totalFailCount++;
+                                }
+                            } catch (Exception e) {
+                                log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+                                totalFailCount++;
+                            }
+                        }
+                    }
+                }
+
+                if (videoIds.size() < VectorConstants.PAGE_SIZE) {
+                    log.info("第 {} 页数据量 {} 小于 PAGE_SIZE {},分页查询结束", pageNum, videoIds.size(), VectorConstants.PAGE_SIZE);
+                    break;
+                }
+                pageNum++;
+            }
+
+            log.info("result_log 来源视频向量化任务完成,总成功: {}, 总失败: {}, 总页数: {}", totalSuccessCount, totalFailCount, pageNum + 1);
+            return ReturnT.SUCCESS;
+        } catch (Exception e) {
+            log.error("result_log 来源视频向量化任务执行失败: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 获取指定 source_field 的启用向量化配置
+     * 用于区分不同内容池的配置
+     *
+     * @param sourceField 来源字段标识(如 result_log)
+     * @return 匹配的启用配置列表
+     */
+    private List<DeconstructVectorConfig> getEnabledConfigsBySourceField(String sourceField) {
+        DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
+        example.createCriteria()
+                .andEnabledEqualTo((short) 1)
+                .andSourceFieldEqualTo(sourceField);
+        example.setOrderByClause("priority ASC");
+        return vectorConfigMapper.selectByExample(example);
+    }
+
+    /**
+     * 分页查询 result_log 视频池中的 videoId
+     * 条件:播放量>10000 且有 result_log 记录(dt > 20240001)
+     *
+     * @param pageNum  页码(从0开始)
+     * @param pageSize 每页数量
+     * @return videoId 列表
+     */
+    private List<Long> queryResultLogVideoIdsByPage(int pageNum, int pageSize) {
+        int offset = pageNum * pageSize;
+        String sql = String.format(
+                "SELECT v.id " +
+                        "FROM videoods.wx_video v " +
+                        "INNER JOIN loghubods.result_log r " +
+                        "ON v.id = r.video_id " +
+                        "WHERE v.play_count_total > 10000 " +
+                        "AND r.dt > 20240001 " +
+                        "ORDER BY v.id " +
+                        "LIMIT %d, %d;",
+                offset, pageSize);
+        List<Record> records = OdpsUtil.getOdpsData(sql);
+        if (records == null || records.isEmpty()) {
+            return new ArrayList<>();
+        }
+        List<Long> videoIds = new ArrayList<>();
+        for (Record record : records) {
+            Long videoId = Long.valueOf(record.getString(0));
+            if (videoId != null) {
+                videoIds.add(videoId);
+            }
+        }
+        return videoIds;
+    }
+
+    /**
+     * 批量查询 result_log 的 data 字段
+     *
+     * @param videoIds 视频ID列表
+     * @return videoId -> data JSON 字符串
+     */
+    private Map<Long, String> batchQueryResultLogData(List<Long> videoIds) {
+        if (CollectionUtils.isEmpty(videoIds)) {
+            return Collections.emptyMap();
+        }
+
+        String idsStr = videoIds.stream()
+                .map(String::valueOf)
+                .collect(Collectors.joining(","));
+
+        String sql = String.format(
+                "SELECT video_id, data " +
+                        "FROM loghubods.result_log " +
+                        "WHERE video_id IN (%s) AND dt > 20240001;",
+                idsStr);
+
+        List<Record> records = OdpsUtil.getOdpsData(sql);
+        if (records == null || records.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<Long, String> result = new HashMap<>();
+        for (Record record : records) {
+            Long videoId = Long.valueOf(record.getString(0));
+            String data = record.getString(1);
+            if (videoId != null && data != null) {
+                result.put(videoId, data);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * 从 result_log 的 data 字段中提取文本
+     * data 结构参考 r.json,直接按 sourcePath 提取单点文本值
+     * 与 content_profile 的 raw_result 提取逻辑不同,此处数据结构是扁平的中文 key 嵌套
+     *
+     * @param data   result_log 的 data JSON 字符串
+     * @param config 向量化配置
+     * @return 提取的文本列表
+     */
+    private List<String> extractTextsFromResultLogData(String data, DeconstructVectorConfig config) {
+        List<String> texts = new ArrayList<>();
+
+        try {
+            JSONObject json = JSON.parseObject(data);
+            if (json == null) {
+                return texts;
+            }
+
+            String sourcePath = config.getSourcePath();
+            if (!StringUtils.hasText(sourcePath)) {
+                return texts;
+            }
+
+            // result_log 的 data 结构为扁平中文 key 嵌套(如 $.一、基础信息.内容选题)
+            // 直接使用单点模式提取
+            texts.addAll(VectorUtils.extractFromJson(json, sourcePath));
+
+        } catch (Exception e) {
+            log.error("解析 result_log data 失败: {}", e.getMessage());
+        }
+
+        return texts;
+    }
+
     /**
      * 更新内容状态为失败
      */

+ 2 - 1
core/src/main/java/com/tzld/videoVector/service/impl/MaterialSearchServiceImpl.java

@@ -435,11 +435,12 @@ public class MaterialSearchServiceImpl implements MaterialSearchService {
 
     /**
      * 获取所有启用的向量化配置
+     * 仅返回 source_field = 'result_json' 的素材/视频配置
      */
     private List<DeconstructVectorConfig> getEnabledConfigs() {
         try {
             DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
-            example.createCriteria().andEnabledEqualTo((short) 1);
+            example.createCriteria().andEnabledEqualTo((short) 1).andSourceFieldEqualTo("result_json");
             example.setOrderByClause("priority ASC");
             List<DeconstructVectorConfig> configs = deconstructVectorConfigMapper.selectByExample(example);
             return configs != null ? configs : Collections.emptyList();

+ 2 - 2
core/src/main/java/com/tzld/videoVector/service/impl/RedisVectorStoreServiceImpl.java

@@ -517,9 +517,9 @@ public class RedisVectorStoreServiceImpl implements VectorStoreService {
     private Set<String> getAllConfigCodes() {
         Set<String> configCodes = new HashSet<>();
         try {
-            // 从数据库查询所有启用的配置
+            // 从数据库查询启用的 result_json 配置
             DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
-            example.createCriteria().andEnabledEqualTo((short) 1);
+            example.createCriteria().andEnabledEqualTo((short) 1).andSourceFieldEqualTo("result_json");
             List<DeconstructVectorConfig> configs = vectorConfigMapper.selectByExample(example);
 
             for (DeconstructVectorConfig config : configs) {

+ 2 - 1
core/src/main/java/com/tzld/videoVector/service/impl/VideoSearchServiceImpl.java

@@ -562,11 +562,12 @@ public class VideoSearchServiceImpl implements VideoSearchService {
 
     /**
      * 获取所有启用的向量化配置
+     * 仅返回 source_field = 'result_json' 的素材/视频配置
      */
     private List<DeconstructVectorConfig> getEnabledConfigs() {
         try {
             DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
-            example.createCriteria().andEnabledEqualTo((short) 1);
+            example.createCriteria().andEnabledEqualTo((short) 1).andSourceFieldEqualTo("result_json");
             example.setOrderByClause("priority ASC");
             List<DeconstructVectorConfig> configs = deconstructVectorConfigMapper.selectByExample(example);
             return configs != null ? configs : Collections.emptyList();

+ 2 - 1
core/src/main/resources/mapper/pgVector/ext/DeconstructVectorConfigMapperExt.xml

@@ -8,12 +8,13 @@
     create_time, update_time
   </sql>
 
-  <!-- 自定义:按业务类型和内容类型查询启用的向量配置 -->
+  <!-- 自定义:按业务类型和内容类型查询启用的向量配置(仅返回 source_field = 'result_json' 的配置) -->
   <select id="selectMatchingConfigs" resultMap="com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper.BaseResultMap">
     SELECT
     <include refid="Base_Column_List" />
     FROM deconstruct_vector_config
     WHERE enabled = 1
+    AND source_field = 'result_json'
     <if test="bizType != null">
       AND (biz_type IS NULL OR biz_type = #{bizType,jdbcType=SMALLINT})
     </if>