Prechádzať zdrojové kódy

特征点泛化 doLibraryRecall cache

wangyunpeng 11 hodín pred
rodič
commit
db026ee8c0

+ 97 - 40
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -23,6 +23,7 @@ import com.tzld.videoVector.util.RedisUtils;
 import com.tzld.videoVector.util.VectorUtils;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -72,6 +73,11 @@ public class ChannelDemandMatchJob {
      */
     private static final String RECALL_CACHE_PREFIX = "channel_demand:recall:";
 
+    /**
+     * Redis key prefix for cached Library API recall results; doLibraryRecall appends Md5Util.encoderByMd5(elementName) to form the full key.
+     */
+    private static final String LIBRARY_RECALL_CACHE_PREFIX = "channel_demand:library_recall:";
+
     /**
      * 召回结果缓存过期时间(秒)
      */
@@ -616,40 +622,49 @@ public class ChannelDemandMatchJob {
     }
 
     /**
-     * Library API 召回:通过话题元素搜索 + 帖子列表获取召回视频
+     * Library API 召回:通过话题元素搜索 + 帖子列表获取召回视频(带 Redis 缓存)
      * <p>
      * 流程:
-     * 1. 用泛化元素名称搜索话题元素,获取元素 ID
-     * 2. 仅保留名称全等匹配的元素
-     * 3. 遍历元素 ID,调用帖子列表接口获取视频帖子
-     * 4. 按 post_id 去重
-     * 5. 从 Redis 批量获取视频详情指标(按 video.detail.metrics.days 天数维度)
-     * 6. 按 rov 降序排列,取 topN
+     * 1. 检查 Redis 缓存,命中则直接复用
+     * 2. 用泛化元素名称搜索话题元素,获取元素 ID
+     * 3. 仅保留名称全等匹配的元素
+     * 4. 遍历元素 ID,调用帖子列表接口获取视频帖子
+     * 5. 按 post_id 去重
+     * 6. 从 Redis 批量获取视频详情指标(按 video.detail.metrics.days 天数维度)
+     * 7. 写入 Redis 缓存
+     * 8. 按 rov 降序排列,取 topN
      */
     private List<ChannelDemandMatchResult> doLibraryRecall(ChannelDemandMatchResult demand, int topN) {
-        List<ChannelDemandMatchResult> batchRows = new ArrayList<>();
         String elementName = demand.getMatchGeneralizedElement();
+        String cacheKey = LIBRARY_RECALL_CACHE_PREFIX + Md5Util.encoderByMd5(elementName);
+
+        // 1. 尝试从缓存读取
+        List<CachedPost> cachedPosts = loadLibraryRecallCache(cacheKey);
+        if (cachedPosts != null) {
+            log.info("Library API 召回命中缓存, elementName={}, 缓存条数={}", elementName, cachedPosts.size());
+            return buildLibraryRecallRows(demand, topN, cachedPosts);
+        }
 
-        // 1. 搜索话题元素
+        // 2. 搜索话题元素
         LibraryApiService.TopicElementSearchResponse elementResp = libraryApiService.searchTopicElements(
                 libraryExecutionId, elementName, elementSearchLimit, "all");
         if (elementResp == null || CollectionUtils.isEmpty(elementResp.getItems())) {
             log.info("Library API 话题元素搜索无结果, executionId={}, elementName={}", libraryExecutionId, elementName);
-            return batchRows;
+            return Collections.emptyList();
         }
         log.info("Library API 话题元素搜索到 {} 个元素, elementName={}", elementResp.getItems().size(), elementName);
 
-        // 2. 仅保留名称全等匹配的元素
+        // 3. 仅保留名称全等匹配的元素
         List<LibraryApiService.TopicElementItem> matchedItems = elementResp.getItems().stream()
                 .filter(e -> elementName.equals(e.getName()))
                 .collect(Collectors.toList());
         if (matchedItems.isEmpty()) {
             log.info("Library API 话题元素无全等匹配, elementName={}", elementName);
-            return batchRows;
+            return Collections.emptyList();
         }
         log.info("Library API 话题元素全等匹配 {} 个, elementName={}", matchedItems.size(), elementName);
 
-        // 3. 遍历匹配的元素获取帖子,按 post_id 去重
+        // 4. 遍历匹配的元素获取帖子,按 post_id 去重
         Map<Long, LibraryApiService.PostItem> postMap = new LinkedHashMap<>();
         for (LibraryApiService.TopicElementItem element : matchedItems) {
             if (element.getId() == null) {
@@ -676,19 +691,19 @@ public class ChannelDemandMatchJob {
         }
         if (postMap.isEmpty()) {
             log.info("Library API 帖子列表无结果, elementName={}", elementName);
-            return batchRows;
+            return Collections.emptyList();
         }
         log.info("Library API 去重后获取到 {} 个帖子, elementName={}", postMap.size(), elementName);
 
-        // 4. 从 Redis 批量获取视频详情指标
+        // 5. 从 Redis 批量获取视频详情指标
         List<Long> postIdList = new ArrayList<>(postMap.keySet());
         List<String> redisKeys = postIdList.stream()
                 .map(id -> VectorConstants.VIDEO_DETAIL_DAYS_KEY_PREFIX + metricsDays + "d:" + id)
                 .collect(Collectors.toList());
         List<String> redisValues = redisUtils.mGet(redisKeys);
 
-        // 5. 解析指标并构建结果行
-        List<PostWithMetrics> postWithMetricsList = new ArrayList<>();
+        // 6. 构建可缓存的中间结果
+        List<CachedPost> newCachedPosts = new ArrayList<>();
         for (int i = 0; i < postIdList.size(); i++) {
             Long postId = postIdList.get(i);
             LibraryApiService.PostItem post = postMap.get(postId);
@@ -709,35 +724,66 @@ public class ChannelDemandMatchJob {
                     log.warn("解析视频详情失败, postId={}: {}", postId, e.getMessage());
                 }
             }
-            postWithMetricsList.add(new PostWithMetrics(post, videoDetail, rov));
+
+            CachedPost cp = new CachedPost();
+            cp.setPostId(post.getPostId());
+            cp.setTitle(post.getTitle());
+            cp.setDetailJson(videoDetail != null ? JSON.toJSONString(videoDetail) : null);
+            cp.setRov(rov);
+            newCachedPosts.add(cp);
         }
 
-        // 6. 按 rov 降序排列(无 rov 数据的排在最后),取 topN
-        postWithMetricsList.sort((a, b) -> {
-            Double aRov = a.rov != null ? a.rov : -1.0;
-            Double bRov = b.rov != null ? b.rov : -1.0;
+        // 7. 写入缓存
+        try {
+            redisUtils.set(cacheKey, JSON.toJSONString(newCachedPosts), RECALL_CACHE_EXPIRE);
+        } catch (Exception e) {
+            log.warn("写入Library API召回缓存失败, key={}: {}", cacheKey, e.getMessage());
+        }
+
+        // 8. 按 rov 降序构建结果
+        return buildLibraryRecallRows(demand, topN, newCachedPosts);
+    }
+
+    /**
+     * 从缓存的中间结果构建 ChannelDemandMatchResult 列表
+     */
+    private List<ChannelDemandMatchResult> buildLibraryRecallRows(ChannelDemandMatchResult demand,
+                                                                   int topN, List<CachedPost> cachedPosts) {
+        List<ChannelDemandMatchResult> batchRows = new ArrayList<>();
+        String elementName = demand.getMatchGeneralizedElement();
+
+        // 按 rov 降序排列(无 rov 数据的排在最后)
+        cachedPosts.sort((a, b) -> {
+            Double aRov = a.getRov() != null ? a.getRov() : -1.0;
+            Double bRov = b.getRov() != null ? b.getRov() : -1.0;
             return bRov.compareTo(aRov);
         });
 
         int count = 0;
-        for (PostWithMetrics pm : postWithMetricsList) {
+        for (CachedPost cp : cachedPosts) {
             if (count >= topN) {
                 break;
             }
-            LibraryApiService.PostItem post = pm.post;
-            Map<String, Object> detail = pm.videoDetail;
-            Long postId = Long.parseLong(post.getPostId());
+            Map<String, Object> detail = null;
+            if (cp.getDetailJson() != null) {
+                try {
+                    detail = JSONObject.parseObject(cp.getDetailJson(), Map.class);
+                } catch (Exception e) {
+                    log.warn("解析缓存视频详情失败, postId={}: {}", cp.getPostId(), e.getMessage());
+                }
+            }
+            Long postId = Long.parseLong(cp.getPostId());
 
             ChannelDemandMatchResult row = copyDemandFields(demand);
             row.setMatchVideoId(postId);
             row.setMatchConfigCode("LIBRARY_TOPIC_ELEMENT");
-            row.setMatchRov(pm.rov);
-            row.setMatchScore(pm.rov);
+            row.setMatchRov(cp.getRov());
+            row.setMatchScore(cp.getRov());
             row.setMatchSim(null);
             row.setMatchExposurePv(extractNumber(detail, "分发曝光pv", Long.class));
             row.setMatchHeadSingleReturnRate(extractNumber(detail, "头部单层回流率", Double.class));
             row.setMatchHeadDistributionSingleReturnRate(extractNumber(detail, "头部进分发单层回流率", Double.class));
-            row.setMatchText(post.getTitle());
+            row.setMatchText(cp.getTitle());
             row.setMatchStatus((short) 1);
             row.setExperimentId(generateExperimentId(demand, postId, "LIBRARY_TOPIC_ELEMENT"));
             batchRows.add(row);
@@ -745,23 +791,34 @@ public class ChannelDemandMatchJob {
         }
 
         log.info("Library API 召回完成, elementName={}, 候选{}条, 返回{}条",
-                elementName, postWithMetricsList.size(), batchRows.size());
+                elementName, cachedPosts.size(), batchRows.size());
         return batchRows;
     }
 
     /**
-     * 帖子与指标数据组装
+     * Reads the Library API recall cache entry stored under {@code cacheKey} in Redis.
+     * <p>
+     * Best-effort: any exception thrown by the Redis read or the JSON parse is logged at WARN
+     * and treated as a cache miss, so the caller falls through to the live Library API lookup.
+     *
+     * @param cacheKey full Redis key of the cache entry
+     * @return the cached value parsed as a list of {@link CachedPost}; {@code null} when
+     *         {@code redisUtils.get} returns {@code null} or when reading/parsing throws
      */
-    private static class PostWithMetrics {
-        final LibraryApiService.PostItem post;
-        final Map<String, Object> videoDetail;
-        final Double rov;
-
-        PostWithMetrics(LibraryApiService.PostItem post, Map<String, Object> videoDetail, Double rov) {
-            this.post = post;
-            this.videoDetail = videoDetail;
-            this.rov = rov;
+    private List<CachedPost> loadLibraryRecallCache(String cacheKey) {
+        try {
+            String cached = redisUtils.get(cacheKey);
+            if (cached != null) {
+                return JSON.parseArray(cached, CachedPost.class); // the returned list is later sorted in place by buildLibraryRecallRows
+            }
+        } catch (Exception e) {
+            log.warn("读取Library API召回缓存失败, key={}: {}", cacheKey, e.getMessage()); // NOTE(review): only e.getMessage() is logged, so the stack trace is dropped
         }
+        return null; // null (not an empty list) signals a cache miss to the caller
+    }
+
+    /**
+     * Intermediate Library API recall result in a JSON-serializable form; a list of these
+     * is what doLibraryRecall writes to and reads from the Redis recall cache.
+     * <p>
+     * Accessors come from Lombok {@code @Data}.
+     */
+    @Data
+    private static class CachedPost {
+        private String postId; // post ID kept as text; buildLibraryRecallRows converts it with Long.parseLong
+        private String title; // post title; copied into matchText of the result row
+        private String detailJson; // video detail map serialized with JSON.toJSONString, or null when no detail was available
+        private Double rov; // rov metric used for the descending sort; null entries are ranked as -1.0
+    }
 
     /**