|
@@ -0,0 +1,487 @@
|
|
|
|
|
+package com.tzld.videoVector.job;
|
|
|
|
|
+
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.aliyun.odps.data.Record;
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
|
|
+import com.tzld.videoVector.api.AigcApiService;
|
|
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
|
|
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.VideoVectorMapperExt;
|
|
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
|
|
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
|
|
|
|
|
+import com.tzld.videoVector.model.po.pgVector.VideoVector;
|
|
|
|
|
+import com.tzld.videoVector.util.OdpsUtil;
|
|
|
|
|
+import com.tzld.videoVector.util.VectorUtils;
|
|
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * 历史数据 text 字段补充任务
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 扫描 video_vectors 中的现有数据,根据其 config_code 对应的配置,
|
|
|
|
|
+ * 从原始数据源(content_profile / result_log / aigc_deconstruct)中提取文本,
|
|
|
|
|
+ * 回填到 video_vectors.text 字段。
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 不检查数据状态,只要有数据就根据 config 提取对应数据。
|
|
|
|
|
+ */
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+public class VideoVectorTextBackfillJob {
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private VideoVectorMapperExt videoVectorMapperExt;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private DeconstructVectorConfigMapper vectorConfigMapper;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private AigcApiService aigcApiService;
|
|
|
|
|
+
|
|
|
|
|
+ /** 每批处理数量 */
|
|
|
|
|
+ private static final int BATCH_SIZE = 500;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 历史数据 text 字段补充
|
|
|
|
|
+ * 遍历所有启用的配置,对 video_vectors 中已有记录补充 text 字段
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param param 参数(可选:指定 configCode 只处理特定配置)
|
|
|
|
|
+ * @return 执行结果
|
|
|
|
|
+ */
|
|
|
|
|
+ @XxlJob("videoVectorTextBackfillJob")
|
|
|
|
|
+ public ReturnT<String> videoVectorTextBackfillJob(String param) {
|
|
|
|
|
+ log.info("开始执行历史数据 text 字段补充任务, param: {}", param);
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 获取所有启用的配置
|
|
|
|
|
+ List<DeconstructVectorConfig> configs = getAllEnabledConfigs();
|
|
|
|
|
+ if (CollectionUtils.isEmpty(configs)) {
|
|
|
|
|
+ log.warn("未找到启用的向量化配置");
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 如果指定了 configCode,只处理特定配置
|
|
|
|
|
+ if (StringUtils.hasText(param)) {
|
|
|
|
|
+ configs = configs.stream()
|
|
|
|
|
+ .filter(c -> param.equals(c.getConfigCode()))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ if (configs.isEmpty()) {
|
|
|
|
|
+ log.warn("未找到 configCode={} 的启用配置", param);
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("共加载 {} 个向量化配置", configs.size());
|
|
|
|
|
+
|
|
|
|
|
+ int totalUpdateCount = 0;
|
|
|
|
|
+ int totalSkipCount = 0;
|
|
|
|
|
+ int totalFailCount = 0;
|
|
|
|
|
+
|
|
|
|
|
+ // 逐个配置处理
|
|
|
|
|
+ for (DeconstructVectorConfig config : configs) {
|
|
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
|
|
+ String sourceField = config.getSourceField();
|
|
|
|
|
+ log.info("开始处理配置: configCode={}, sourceField={}", configCode, sourceField);
|
|
|
|
|
+
|
|
|
|
|
+ int configUpdateCount = 0;
|
|
|
|
|
+ int configSkipCount = 0;
|
|
|
|
|
+ int configFailCount = 0;
|
|
|
|
|
+ int offset = 0;
|
|
|
|
|
+
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ // 分页查询该配置下的记录
|
|
|
|
|
+ List<VideoVector> vectors = videoVectorMapperExt.selectByConfigCodePaged(configCode, offset, BATCH_SIZE);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(vectors)) {
|
|
|
|
|
+ log.info("配置 {} 第 {} 偏移无数据,处理结束", configCode, offset);
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 按 sourceField 类型分发处理
|
|
|
|
|
+ int[] counts = processVectorBatch(vectors, config);
|
|
|
|
|
+ configUpdateCount += counts[0];
|
|
|
|
|
+ configSkipCount += counts[1];
|
|
|
|
|
+ configFailCount += counts[2];
|
|
|
|
|
+
|
|
|
|
|
+ if (vectors.size() < BATCH_SIZE) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ offset += BATCH_SIZE;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("配置 {} 处理完成,更新: {}, 跳过: {}, 失败: {}",
|
|
|
|
|
+ configCode, configUpdateCount, configSkipCount, configFailCount);
|
|
|
|
|
+ totalUpdateCount += configUpdateCount;
|
|
|
|
|
+ totalSkipCount += configSkipCount;
|
|
|
|
|
+ totalFailCount += configFailCount;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("历史数据 text 字段补充任务完成,总更新: {}, 总跳过: {}, 总失败: {}",
|
|
|
|
|
+ totalUpdateCount, totalSkipCount, totalFailCount);
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("历史数据 text 字段补充任务执行失败: {}", e.getMessage(), e);
|
|
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理一批 VideoVector 记录
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 多点模式处理策略:
|
|
|
|
|
+ * 原始向量化时 texts[i] 对应 point_index=i,回填时保持相同映射。
|
|
|
|
|
+ * 由于置信度过滤可能导致重新提取的文本列表与原始不同,
|
|
|
|
|
+ * 当 point_index 超出当前提取的 texts 范围时直接跳过,避免错误匹配。
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 单点模式(point_index=0):直接取第一个有效文本。
|
|
|
|
|
+ *
|
|
|
|
|
+ * @return int[]{更新数, 跳过数, 失败数}
|
|
|
|
|
+ */
|
|
|
|
|
+ private int[] processVectorBatch(List<VideoVector> vectors, DeconstructVectorConfig config) {
|
|
|
|
|
+ String sourceField = config.getSourceField();
|
|
|
|
|
+ boolean multiPoint = VectorUtils.isMultiPointConfig(config);
|
|
|
|
|
+ AtomicInteger updateCount = new AtomicInteger(0);
|
|
|
|
|
+ AtomicInteger skipCount = new AtomicInteger(0);
|
|
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
|
|
+
|
|
|
|
|
+ // 收集需要处理的 videoId(去重)
|
|
|
|
|
+ List<Long> videoIds = vectors.stream()
|
|
|
|
|
+ .map(VideoVector::getVideoId)
|
|
|
|
|
+ .distinct()
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+
|
|
|
|
|
+ // 根据 sourceField 获取原始数据
|
|
|
|
|
+ Map<Long, String> rawDataMap;
|
|
|
|
|
+ switch (sourceField) {
|
|
|
|
|
+ case "result_json":
|
|
|
|
|
+ rawDataMap = batchQueryVideoRawResults(videoIds);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case "result_log":
|
|
|
|
|
+ rawDataMap = batchQueryResultLogData(videoIds);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case "aigc_deconstruct":
|
|
|
|
|
+ rawDataMap = batchQueryAigcData(videoIds);
|
|
|
|
|
+ break;
|
|
|
|
|
+ default:
|
|
|
|
|
+ log.warn("不支持的 sourceField: {}", sourceField);
|
|
|
|
|
+ return new int[]{0, vectors.size(), 0};
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 缓存每个 videoId 提取的文本列表,避免对同一 videoId 重复解析
|
|
|
|
|
+ Map<Long, List<String>> textsCache = new HashMap<>();
|
|
|
|
|
+
|
|
|
|
|
+ // 遍历每条记录,提取文本并更新
|
|
|
|
|
+ for (VideoVector vector : vectors) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long videoId = vector.getVideoId();
|
|
|
|
|
+ String rawData = rawDataMap.get(videoId);
|
|
|
|
|
+ if (!StringUtils.hasText(rawData)) {
|
|
|
|
|
+ skipCount.incrementAndGet();
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 根据配置提取文本(相同 videoId 复用缓存)
|
|
|
|
|
+ List<String> texts = textsCache.computeIfAbsent(videoId,
|
|
|
|
|
+ id -> extractTexts(rawData, config));
|
|
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
+ skipCount.incrementAndGet();
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 根据 pointIndex 取对应的文本
|
|
|
|
|
+ int pointIndex = vector.getPointIndex() != null ? vector.getPointIndex() : 0;
|
|
|
|
|
+ String text;
|
|
|
|
|
+
|
|
|
|
|
+ if (multiPoint) {
|
|
|
|
|
+ // 多点模式:严格按 point_index 映射,超出范围则跳过
|
|
|
|
|
+ if (pointIndex < texts.size()) {
|
|
|
|
|
+ text = texts.get(pointIndex);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.debug("videoId={} point_index={} 超出提取文本范围(size={}),跳过",
|
|
|
|
|
+ videoId, pointIndex, texts.size());
|
|
|
|
|
+ skipCount.incrementAndGet();
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 单点模式:取第一个有效文本
|
|
|
|
|
+ text = texts.stream()
|
|
|
|
|
+ .filter(StringUtils::hasText)
|
|
|
|
|
+ .findFirst()
|
|
|
|
|
+ .orElse(null);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (!StringUtils.hasText(text)) {
|
|
|
|
|
+ skipCount.incrementAndGet();
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 截断文本
|
|
|
|
|
+ Integer maxLength = config.getMaxLength();
|
|
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 更新 text 字段
|
|
|
|
|
+ videoVectorMapperExt.updateTextById(vector.getId(), text);
|
|
|
|
|
+ updateCount.incrementAndGet();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 videoVector id={} videoId={} 时异常: {}",
|
|
|
|
|
+ vector.getId(), vector.getVideoId(), e.getMessage());
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return new int[]{updateCount.get(), skipCount.get(), failCount.get()};
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 根据配置从原始数据中提取文本
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<String> extractTexts(String rawData, DeconstructVectorConfig config) {
|
|
|
|
|
+ List<String> texts = new ArrayList<>();
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject json = JSON.parseObject(rawData);
|
|
|
|
|
+ if (json == null) {
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sourcePath = config.getSourcePath();
|
|
|
|
|
+ if (!StringUtils.hasText(sourcePath)) {
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String extractRule = config.getExtractRule();
|
|
|
|
|
+ if (StringUtils.hasText(extractRule)) {
|
|
|
|
|
+ // 多点模式:带置信度过滤
|
|
|
|
|
+ texts.addAll(extractTextsWithConfidence(json, sourcePath, extractRule));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 单点模式:直接提取
|
|
|
|
|
+ texts.addAll(VectorUtils.extractFromJson(json, sourcePath));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("解析原始数据失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 带置信度过滤的文本提取
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<String> extractTextsWithConfidence(JSONObject json, String sourcePath, String extractRule) {
|
|
|
|
|
+ List<String> texts = new ArrayList<>();
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject rule = JSON.parseObject(extractRule);
|
|
|
|
|
+ String textField = rule.getString("text_field");
|
|
|
|
|
+ String confidenceField = rule.getString("confidence_field");
|
|
|
|
|
+ double confidenceThreshold = rule.getDoubleValue("confidence_threshold");
|
|
|
|
|
+
|
|
|
|
|
+ if (!StringUtils.hasText(textField) || !StringUtils.hasText(confidenceField)) {
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (sourcePath.endsWith("[*]")) {
|
|
|
|
|
+ // 数组路径模式
|
|
|
|
|
+ String arrayPath = sourcePath.substring(0, sourcePath.length() - 3);
|
|
|
|
|
+ Object arrayObj = navigateToValue(json, arrayPath);
|
|
|
|
|
+ if (arrayObj instanceof JSONArray) {
|
|
|
|
|
+ JSONArray array = (JSONArray) arrayObj;
|
|
|
|
|
+ for (int i = 0; i < array.size(); i++) {
|
|
|
|
|
+ JSONObject item = array.getJSONObject(i);
|
|
|
|
|
+ if (item != null && isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
|
|
|
|
|
+ String text = item.getString(textField);
|
|
|
|
|
+ if (StringUtils.hasText(text)) {
|
|
|
|
|
+ texts.add(text);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 单对象路径模式
|
|
|
|
|
+ Object obj = navigateToValue(json, sourcePath);
|
|
|
|
|
+ if (obj instanceof JSONObject) {
|
|
|
|
|
+ JSONObject item = (JSONObject) obj;
|
|
|
|
|
+ if (isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
|
|
|
|
|
+ String text = item.getString(textField);
|
|
|
|
|
+ if (StringUtils.hasText(text)) {
|
|
|
|
|
+ texts.add(text);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("置信度提取失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ return texts;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 根据 JSON 路径导航到目标值
|
|
|
|
|
+ */
|
|
|
|
|
+ private Object navigateToValue(JSONObject json, String path) {
|
|
|
|
|
+ if (json == null || !StringUtils.hasText(path) || !path.startsWith("$.")) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ String pathContent = path.substring(2);
|
|
|
|
|
+ String[] parts = pathContent.split("\\.");
|
|
|
|
|
+ Object current = json;
|
|
|
|
|
+ for (String part : parts) {
|
|
|
|
|
+ if (current instanceof JSONObject) {
|
|
|
|
|
+ current = ((JSONObject) current).get(part);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return current;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 判断置信度是否满足条件
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean isConfidenceQualified(JSONObject item, String confidenceField, double threshold) {
|
|
|
|
|
+ Object value = item.get(confidenceField);
|
|
|
|
|
+ if (value == null) {
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (value instanceof String) {
|
|
|
|
|
+ return "high".equalsIgnoreCase((String) value);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (value instanceof Number) {
|
|
|
|
|
+ return ((Number) value).doubleValue() >= threshold;
|
|
|
|
|
+ }
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ========================== 数据源查询方法 ==========================
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量查询 content_profile 的 raw_result
|
|
|
|
|
+ */
|
|
|
|
|
+ private Map<Long, String> batchQueryVideoRawResults(List<Long> videoIds) {
|
|
|
|
|
+ Map<Long, String> result = new HashMap<>();
|
|
|
|
|
+ for (List<Long> partition : Lists.partition(videoIds, 200)) {
|
|
|
|
|
+ String idsStr = partition.stream()
|
|
|
|
|
+ .map(String::valueOf)
|
|
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT content_id, raw_result " +
|
|
|
|
|
+ "FROM videoods.content_profile " +
|
|
|
|
|
+ "WHERE content_id IN (%s);",
|
|
|
|
|
+ idsStr);
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
+ if (records != null) {
|
|
|
|
|
+ for (Record record : records) {
|
|
|
|
|
+ Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
+ String rawResult = record.getString(1);
|
|
|
|
|
+ if (videoId != null && rawResult != null) {
|
|
|
|
|
+ result.put(videoId, rawResult);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("查询 content_profile raw_result 失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return result;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量查询 result_log 的 data 字段
|
|
|
|
|
+ */
|
|
|
|
|
+ private Map<Long, String> batchQueryResultLogData(List<Long> videoIds) {
|
|
|
|
|
+ Map<Long, String> result = new HashMap<>();
|
|
|
|
|
+ for (List<Long> partition : Lists.partition(videoIds, 200)) {
|
|
|
|
|
+ String idsStr = partition.stream()
|
|
|
|
|
+ .map(String::valueOf)
|
|
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT video_id, data " +
|
|
|
|
|
+ "FROM loghubods.result_log " +
|
|
|
|
|
+ "WHERE video_id IN (%s) AND dt > 20240001;",
|
|
|
|
|
+ idsStr);
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
+ if (records != null) {
|
|
|
|
|
+ for (Record record : records) {
|
|
|
|
|
+ Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
+ String data = record.getString(1);
|
|
|
|
|
+ if (videoId != null && data != null) {
|
|
|
|
|
+ result.put(videoId, data);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("查询 result_log data 失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return result;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量查询 AIGC 解构数据
|
|
|
|
|
+ * 通过 AIGC API 获取 dataContent
|
|
|
|
|
+ */
|
|
|
|
|
+ private Map<Long, String> batchQueryAigcData(List<Long> videoIds) {
|
|
|
|
|
+ Map<Long, String> result = new HashMap<>();
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 获取 AIGC 任务输入列表
|
|
|
|
|
+ List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(46);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(taskInputList)) {
|
|
|
|
|
+ return result;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 构建 videoId -> taskInstanceId 映射
|
|
|
|
|
+ Map<Long, Long> videoIdToTaskInstanceId = new HashMap<>();
|
|
|
|
|
+ for (AigcApiService.AigcTaskInput input : taskInputList) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long videoId = Long.parseLong(input.getBizUniqueId());
|
|
|
|
|
+ videoIdToTaskInstanceId.put(videoId, input.getTaskInstanceId());
|
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
|
+ // 忽略格式非法的
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 对需要的 videoId 查询 dataContent
|
|
|
|
|
+ Set<Long> targetIds = new HashSet<>(videoIds);
|
|
|
|
|
+ for (Map.Entry<Long, Long> entry : videoIdToTaskInstanceId.entrySet()) {
|
|
|
|
|
+ Long videoId = entry.getKey();
|
|
|
|
|
+ if (!targetIds.contains(videoId)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject dataContent = aigcApiService.getTaskCallbackDetail(entry.getValue());
|
|
|
|
|
+ if (dataContent != null) {
|
|
|
|
|
+ result.put(videoId, dataContent.toJSONString());
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("查询 AIGC dataContent 失败, videoId={}: {}", videoId, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批量查询 AIGC 数据失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ return result;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取所有启用的向量化配置
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<DeconstructVectorConfig> getAllEnabledConfigs() {
|
|
|
|
|
+ DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
|
|
|
|
|
+ example.createCriteria().andEnabledEqualTo((short) 1);
|
|
|
|
|
+ example.setOrderByClause("priority ASC");
|
|
|
|
|
+ return vectorConfigMapper.selectByExample(example);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|