Przeglądaj źródła

需求视频匹配长文fix

wangyunpeng 8 godzin temu
rodzic
commit
a538e4c4ff

+ 72 - 48
core/src/main/java/com/tzld/videoVector/job/VideoArticleMatchJob.java

@@ -15,6 +15,7 @@ import com.tzld.videoVector.model.po.pgVector.ChannelDemandMatchResultExample;
 import com.tzld.videoVector.model.po.pgVector.VideoArticleMatchResult;
 import com.tzld.videoVector.model.vo.recall.RecallResultVO;
 import com.tzld.videoVector.service.recall.VectorRecallTestService;
+import com.tzld.videoVector.util.Md5Util;
 import com.tzld.videoVector.util.RedisUtils;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -42,7 +43,7 @@ import java.util.stream.Collectors;
  *   <li>查询 channel_demand_match_result 中指定渠道的匹配记录</li>
  *   <li>通过 VideoApiService 批量获取视频标题</li>
  *   <li>对每个视频标题调用 batchByText 进行长文向量召回</li>
- *   <li>1v1 去重:每视频取最佳文章 → 每文章保留最佳视频</li>
+ *   <li>去重:每 (视频, 账号) 取最佳文章,文章可被多个视频匹配</li>
  *   <li>清理旧数据 + 批量写入 video_article_match_result 表</li>
  *   <li>按 channelLevel3 分组统计并输出日志</li>
  * </ol>
