Преглед изворни кода

新增AIGC任务视频向量化功能

wangyunpeng пре 1 недеља
родитељ
комит
a909594c4e

+ 186 - 0
core/src/main/java/com/tzld/videoVector/api/AigcApiService.java

@@ -0,0 +1,186 @@
+package com.tzld.videoVector.api;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.*;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AIGC API 服务
+ * 封装 AIGC 平台任务输入列表和回调详情两个接口
+ */
+@Slf4j
+@Service
+public class AigcApiService {
+
+    private OkHttpClient client;
+
+    /**
+     * 任务输入列表接口
+     */
+    private static final String INPUT_LIST_URL = "http://aigc-api.cybertogether.net/aigc/api/task/input-usage/list";
+
+    /**
+     * 任务回调详情接口
+     */
+    private static final String CALLBACK_DETAIL_URL = "http://aigc-api.cybertogether.net/aigc/api/task/callback-data";
+
+    private static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
+
+    @Value("${aigc.api.timeout:30}")
+    private int timeout;
+
+    @PostConstruct
+    public void init() {
+        client = new OkHttpClient.Builder()
+                .connectTimeout(timeout, TimeUnit.SECONDS)
+                .readTimeout(timeout, TimeUnit.SECONDS)
+                .writeTimeout(timeout, TimeUnit.SECONDS)
+                .build();
+        log.info("AIGC API 服务初始化完成");
+    }
+
+    /**
+     * 获取任务输入列表
+     * 请求 POST /aigc/api/task/input-usage/list,返回 bizUniqueId(视频ID)与 taskInstanceId 的映射关系
+     *
+     * @param id 任务 ID
+     * @return 任务输入列表,失败时返回空列表
+     */
+    public List<AigcTaskInput> getTaskInputList(int id) {
+        try {
+            JSONObject params = new JSONObject();
+            params.put("id", id);
+            JSONObject reqBody = new JSONObject();
+            reqBody.put("params", params);
+
+            RequestBody body = RequestBody.create(JSON_MEDIA_TYPE, reqBody.toJSONString());
+            Request request = new Request.Builder()
+                    .url(INPUT_LIST_URL)
+                    .post(body)
+                    .addHeader("Content-Type", "application/json")
+                    .build();
+
+            try (Response response = client.newCall(request).execute()) {
+                if (!response.isSuccessful()) {
+                    log.error("获取任务输入列表请求失败,HTTP状态码: {}, id: {}", response.code(), id);
+                    return Collections.emptyList();
+                }
+                String respStr = response.body().string();
+                JSONObject res = JSON.parseObject(respStr);
+                if (res == null || res.getIntValue("code") != 0) {
+                    log.error("获取任务输入列表响应异常,id: {}, resp: {}", id, respStr);
+                    return Collections.emptyList();
+                }
+                JSONArray data = res.getJSONArray("data");
+                if (data == null || data.isEmpty()) {
+                    log.info("任务输入列表为空,id: {}", id);
+                    return Collections.emptyList();
+                }
+                List<AigcTaskInput> list = new ArrayList<>();
+                for (int i = 0; i < data.size(); i++) {
+                    JSONObject item = data.getJSONObject(i);
+                    if (item == null) {
+                        continue;
+                    }
+                    Long taskInstanceId = item.getLong("taskInstanceId");
+                    String bizUniqueId = item.getString("bizUniqueId");
+                    if (taskInstanceId == null || bizUniqueId == null) {
+                        continue;
+                    }
+                    AigcTaskInput input = new AigcTaskInput();
+                    input.setTaskInstanceId(taskInstanceId);
+                    input.setBizUniqueId(bizUniqueId);
+                    list.add(input);
+                }
+                log.info("获取到 {} 条任务输入数据,id: {}", list.size(), id);
+                return list;
+            }
+        } catch (IOException e) {
+            log.error("获取任务输入列表异常,id: {}, error: {}", id, e.getMessage(), e);
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * 获取任务回调详情,解析并返回 dataContent 字段的 JSONObject
+     * 请求 POST /aigc/api/task/callback-data/detail
+     *
+     * @param taskInstanceId 任务实例 ID
+     * @return dataContent 解析后的 JSONObject,失败时返回 null
+     */
+    public JSONObject getTaskCallbackDetail(long taskInstanceId) {
+        try {
+            JSONObject params = new JSONObject();
+            params.put("id", taskInstanceId);
+            JSONObject reqBody = new JSONObject();
+            reqBody.put("params", params);
+
+            RequestBody body = RequestBody.create(JSON_MEDIA_TYPE, reqBody.toJSONString());
+            Request request = new Request.Builder()
+                    .url(CALLBACK_DETAIL_URL)
+                    .post(body)
+                    .addHeader("Content-Type", "application/json")
+                    .build();
+
+            try (Response response = client.newCall(request).execute()) {
+                if (!response.isSuccessful()) {
+                    log.error("获取任务回调详情请求失败,HTTP状态码: {}, taskInstanceId: {}", response.code(), taskInstanceId);
+                    return null;
+                }
+                String respStr = response.body().string();
+                JSONObject res = JSON.parseObject(respStr);
+                if (res == null || res.getIntValue("code") != 0) {
+                    log.error("获取任务回调详情响应异常,taskInstanceId: {}, resp: {}", taskInstanceId, respStr);
+                    return null;
+                }
+                JSONArray dataArray = res.getJSONArray("data");
+                if (dataArray == null || dataArray.isEmpty()) {
+                    log.warn("任务回调详情 data 为空,taskInstanceId: {}", taskInstanceId);
+                    return null;
+                }
+                // 取第一条记录
+                JSONObject data = dataArray.getJSONObject(0);
+                if (data == null) {
+                    log.warn("任务回调详情 data[0] 为空,taskInstanceId: {}", taskInstanceId);
+                    return null;
+                }
+                String dataContent = data.getString("dataContent");
+                if (dataContent == null || dataContent.trim().isEmpty()) {
+                    log.warn("taskInstanceId: {} 的 dataContent 为空", taskInstanceId);
+                    return null;
+                }
+                return JSON.parseObject(dataContent);
+            }
+        } catch (IOException e) {
+            log.error("获取任务回调详情异常,taskInstanceId: {}, error: {}", taskInstanceId, e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * AIGC 任务输入数据
+     */
+    @Data
+    public static class AigcTaskInput {
+        /**
+         * 任务实例 ID,用于调用 detail 接口
+         */
+        private Long taskInstanceId;
+        /**
+         * 业务唯一 ID,即视频 ID
+         */
+        private String bizUniqueId;
+    }
+}

+ 137 - 0
core/src/main/java/com/tzld/videoVector/job/VideoVectorJob.java

@@ -12,6 +12,7 @@ import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContent;
 import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructContentExample;
 import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfig;
 import com.tzld.videoVector.model.po.videoVector.deconstruct.DeconstructVectorConfigExample;
+import com.tzld.videoVector.api.AigcApiService;
 import com.tzld.videoVector.api.VideoApiService;
 import com.tzld.videoVector.service.DeconstructService;
 import com.tzld.videoVector.service.EmbeddingService;
@@ -51,6 +52,9 @@ public class VideoVectorJob {
     @Resource
     private VideoApiService videoApiService;
 
+    @Resource
+    private AigcApiService aigcApiService;
+
     /**
      * 每页查询数量
      */
@@ -434,6 +438,139 @@ public class VideoVectorJob {
         return videoIds;
     }
 
+    /**
+     * AIGC 来源视频向量化任务
+     * 从 AIGC API(id=46)拉取视频列表,经过向量存在性检查和审核过滤后,
+     * 调用 detail 接口获取解构详情(dataContent),提取选题文本并向量化写入 Redis
+     *
+     * @param param 参数
+     * @return 执行结果
+     */
+    @XxlJob("aigcVideoVectorJob")
+    public ReturnT<String> aigcVideoVectorJob(String param) {
+        log.info("开始执行 AIGC 来源视频向量化任务, param: {}", param);
+        try {
+            // 1. 获取所有启用的向量化配置
+            List<DeconstructVectorConfig> configs = getEnabledConfigs();
+            if (CollectionUtils.isEmpty(configs)) {
+                log.warn("未找到启用的向量化配置");
+                return ReturnT.SUCCESS;
+            }
+
+            // 2. 从 AIGC API 获取任务输入列表(bizUniqueId 为视频 ID)
+            List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(46);
+            if (CollectionUtils.isEmpty(taskInputList)) {
+                log.info("AIGC API 未返回任务输入数据");
+                return ReturnT.SUCCESS;
+            }
+            log.info("获取到 {} 条 AIGC 任务输入数据", taskInputList.size());
+
+            // 3. 构建 videoId -> taskInstanceId 映射
+            Map<Long, Long> videoIdToTaskInstanceId = new HashMap<>();
+            for (AigcApiService.AigcTaskInput input : taskInputList) {
+                try {
+                    Long videoId = Long.parseLong(input.getBizUniqueId());
+                    videoIdToTaskInstanceId.put(videoId, input.getTaskInstanceId());
+                } catch (NumberFormatException e) {
+                    log.warn("bizUniqueId 格式非法,跳过: {}", input.getBizUniqueId());
+                }
+            }
+            if (videoIdToTaskInstanceId.isEmpty()) {
+                log.info("无有效 videoId,任务结束");
+                return ReturnT.SUCCESS;
+            }
+            List<Long> allVideoIds = new ArrayList<>(videoIdToTaskInstanceId.keySet());
+            log.info("共 {} 个有效 videoId", allVideoIds.size());
+
+            int totalSuccessCount = 0;
+            int totalFailCount = 0;
+
+            // 4. 对每个配置进行处理
+            for (DeconstructVectorConfig config : configs) {
+                String configCode = config.getConfigCode();
+
+                // 4.1 查询该配置下已有向量的 videoId,排除已处理过的
+                Set<Long> existingIds = vectorStoreService.existsByIds(configCode, allVideoIds);
+                List<Long> needProcessIds = allVideoIds.stream()
+                        .filter(id -> !existingIds.contains(id))
+                        .collect(Collectors.toList());
+                if (needProcessIds.isEmpty()) {
+                    log.debug("配置 {} 下所有视频已有向量,跳过", configCode);
+                    continue;
+                }
+                log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
+
+                // 4.2 审核状态过滤:排除审核未通过的视频
+                needProcessIds = filterAuditPassedIds(needProcessIds);
+                if (needProcessIds.isEmpty()) {
+                    log.info("配置 {} 待处理视频均未通过审核,跳过", configCode);
+                    continue;
+                }
+                log.info("配置 {} 审核通过后需处理 {} 个视频", configCode, needProcessIds.size());
+
+                // 4.3 逐个调用 detail 接口,提取选题并向量化存储
+                for (Long videoId : needProcessIds) {
+                    try {
+                        Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
+                        if (taskInstanceId == null) {
+                            log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
+                            totalFailCount++;
+                            continue;
+                        }
+
+                        // 调用 detail 接口获取 dataContent(解构详情)
+                        JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
+                        if (dataContent == null) {
+                            log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
+                            totalFailCount++;
+                            continue;
+                        }
+
+                        // 从 dataContent 中提取选题文本
+                        List<String> texts = extractTopicFromDataContent(dataContent);
+                        if (CollectionUtils.isEmpty(texts)) {
+                            log.debug("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
+                            totalFailCount++;
+                            continue;
+                        }
+
+                        // 向量化并写入 Redis
+                        boolean success = vectorizeAndStore(config, videoId, texts);
+                        if (success) {
+                            totalSuccessCount++;
+                        } else {
+                            totalFailCount++;
+                        }
+                    } catch (Exception e) {
+                        log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
+                        totalFailCount++;
+                    }
+                }
+            }
+
+            log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount, totalFailCount);
+            return ReturnT.SUCCESS;
+        } catch (Exception e) {
+            log.error("AIGC 来源视频向量化任务执行失败: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 从 dataContent 中提取选题文本
+     * 默认复用配置的 sourcePath 提取逻辑
+     *
+     * @param dataContent dataContent 解析后的 JSONObject
+     * @return 提取的文本列表
+     */
+    private List<String> extractTopicFromDataContent(JSONObject dataContent) {
+        if (dataContent == null) {
+            return Collections.emptyList();
+        }
+        String sourcePath = "$.最终选题.选题";
+        return extractFromJson(dataContent, sourcePath);
+    }
+
     /**
      * 重试超时的解构任务
      * 检查创建超过一小时,状态不是成功或失败的内容重新查询解构结果

+ 43 - 0
core/src/main/java/com/tzld/videoVector/service/impl/VideoSearchServiceImpl.java

@@ -394,6 +394,44 @@ public class VideoSearchServiceImpl implements VideoSearchService {
         }
     }
 
+    /**
+     * 若内容已解构完成(status==2)但向量表缺少对应 configCode 的向量,则同步触发向量化
+     */
+    private void triggerVectorizeIfNeeded(String channelContentId, String configCode) {
+        try {
+            // 查询解构内容(含 BLOB 字段,vectorizeContent 需要 resultJson/bodyText 等)
+            DeconstructContentExample example = new DeconstructContentExample();
+            example.createCriteria().andChannelContentIdEqualTo(channelContentId);
+            example.setOrderByClause("id DESC");
+            List<DeconstructContent> list = deconstructContentMapper.selectByExampleWithBLOBs(example);
+            if (list == null || list.isEmpty()) {
+                log.debug("triggerVectorizeIfNeeded: 未找到 channelContentId={} 的解构记录", channelContentId);
+                return;
+            }
+            DeconstructContent content = list.get(0);
+            // 仅对已解构完成(status == 2)的内容触发向量化
+            if (content.getStatus() == null || content.getStatus() != 2) {
+                log.debug("triggerVectorizeIfNeeded: channelContentId={} 解构未完成,status={}",
+                        channelContentId, content.getStatus());
+                return;
+            }
+            // 检查向量表是否已有该 configCode 的记录
+            List<DeconstructContentVector> vectors =
+                    vectorizeService.getVectorsByContentId(content.getId(), configCode);
+            if (vectors != null && !vectors.isEmpty()) {
+                log.debug("triggerVectorizeIfNeeded: channelContentId={} 已有 configCode={} 向量,无需触发",
+                        channelContentId, configCode);
+                return;
+            }
+            log.info("内容已解构完成但向量表无对应 configCode 向量,触发向量化,channelContentId={}, contentId={}, configCode={}",
+                    channelContentId, content.getId(), configCode);
+            vectorizeService.vectorizeContent(content);
+        } catch (Exception e) {
+            log.error("检查并触发向量化失败,channelContentId={}, configCode={}, error={}",
+                    channelContentId, configCode, e.getMessage(), e);
+        }
+    }
+
     /**
      * 异步向量化内容,使用线程池执行,避免阻塞接口响应
      */
@@ -423,6 +461,11 @@ public class VideoSearchServiceImpl implements VideoSearchService {
             configCode = DEFAULT_CONFIG_CODE;
         }
 
+        // 若提供了 channelContentId,且内容已解构完成但向量表缺少对应 configCode 的向量,则异步触发向量化
+        if (StringUtils.hasText(param.getChannelContentId())) {
+            triggerVectorizeIfNeeded(param.getChannelContentId(), configCode);
+        }
+
         // 确定查询向量:直接传入 > channelContentId历史向量 > text_hash历史embedding > 文本向量化
         List<Float> queryVector = resolveQueryVector(param);
         if (queryVector == null || queryVector.isEmpty()) {

+ 6 - 0
server/src/main/java/com/tzld/videoVector/controller/XxlJobController.java

@@ -20,6 +20,12 @@ public class XxlJobController {
         return CommonResponse.success();
     }
 
+    @GetMapping("/aigcVideoVectorJob")
+    public CommonResponse<Void> aigcVideoVectorJob() {
+        videoVectorJob.aigcVideoVectorJob(null);
+        return CommonResponse.success();
+    }
+
     @GetMapping("/retryDeconstructJob")
     public CommonResponse<Void> retryDeconstructJob() {
         videoVectorJob.retryDeconstructJob(null);