|
@@ -0,0 +1,216 @@
|
|
|
|
|
+package com.tzld.videoVector.job;
|
|
|
|
|
+
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.VideoVectorMapperExt;
|
|
|
|
|
+import com.tzld.videoVector.util.OdpsUtil;
|
|
|
|
|
+import com.tzld.videoVector.util.RedisUtils;
|
|
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * 视频AI理解信息同步任务
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 功能:
|
|
|
|
|
+ * 1. 查询 video_vectors 表中所有 video_id(去重)
|
|
|
|
|
+ * 2. 分批从 ODPS result_log 获取老解构结果(r.json 格式)
|
|
|
|
|
+ * 3. 提取"内容选题"、"视频主题"、"视频关键词"、"视频口播"
|
|
|
|
|
+ * 4. 写入 Redis 缓存(key: video:ai_understanding:{videoId})
|
|
|
|
|
+ * 5. 清除逻辑:先获取上次缓存的ID集合,本次同步后删除不再存在的旧ID缓存
|
|
|
|
|
+ */
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+public class AiUnderstandingSyncJob {
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private VideoVectorMapperExt videoVectorMapperExt;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private RedisUtils redisUtils;
|
|
|
|
|
+
|
|
|
|
|
+ /** ODPS SQL IN 子句批次大小 */
|
|
|
|
|
+ private static final int ODPS_BATCH_SIZE = 1000;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 同步视频AI理解信息到 Redis
|
|
|
|
|
+ * XxlJob handler: syncAiUnderstandingJob
|
|
|
|
|
+ */
|
|
|
|
|
+ @XxlJob("syncAiUnderstandingJob")
|
|
|
|
|
+ public ReturnT<String> syncAiUnderstandingJob(String param) {
|
|
|
|
|
+ log.info("开始执行视频AI理解信息同步任务, param: {}", param);
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 查询 video_vectors 表中所有不重复的 video_id
|
|
|
|
|
+ List<Long> allVideoIds = videoVectorMapperExt.selectAllDistinctVideoIds();
|
|
|
|
|
+ if (CollectionUtils.isEmpty(allVideoIds)) {
|
|
|
|
|
+ log.info("video_vectors 表中无数据,跳过");
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("查询到 {} 个不重复的 video_id", allVideoIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 获取上次缓存的 videoId 集合(用于清除逻辑)
|
|
|
|
|
+ Set<String> previousCachedIds = redisUtils.sMembers(VectorConstants.AI_OLD_UNDERSTANDING_IDS_SET_KEY);
|
|
|
|
|
+ if (previousCachedIds == null) {
|
|
|
|
|
+ previousCachedIds = Collections.emptySet();
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("上次缓存的 AI 理解 videoId 数量: {}", previousCachedIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 分批查询 result_log 并写入 Redis
|
|
|
|
|
+ AtomicInteger totalSuccess = new AtomicInteger(0);
|
|
|
|
|
+ AtomicInteger totalFail = new AtomicInteger(0);
|
|
|
|
|
+ Set<String> currentCachedIds = Collections.synchronizedSet(new HashSet<>());
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < allVideoIds.size(); i += ODPS_BATCH_SIZE) {
|
|
|
|
|
+ int end = Math.min(i + ODPS_BATCH_SIZE, allVideoIds.size());
|
|
|
|
|
+ List<Long> batchIds = allVideoIds.subList(i, end);
|
|
|
|
|
+ processBatch(batchIds, totalSuccess, totalFail, currentCachedIds, i, end);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 清除逻辑:删除本次不再存在的旧缓存
|
|
|
|
|
+ Set<String> toRemove = new HashSet<>(previousCachedIds);
|
|
|
|
|
+ toRemove.removeAll(currentCachedIds);
|
|
|
|
|
+ if (!toRemove.isEmpty()) {
|
|
|
|
|
+ log.info("清除 {} 个不再存在的AI理解缓存", toRemove.size());
|
|
|
|
|
+ for (String oldId : toRemove) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ redisUtils.del(VectorConstants.AI_OLD_UNDERSTANDING_KEY_PREFIX + oldId);
|
|
|
|
|
+ redisUtils.sRemove(VectorConstants.AI_OLD_UNDERSTANDING_IDS_SET_KEY, oldId);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("清除旧缓存失败, videoId={}: {}", oldId, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("视频AI理解信息同步任务完成,总成功: {}, 总失败: {}, 清除旧缓存: {}",
|
|
|
|
|
+ totalSuccess.get(), totalFail.get(), toRemove.size());
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("视频AI理解信息同步任务执行失败: {}", e.getMessage(), e);
|
|
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理单批次:查询 ODPS result_log 并写入 Redis
|
|
|
|
|
+ */
|
|
|
|
|
+ private void processBatch(List<Long> batchIds, AtomicInteger totalSuccess, AtomicInteger totalFail,
|
|
|
|
|
+ Set<String> currentCachedIds, int batchStart, int batchEnd) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String idsStr = batchIds.stream()
|
|
|
|
|
+ .map(String::valueOf)
|
|
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
|
|
+
|
|
|
|
|
+ // 查询 result_log(取最新一条数据)
|
|
|
|
|
+ String sql = String.format(
|
|
|
|
|
+ "SELECT video_id, data FROM loghubods.result_log " +
|
|
|
|
|
+ "WHERE video_id IN (%s) AND dt > 20240001;", idsStr);
|
|
|
|
|
+
|
|
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String videoIdStr = record.getString("video_id");
|
|
|
|
|
+ if (!StringUtils.hasText(videoIdStr)) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ Long videoId = Long.parseLong(videoIdStr.trim());
|
|
|
|
|
+
|
|
|
|
|
+ String data = record.getString("data");
|
|
|
|
|
+ if (!StringUtils.hasText(data)) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 解析并提取字段
|
|
|
|
|
+ JSONObject cacheObj = extractAiUnderstanding(videoId, data);
|
|
|
|
|
+ if (cacheObj == null) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 写入 Redis(不设置过期时间,通过每日任务清除逻辑手动控制删除)
|
|
|
|
|
+ String redisKey = VectorConstants.AI_OLD_UNDERSTANDING_KEY_PREFIX + videoId;
|
|
|
|
|
+ redisUtils.set(redisKey, cacheObj.toJSONString(), 0);
|
|
|
|
|
+
|
|
|
|
|
+ // 记录到 SET 中
|
|
|
|
|
+ redisUtils.sAdd(VectorConstants.AI_OLD_UNDERSTANDING_IDS_SET_KEY, videoIdStr);
|
|
|
|
|
+ currentCachedIds.add(videoIdStr);
|
|
|
|
|
+ totalSuccess.incrementAndGet();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 result_log 记录失败: {}", e.getMessage());
|
|
|
|
|
+ totalFail.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ log.info("批次 {}-{} 处理完成", batchStart, batchEnd);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批次 {}-{} 查询ODPS失败: {}", batchStart, batchEnd, e.getMessage(), e);
|
|
|
|
|
+ totalFail.addAndGet(batchIds.size());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从 result_log 的 data 字段中提取 AI 理解信息
|
|
|
|
|
+ * data 结构参考 r.json:
|
|
|
|
|
+ * - "一、基础信息"."内容选题"
|
|
|
|
|
+ * - "一、基础信息"."视频主题"
|
|
|
|
|
+ * - "一、基础信息"."视频关键词"
|
|
|
|
|
+ * - "五、音画细节"."视频口播"
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param videoId 视频ID
|
|
|
|
|
+ * @param data result_log 的 data JSON 字符串
|
|
|
|
|
+ * @return 缓存 JSON 对象,全部字段为空时返回 null
|
|
|
|
|
+ */
|
|
|
|
|
+ private JSONObject extractAiUnderstanding(Long videoId, String data) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject json = JSON.parseObject(data);
|
|
|
|
|
+ if (json == null) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String contentTopic = null;
|
|
|
|
|
+ String videoTheme = null;
|
|
|
|
|
+ String videoKeywords = null;
|
|
|
|
|
+ String videoNarration = null;
|
|
|
|
|
+
|
|
|
|
|
+ // 提取"一、基础信息"中的字段
|
|
|
|
|
+ JSONObject basicInfo = json.getJSONObject("一、基础信息");
|
|
|
|
|
+ if (basicInfo != null) {
|
|
|
|
|
+ contentTopic = basicInfo.getString("内容选题");
|
|
|
|
|
+ videoTheme = basicInfo.getString("视频主题");
|
|
|
|
|
+ videoKeywords = basicInfo.getString("视频关键词");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 提取"五、音画细节"中的"视频口播"
|
|
|
|
|
+ JSONObject audioVisual = json.getJSONObject("五、音画细节");
|
|
|
|
|
+ if (audioVisual != null) {
|
|
|
|
|
+ videoNarration = audioVisual.getString("视频口播");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 全部字段为空则不缓存
|
|
|
|
|
+ if (!StringUtils.hasText(contentTopic)
|
|
|
|
|
+ && !StringUtils.hasText(videoTheme)
|
|
|
|
|
+ && !StringUtils.hasText(videoKeywords)
|
|
|
|
|
+ && !StringUtils.hasText(videoNarration)) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ JSONObject cacheObj = new JSONObject();
|
|
|
|
|
+ cacheObj.put("videoId", videoId);
|
|
|
|
|
+ cacheObj.put("contentTopic", contentTopic);
|
|
|
|
|
+ cacheObj.put("videoTheme", videoTheme);
|
|
|
|
|
+ cacheObj.put("videoKeywords", videoKeywords);
|
|
|
|
|
+ cacheObj.put("videoNarration", videoNarration);
|
|
|
|
|
+ return cacheObj;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("解析 result_log data 失败, videoId={}: {}", videoId, e.getMessage());
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|