@@ -125,6 +126,12 @@ public class VideoArticleMatchJob {
     /** 视频标题缓存过期时间(秒),默认 7 天 */
     private static final long VIDEO_TITLE_CACHE_EXPIRE = 7 * 24 * 60 * 60;
 
+    /** 长文召回结果 Redis 缓存 key 前缀 */
+    private static final String ARTICLE_RECALL_CACHE_PREFIX = "video_article_match:article_recall:";
+
+    /** 长文召回结果缓存过期时间(秒),默认 7 天 */
+    private static final long ARTICLE_RECALL_CACHE_EXPIRE = 7 * 24 * 60 * 60;
+
     /** 长文匹配并发线程数 */
     @Value("${video.article.match.thread-pool-size:10}")
     private int matchThreadPoolSize;
@@ -367,9 +374,10 @@ public class VideoArticleMatchJob {
     // =====================================================
 
     /**
-     * 对每个视频标题调用 batchByText 进行长文相似度召回。
+     * 对每个视频标题调用 batchByText 进行长文相似度召回,带 Redis 缓存
      *
-     * <p>使用线程池并发执行,单条失败不影响整体流程。
+     * <p>缓存 key: video_article_match:article_recall:{md5(title)},过期 7 天。
+     * 先去重标题后批量 mGet,命中直接复用;未命中并发调用 batchByText 后批量回写。
      * 每个标题使用 configCodes=[ARTICLE_TITLE, ARTICLE_SUMMARY] 进行并行 ANN 查询,
      * 结果只保留 modality=ARTICLE 的条目,按 score 降序排列。
      * 同一视频可能属于多个账号,对每个账号独立过滤已发送文章,结果按账号分组。
@@ -386,12 +394,41 @@ public class VideoArticleMatchJob {
         ConcurrentHashMap<Long, Map<String, List<ArticleMatchItem>>> resultMap = new ConcurrentHashMap<>();
         RankingSpec ranking = buildRankingSpec();
 
+        // ---- 第一阶段:批量 Redis 缓存查找(按去重标题) ----
+        Set<String> uniqueTitles = new LinkedHashSet<>(videoTitleMap.values());
+        Map<String, String> titleToCacheKey = new LinkedHashMap<>((int) (uniqueTitles.size() / 0.75f) + 1);
+        List<String> cacheKeys = new ArrayList<>(uniqueTitles.size());
+        for (String title : uniqueTitles) {
+            String cacheKey = ARTICLE_RECALL_CACHE_PREFIX + Md5Util.encoderByMd5(title);
+            titleToCacheKey.put(title, cacheKey);
+            cacheKeys.add(cacheKey);
+        }
+
+        List<String> cachedValues = redisUtils.mGet(cacheKeys);
+        Map<String, List<ArticleMatchItem>> titleToArticles = new ConcurrentHashMap<>();
+        int idx = 0;
+        int cacheHitTitles = 0;
+        for (String title : uniqueTitles) {
+            String cached = (idx < cachedValues.size()) ? cachedValues.get(idx) : null;
+            if (StringUtils.hasText(cached)) {
+                titleToArticles.put(title, JSON.parseArray(cached, ArticleMatchItem.class));
+                cacheHitTitles++;
+            }
+            idx++;
+        }
+        log.info("长文召回缓存: {} 个去重标题, 命中 {} 个, 未命中 {} 个",
+                uniqueTitles.size(), cacheHitTitles, uniqueTitles.size() - cacheHitTitles);
+
+        // 缓存未命中的标题 → 回写集合(并发安全)
+        Map<String, String> newCacheEntries = new ConcurrentHashMap<>();
+
         int totalVideos = videoTitleMap.size();
         AtomicInteger processed = new AtomicInteger(0);
         AtomicInteger matchedCount = new AtomicInteger(0);
         AtomicInteger skippedSentCount = new AtomicInteger(0);
+        AtomicInteger cacheHitCount = new AtomicInteger(0);
 
-        // 构建并发任务
+        // ---- 第二阶段:并发处理每个视频 ----
         List<CompletableFuture<Void>> futures = new ArrayList<>(totalVideos);
         for (Map.Entry<Long, String> entry : videoTitleMap.entrySet()) {
             Long videoId = entry.getKey();
@@ -400,9 +437,17 @@ public class VideoArticleMatchJob {
 
             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                 try {
-                    BatchByTextParam batchParam = buildBatchByTextParam(title, ranking);
-                    RecallResultVO recallResult = vectorRecallTestService.batchByText(batchParam);
-                    List<ArticleMatchItem> articles = extractArticleItems(recallResult);
+                    // 从缓存获取或实时召回
+                    List<ArticleMatchItem> articles = titleToArticles.get(title);
+                    if (articles != null) {
+                        cacheHitCount.incrementAndGet();
+                    } else {
+                        BatchByTextParam batchParam = buildBatchByTextParam(title, ranking);
+                        RecallResultVO recallResult = vectorRecallTestService.batchByText(batchParam);
+                        articles = extractArticleItems(recallResult);
+                        titleToArticles.put(title, articles);
+                        newCacheEntries.put(titleToCacheKey.get(title), JSON.toJSONString(articles));
+                    }
 
                     if (!articles.isEmpty() && !channelLevel3s.isEmpty()) {
                         Map<String, List<ArticleMatchItem>> perChannelResults = new LinkedHashMap<>();
@@ -426,8 +471,8 @@ public class VideoArticleMatchJob {
                 } finally {
                     int done = processed.incrementAndGet();
                     if (done % 10 == 0 || done == totalVideos) {
-                        log.info("长文匹配进度: {}/{} 视频已处理, {} 个命中, 跳过已发送 {} 个",
-                                done, totalVideos, matchedCount.get(), skippedSentCount.get());
+                        log.info("长文匹配进度: {}/{} 视频已处理, {} 个命中, 跳过已发送 {} 个, 缓存命中 {} 个",
+                                done, totalVideos, matchedCount.get(), skippedSentCount.get(), cacheHitCount.get());
                     }
                 }
             }, matchExecutor);
@@ -438,14 +483,20 @@ public class VideoArticleMatchJob {
         // 等待所有任务完成
         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
 
+        // 批量回写缓存
+        if (!newCacheEntries.isEmpty()) {
+            redisUtils.batchSetWithExpire(newCacheEntries, ARTICLE_RECALL_CACHE_EXPIRE);
+            log.info("长文召回缓存回写 {} 条", newCacheEntries.size());
+        }
+
         int finalProcessed = processed.get();
         if (finalProcessed != totalVideos) {
             log.warn("长文匹配未完全完成: 预期 {} 个, 实际完成 {} 个", totalVideos, finalProcessed);
         }
 
         Map<Long, Map<String, List<ArticleMatchItem>>> result = new LinkedHashMap<>(resultMap);
-        log.info("长文匹配完成: {}/{} 个视频命中长文, 跳过已发送 {} 篇",
-                matchedCount.get(), totalVideos, skippedSentCount.get());
+        log.info("长文匹配完成: {}/{} 个视频命中长文, 跳过已发送 {} 篇, 缓存命中 {} 次",
+                matchedCount.get(), totalVideos, skippedSentCount.get(), cacheHitCount.get());
         return result;
     }
 
@@ -509,14 +560,14 @@ public class VideoArticleMatchJob {
     // =====================================================
 
     /**
-     * 对视频→文章多对多匹配结果进行 1v1 去重。
+     * 对视频→文章多对多匹配结果进行去重。
      *
-     * <p>两轮约束(全局,跨账号):
+     * <p>约束(全局,跨账号):
      * <ol>
      *   <li><b>每 (视频, 账号) 取最佳文章</b>:每个 (video, channelLevel3) 只保留 score 最高的文章</li>
-     *   <li><b>每文章保留最佳 (视频, 账号)</b>:每篇文章只保留 score 最高的 (video, channelLevel3)</li>
+     *   <li><b>每篇文章可匹配多个 (视频, 账号)</b>:不限制文章被不同视频/账号重复匹配</li>
      * </ol>
-     * 最终结果中每个 (video, channelLevel3) 最多对应一篇文章,每篇文章最多对应一个 (video, channelLevel3)。
+     * 最终结果中每个 (video, channelLevel3) 最多对应一篇文章,每篇文章可对应多个 (video, channelLevel3)。
      *
      * @param videoArticleMatches videoId → (channelLevel3 → 文章列表)(已按 score 降序)
      * @return videoId → (channelLevel3 → 唯一最佳文章) 的最终配对
@@ -524,49 +575,22 @@ public class VideoArticleMatchJob {
     private Map<Long, Map<String, ArticleMatchItem>> dedupOneToOne(
             Map<Long, Map<String, List<ArticleMatchItem>>> videoArticleMatches) {
 
-        // 第一轮约束:每 (video, channelLevel3) 取最佳文章 (top 1 by score)
-        // 使用复合键 "videoId|channelLevel3" 作为扁平化标识
-        Map<String, ArticleMatchItem> videoChannelBestArticle = new LinkedHashMap<>();
+        // 每 (video, channelLevel3) 取最佳文章 (top 1 by score)
+        Map<Long, Map<String, ArticleMatchItem>> finalPairs = new LinkedHashMap<>();
         for (Map.Entry<Long, Map<String, List<ArticleMatchItem>>> videoEntry : videoArticleMatches.entrySet()) {
             Long videoId = videoEntry.getKey();
             for (Map.Entry<String, List<ArticleMatchItem>> channelEntry : videoEntry.getValue().entrySet()) {
                 String channelLevel3 = channelEntry.getKey();
                 List<ArticleMatchItem> articles = channelEntry.getValue();
                 if (!articles.isEmpty()) {
-                    videoChannelBestArticle.put(videoId + "|" + channelLevel3, articles.get(0));
+                    finalPairs.computeIfAbsent(videoId, k -> new LinkedHashMap<>())
+                            .put(channelLevel3, articles.get(0));
                 }
             }
         }
 
-        // 第二轮约束:每文章保留最佳 (video, channelLevel3)(最高 score)
-        Map<String, String> articleBestVideoChannel = new LinkedHashMap<>();
-        Map<String, Double> articleBestScore = new LinkedHashMap<>();
-        for (Map.Entry<String, ArticleMatchItem> entry : videoChannelBestArticle.entrySet()) {
-            String vcKey = entry.getKey();
-            ArticleMatchItem article = entry.getValue();
-            Double currentBest = articleBestScore.get(article.articleId);
-            if (currentBest == null || article.score > currentBest) {
-                articleBestVideoChannel.put(article.articleId, vcKey);
-                articleBestScore.put(article.articleId, article.score);
-            }
-        }
-
-        // 仅保留被文章选中的 (video, channelLevel3) 配对
-        Map<Long, Map<String, ArticleMatchItem>> finalPairs = new LinkedHashMap<>();
-        for (Map.Entry<String, ArticleMatchItem> entry : videoChannelBestArticle.entrySet()) {
-            String vcKey = entry.getKey();
-            ArticleMatchItem article = entry.getValue();
-            if (vcKey.equals(articleBestVideoChannel.get(article.articleId))) {
-                int sepIdx = vcKey.indexOf('|');
-                Long videoId = Long.parseLong(vcKey.substring(0, sepIdx));
-                String channelLevel3 = vcKey.substring(sepIdx + 1);
-                finalPairs.computeIfAbsent(videoId, k -> new LinkedHashMap<>())
-                        .put(channelLevel3, article);
-            }
-        }
-
-        log.info("1v1 去重完成: {} 个 (视频,账号) → {} 篇唯一文章 → {} 对最终配对",
-                videoChannelBestArticle.size(), articleBestVideoChannel.size(), finalPairs.size());
+        log.info("去重完成: {} 个视频 → {} 对最终配对 (每 (视频,账号) 取最佳文章)",
+                finalPairs.size(), finalPairs.values().stream().mapToInt(Map::size).sum());
         return finalPairs;
     }