Ver código fonte

视频数据查询缓存

wangyunpeng 8 horas atrás
pai
commit
b53aeace16

+ 6 - 0
core/src/main/java/com/tzld/videoVector/common/constant/VectorConstants.java

@@ -18,6 +18,12 @@ public interface VectorConstants {
     /** 向量存储 Redis Key 前缀 */
     String VECTOR_KEY_PREFIX = "video:vector:";
 
+    /** 视频基础信息 Redis Key 前缀,格式: video:detail:{videoId} */
+    String VIDEO_DETAIL_KEY_PREFIX = "video:detail:";
+
+    /** 视频基础信息 Redis 过期时间:2天(秒) */
+    long VIDEO_DETAIL_EXPIRE_SECONDS = 2L * 24 * 60 * 60;
+
     // ========================== 批处理参数 ==========================
 
     /** 每页查询数量 */

+ 5 - 0
core/src/main/java/com/tzld/videoVector/dao/mapper/pgVector/ext/VideoVectorMapperExt.java

@@ -93,4 +93,9 @@ public interface VideoVectorMapperExt {
      * @param text 文本内容
      */
     int updateTextById(@Param("id") Long id, @Param("text") String text);
+
+    /**
+     * 查询所有不重复的 video_id(跨所有 configCode)
+     */
+    List<Long> selectAllDistinctVideoIds();
 }

+ 240 - 0
core/src/main/java/com/tzld/videoVector/job/VideoDetailSyncJob.java

@@ -0,0 +1,240 @@
+package com.tzld.videoVector.job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.Record;
+import com.tzld.videoVector.common.constant.VectorConstants;
+import com.tzld.videoVector.dao.mapper.pgVector.ext.VideoVectorMapperExt;
+import com.tzld.videoVector.util.OdpsUtil;
+import com.tzld.videoVector.util.RedisUtils;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * 视频基础信息同步任务
+ * <p>
+ * 每日定时执行:
+ * 1. 查询 video_vectors 表中所有 video_id(去重)
+ * 2. 通过 video_id 查询大数据表 loghubods.video_dimension_detail_add_column
+ *    参照 query_video_dimension_metrics.py 的查询逻辑,按视频维度字段 GROUP BY 并聚合指标
+ * 3. 将视频基础信息写入 Redis,供匹配返回时附带
+ */
+@Slf4j
+@Component
+public class VideoDetailSyncJob {
+
+    @Resource
+    private VideoVectorMapperExt videoVectorMapperExt;
+
+    @Resource
+    private RedisUtils redisUtils;
+
+    /** ODPS SQL IN 子句批次大小 */
+    private static final int ODPS_BATCH_SIZE = 200;
+
+    /** 查询近一年数据的 dt 偏移天数 */
+    private static final int DT_RANGE_DAYS = 365;
+
+    // ========================== 维度字段(取最新日期) ==========================
+
+    /** 维度字段列表(参照 query_video_dimension_metrics.py 的 GROUP_FIELDS,排除视频id) */
+    private static final List<String> DIMENSION_FIELDS = Arrays.asList(
+            "标题", "首次推荐时间", "merge二级品类",
+            "aidit详情", "项目名称", "首发aidit详情",
+            "解构选题", "元素merge", "分类merge", "top1元素", "top1分类",
+            "内容选题", "视频主题", "视频关键词", "视频主体", "视频场景",
+            "情感倾向", "视频风格", "时效性_有无时效", "推荐状态"
+    );
+
+    /** 维度字段 SELECT 部分(含视频id) */
+    private static final String DIMENSION_SELECT = "视频id, " + String.join(", ", DIMENSION_FIELDS);
+
+    // ========================== 聚合指标 ==========================
+
+    /** 聚合指标 SQL 片段(参照 query_video_dimension_metrics.py 的核心指标) */
+    private static final String METRICS_SQL = String.join(",\n",
+            "sum(当日分发曝光pv) as 分发曝光pv",
+            "sum(累计分享回流uv) AS 总回流",
+            "sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as rov",
+            "sum(当日分发分享pv)/sum(当日分发曝光pv) as str",
+            "sum(当日分发回流uv)/sum(当日分发分享pv) as ros",
+            "sum(当日分发拉回曝光pv)/sum(当日分发回流uv) as vor_t0",
+            "sum(当日分发拉回曝光pv)/sum(当日分发曝光pv) as vov0",
+            "sum(0_1日分发拉回曝光pv)/sum(当日分发曝光pv) as vov1",
+            "count(DISTINCT 视频id) as 分发视频量",
+            "AVG(视频时长) as 视频时长",
+            "sum(当日分发回流uv) AS 当日分发回流uv",
+            "sum(当日分发分享pv) AS 当日分发分享pv",
+            "sum(当日分发曝光人数) AS 分发曝光uv",
+            "SUM(总回流uv) AS 总回流uv",
+            "SUM(总分享pv) AS 总分享pv",
+            "sum(流量池曝光) AS 流量池曝光",
+            "sum(流量池回流) AS 流量池回流",
+            "sum(流量池分享) AS 流量池分享",
+            "sum(推荐曝光) AS 推荐曝光",
+            "sum(推荐回流) AS 推荐回流"
+    );
+
+    /**
+     * 同步视频基础信息到 Redis
+     * XxlJob handler: syncVideoDetailJob
+     */
+    @XxlJob("syncVideoDetailJob")
+    public ReturnT<String> syncVideoDetailJob(String param) {
+        log.info("开始执行视频基础信息同步任务, param: {}", param);
+
+        try {
+            // 1. 查询 video_vectors 表中所有不重复的 video_id
+            List<Long> allVideoIds = videoVectorMapperExt.selectAllDistinctVideoIds();
+            if (CollectionUtils.isEmpty(allVideoIds)) {
+                log.info("video_vectors 表中无数据,跳过");
+                return ReturnT.SUCCESS;
+            }
+            log.info("查询到 {} 个不重复的 video_id", allVideoIds.size());
+
+            // 计算 dt 范围(近一年)
+            String dtMax = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
+            String dtMin = LocalDate.now().minusDays(DT_RANGE_DAYS).format(DateTimeFormatter.BASIC_ISO_DATE);
+            log.info("ODPS 查询 dt 范围: {} ~ {}", dtMin, dtMax);
+
+            // 2. 分批查询大数据表并流式写入 Redis(拆分为维度查询 + 指标查询)
+            AtomicInteger totalSuccess = new AtomicInteger(0);
+            AtomicInteger totalFail = new AtomicInteger(0);
+
+            for (int i = 0; i < allVideoIds.size(); i += ODPS_BATCH_SIZE) {
+                int end = Math.min(i + ODPS_BATCH_SIZE, allVideoIds.size());
+                List<Long> batchIds = allVideoIds.subList(i, end);
+
+                try {
+                    String idsStr = batchIds.stream()
+                            .map(String::valueOf)
+                            .collect(Collectors.joining(","));
+
+                    // 2.1 查询维度字段(按日期取最大那天的数据)
+                    Map<Long, JSONObject> dimensionMap = new HashMap<>();
+                    String dimensionSql = buildDimensionSql(idsStr, dtMin, dtMax);
+                    OdpsUtil.getOdpsDataStream(dimensionSql, record -> {
+                        try {
+                            Long videoId = record.getBigint("视频id");
+                            if (videoId != null) {
+                                dimensionMap.put(videoId, buildVideoDetail(record));
+                            }
+                        } catch (Exception e) {
+                            log.error("处理维度字段记录失败: {}", e.getMessage());
+                        }
+                    });
+                    log.info("批次 {}-{} 维度查询完成,获取 {} 条记录", i, end, dimensionMap.size());
+
+                    // 2.2 查询聚合指标(只按视频id分组)
+                    String metricsSql = buildMetricsSql(idsStr, dtMin, dtMax);
+                    long processed = OdpsUtil.getOdpsDataStream(metricsSql, record -> {
+                        try {
+                            Long videoId = record.getBigint("视频id");
+                            if (videoId == null) {
+                                return;
+                            }
+                            // 合并维度字段 + 指标数据
+                            JSONObject detail = dimensionMap.getOrDefault(videoId, new JSONObject());
+                            JSONObject metrics = buildVideoDetail(record);
+                            if (metrics != null) {
+                                detail.putAll(metrics);
+                            }
+                            if (!detail.isEmpty()) {
+                                String redisKey = VectorConstants.VIDEO_DETAIL_KEY_PREFIX + videoId;
+                                redisUtils.set(redisKey, detail.toJSONString(),
+                                        VectorConstants.VIDEO_DETAIL_EXPIRE_SECONDS);
+                                totalSuccess.incrementAndGet();
+                            }
+                        } catch (Exception e) {
+                            log.error("处理指标记录失败: {}", e.getMessage());
+                            totalFail.incrementAndGet();
+                        }
+                    });
+
+                    log.info("批次 {}-{} 指标查询完成,处理 {} 条记录", i, end, processed);
+                } catch (Exception e) {
+                    log.error("批次 {}-{} 查询ODPS失败: {}", i, end, e.getMessage(), e);
+                    totalFail.addAndGet(batchIds.size());
+                }
+            }
+
+            log.info("视频基础信息同步任务完成,总成功: {}, 总失败: {}", totalSuccess.get(), totalFail.get());
+            return ReturnT.SUCCESS;
+        } catch (Exception e) {
+            log.error("视频基础信息同步任务执行失败: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 构建维度字段查询 SQL
+     * 按视频id分组,取最大日期(max(dt))那天的维度数据
+     */
+    private String buildDimensionSql(String idsStr, String dtMin, String dtMax) {
+        // 子查询:先找到每个视频id的最大日期
+        // 外层查询:用最大日期关联原表取出维度字段
+        return String.format(
+                "SELECT %s FROM loghubods.video_dimension_detail_add_column t " +
+                        "INNER JOIN (" +
+                        "  SELECT 视频id AS vid, MAX(dt) AS max_dt " +
+                        "  FROM loghubods.video_dimension_detail_add_column " +
+                        "  WHERE dt >= '%s' AND dt <= '%s' AND 视频id IN (%s) " +
+                        "  GROUP BY 视频id" +
+                        ") m ON t.视频id = m.vid AND t.dt = m.max_dt " +
+                        "WHERE t.dt >= '%s' AND t.dt <= '%s' AND t.视频id IN (%s)",
+                DIMENSION_SELECT,
+                dtMin, dtMax, idsStr,
+                dtMin, dtMax, idsStr);
+    }
+
+    /**
+     * 构建指标聚合查询 SQL
+     * 只按视频id分组,聚合核心指标
+     */
+    private String buildMetricsSql(String idsStr, String dtMin, String dtMax) {
+        return String.format(
+                "SELECT 视频id, %s " +
+                        "FROM loghubods.video_dimension_detail_add_column " +
+                        "WHERE dt >= '%s' AND dt <= '%s' AND 视频id IN (%s) " +
+                        "GROUP BY 视频id",
+                METRICS_SQL,
+                dtMin, dtMax, idsStr);
+    }
+
+    /**
+     * 将 ODPS Record 转换为视频详情 JSON
+     * 包含维度字段 + 聚合指标
+     *
+     * @param record ODPS 查询结果行
+     * @return 视频详情JSON
+     */
+    private JSONObject buildVideoDetail(Record record) {
+        JSONObject detail = new JSONObject();
+        try {
+            Column[] columns = record.getColumns();
+            if (columns != null) {
+                for (int i = 0; i < columns.length; i++) {
+                    String colName = columns[i].getName();
+                    // 排除视频id本身,其余字段全部写入
+                    if (!"视频id".equals(colName)) {
+                        detail.put(colName, record.getString(i));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("buildVideoDetail 字段映射异常: {}", e.getMessage());
+        }
+        return detail;
+    }
+}

+ 5 - 0
core/src/main/java/com/tzld/videoVector/model/vo/VideoMatchResult.java

@@ -2,6 +2,8 @@ package com.tzld.videoVector.model.vo;
 
 import lombok.Data;
 
+import java.util.Map;
+
 /**
  * 视频匹配结果包装类
  * 替代原来手动拼接 JSONObject 的返回方式,提供类型安全
@@ -18,6 +20,9 @@ public class VideoMatchResult {
     /** 余弦相似度分值 */
     private Double score;
 
+    /** 视频基础信息(由 syncVideoDetailJob 定时任务从大数据表同步至 Redis,key 前缀 video:detail:) */
+    private Map<String, Object> videoDetail;
+
     public VideoMatchResult() {
     }
 

+ 31 - 0
core/src/main/java/com/tzld/videoVector/service/impl/VideoSearchServiceImpl.java

@@ -20,6 +20,7 @@ import com.tzld.videoVector.model.vo.VideoMatchResult;
 import com.tzld.videoVector.common.constant.VectorConstants;
 import com.tzld.videoVector.service.*;
 import com.tzld.videoVector.util.Md5Util;
+import com.tzld.videoVector.util.RedisUtils;
 import com.tzld.videoVector.util.VectorUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -56,6 +57,9 @@ public class VideoSearchServiceImpl implements VideoSearchService {
     @Resource
     private DeconstructVectorConfigMapper deconstructVectorConfigMapper;
 
+    @Resource
+    private RedisUtils redisUtils;
+
     @Override
     public String deconstruct(DeconstructParam param) {
         if (param == null) {
@@ -461,6 +465,10 @@ public class VideoSearchServiceImpl implements VideoSearchService {
         }
 
         log.info("匹配完成,configCode: {},共返回 {} 条结果", param.getConfigCode(), result.size());
+
+        // 从 Redis 获取视频基础信息并填充到结果中
+        enrichVideoDetail(result);
+
         return result;
     }
 
@@ -760,4 +768,27 @@ public class VideoSearchServiceImpl implements VideoSearchService {
             return Collections.emptyMap();
         }
     }
+
+    /**
+     * 从 Redis 获取视频基础信息并填充到匹配结果中
+     * 数据来源:syncVideoDetailJob 定时任务从大数据表同步到 Redis
+     */
+    private void enrichVideoDetail(List<VideoMatchResult> results) {
+        if (results == null || results.isEmpty()) {
+            return;
+        }
+
+        for (VideoMatchResult matchResult : results) {
+            try {
+                String redisKey = VectorConstants.VIDEO_DETAIL_KEY_PREFIX + matchResult.getVideoId();
+                String detailJson = redisUtils.get(redisKey);
+                if (detailJson != null) {
+                    Map<String, Object> detail = JSONObject.parseObject(detailJson);
+                    matchResult.setVideoDetail(detail);
+                }
+            } catch (Exception e) {
+                log.error("获取视频详情失败,videoId={}: {}", matchResult.getVideoId(), e.getMessage());
+            }
+        }
+    }
 }

+ 70 - 7
core/src/main/java/com/tzld/videoVector/util/OdpsUtil.java

@@ -7,19 +7,34 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+import com.aliyun.odps.tunnel.InstanceTunnel;
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 
 public class OdpsUtil {
 
-    public static List<Record> getOdpsData(String sql) {
-        String accessId = "LTAI9EBa0bd5PrDa";
-        String accessKey = "vAalxds7YxhfOA2yVv8GziCg3Y87v5";
-        String endpoint = "http://service.odps.aliyun.com/api";
-        Account account = new AliyunAccount(accessId, accessKey);
+    private static final Logger log = LoggerFactory.getLogger(OdpsUtil.class);
+
+    private static final String ACCESS_ID = "LTAI9EBa0bd5PrDa"; // FIXME: hardcoded credential — externalize to config/KMS and rotate this key
+    private static final String ACCESS_KEY = "vAalxds7YxhfOA2yVv8GziCg3Y87v5"; // FIXME: hardcoded credential — externalize and rotate
+    private static final String ENDPOINT = "http://service.odps.aliyun.com/api";
+    private static final String PROJECT = "loghubods";
+
+    private static Odps getOdps() {
+        Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
         Odps odps = new Odps(account);
-        odps.setEndpoint(endpoint);
-        odps.setDefaultProject("loghubods");
+        odps.setEndpoint(ENDPOINT);
+        odps.setDefaultProject(PROJECT);
+        return odps;
+    }
+
+    public static List<Record> getOdpsData(String sql) {
+        Odps odps = getOdps();
         Instance i;
         try {
             i = SQLTask.run(odps, sql);
@@ -30,4 +45,52 @@ public class OdpsUtil {
         }
         return null;
     }
+
+    /**
+     * 流式读取 ODPS 查询结果(通过 InstanceTunnel,无记录数上限)
+     * <p>
+     * 与 getOdpsData 的区别:
+     * - getOdpsData 使用 SQLTask.getResult(),最多返回 1万条,全量加载到内存
+     * - 此方法使用 InstanceTunnel 流式读取,无记录数限制,逐条回调处理
+     *
+     * @param sql            查询 SQL
+     * @param recordConsumer 每条记录的回调处理器
+     * @return 处理的记录总数
+     */
+    public static long getOdpsDataStream(String sql, Consumer<Record> recordConsumer) {
+        Odps odps = getOdps();
+        try {
+            Instance instance = SQLTask.run(odps, sql);
+            instance.waitForSuccess();
+
+            InstanceTunnel tunnel = new InstanceTunnel(odps);
+            InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(
+                    odps.getDefaultProject(), instance.getId());
+
+            long totalCount = session.getRecordCount();
+            log.info("InstanceTunnel 流式读取开始,总记录数: {}", totalCount);
+
+            if (totalCount == 0) {
+                return 0;
+            }
+
+            long processed = 0;
+            TunnelRecordReader reader = session.openRecordReader(0, totalCount);
+            try {
+                Record record;
+                while ((record = reader.read()) != null) {
+                    recordConsumer.accept(record);
+                    processed++;
+                }
+            } finally {
+                reader.close();
+            }
+
+            log.info("InstanceTunnel 流式读取完成,实际处理: {} 条", processed);
+            return processed;
+        } catch (OdpsException | IOException e) {
+            log.error("ODPS 流式读取失败: {}", e.getMessage(), e);
+        }
+        return 0;
+    }
 }

+ 5 - 0
core/src/main/resources/mapper/pgVector/ext/VideoVectorMapperExt.xml

@@ -109,4 +109,9 @@
     WHERE id = #{id}
   </update>
 
+  <!-- 查询所有不重复的 video_id(跨所有 configCode) -->
+  <select id="selectAllDistinctVideoIds" resultType="java.lang.Long">
+    SELECT DISTINCT video_id FROM video_vectors
+  </select>
+
 </mapper>