wangyunpeng пре 1 дан
родитељ
комит
cc69db6708
1 измењених фајлова са 110 додато и 81 уклоњено
  1. 110 81
      core/src/main/java/com/tzld/videoVector/job/VideoArticleMatchJob.java

+ 110 - 81
core/src/main/java/com/tzld/videoVector/job/VideoArticleMatchJob.java

@@ -184,14 +184,14 @@ public class VideoArticleMatchJob {
             Map<Long, String> videoTitleMap = fetchVideoTitles(records);
 
             // 3.1 构建 videoId → channelLevel3 映射(用于匹配时过滤已发送文章)
-            Map<Long, String> videoChannelMap = buildVideoChannelMap(records);
+            Map<Long, Set<String>> videoChannelMap = buildVideoChannelMap(records);
 
             // 4. 视频标题 → 长文向量召回(过滤已发送文章)
-            Map<Long, List<ArticleMatchItem>> videoArticleMatches =
+            Map<Long, Map<String, List<ArticleMatchItem>>> videoArticleMatches =
                     matchArticlesByTitles(videoTitleMap, videoChannelMap, sentSourceIds);
 
             // 5. 1v1 去重配对
-            Map<Long, ArticleMatchItem> finalPairs = dedupOneToOne(videoArticleMatches);
+            Map<Long, Map<String, ArticleMatchItem>> finalPairs = dedupOneToOne(videoArticleMatches);
 
             // 6. 构建结果对象 → 清理旧数据 → 批量写入
             saveResults(dt, records, videoTitleMap, finalPairs);
@@ -229,8 +229,9 @@ public class VideoArticleMatchJob {
     private List<ChannelDemandMatchResult> fetchDemandRecords(String dt) {
         ChannelDemandMatchResultExample example = new ChannelDemandMatchResultExample();
         example.createCriteria()
-                .andChannelNameEqualTo(channelName)
                 .andDtEqualTo(dt)
+                .andChannelNameEqualTo(channelName)
+                .andDemandStrategyEqualTo("人群需求")
                 .andChannelLevel3In(targetAccounts);
 
         List<ChannelDemandMatchResult> records = channelDemandMatchResultMapper.selectByExample(example);
@@ -350,12 +351,13 @@ public class VideoArticleMatchJob {
 
     /**
      * Extracts the videoId → channelLevel3 mapping from the match records.
-     * <p>The same video may correspond to multiple records; the channelLevel3 of the first record is taken.
+     * <p>The same video may correspond to multiple records (different accounts); all channelLevel3 values are collected per video, in first-seen order.
      */
-    private Map<Long, String> buildVideoChannelMap(List<ChannelDemandMatchResult> records) {
-        Map<Long, String> map = new LinkedHashMap<>();
+    private Map<Long, Set<String>> buildVideoChannelMap(List<ChannelDemandMatchResult> records) {
+        Map<Long, Set<String>> map = new LinkedHashMap<>();
         for (ChannelDemandMatchResult r : records) {
-            map.putIfAbsent(r.getMatchVideoId(), r.getChannelLevel3());
+            map.computeIfAbsent(r.getMatchVideoId(), k -> new LinkedHashSet<>())
+                    .add(r.getChannelLevel3());
         }
         return map;
     }
@@ -370,18 +372,18 @@ public class VideoArticleMatchJob {
      * <p>使用线程池并发执行,单条失败不影响整体流程。
      * 每个标题使用 configCodes=[ARTICLE_TITLE, ARTICLE_SUMMARY] 进行并行 ANN 查询,
      * 结果只保留 modality=ARTICLE 的条目,按 score 降序排列。
-     * 召回结果从上往下(按 score 降序)排除该账号已发送的文章,取剩余的第一条
+     * 同一视频可能属于多个账号,对每个账号独立过滤已发送文章,结果按账号分组
      *
-     * @param videoTitleMap  videoId → title 映射
-     * @param videoChannelMap videoId → channelLevel3 映射
+     * @param videoTitleMap   videoId → title 映射
+     * @param videoChannelMap videoId → channelLevel3 集合
      * @param sentSourceIds   channelLevel3 → 已发送素材 ID 集合
-     * @return videoId → 文章匹配列表 映射(已过滤已发送,按 score 降序)
+     * @return videoId → (channelLevel3 → 文章匹配列表) 映射(已过滤已发送,按 score 降序)
      */
-    private Map<Long, List<ArticleMatchItem>> matchArticlesByTitles(
+    private Map<Long, Map<String, List<ArticleMatchItem>>> matchArticlesByTitles(
             Map<Long, String> videoTitleMap,
-            Map<Long, String> videoChannelMap,
+            Map<Long, Set<String>> videoChannelMap,
             Map<String, Set<String>> sentSourceIds) {
-        ConcurrentHashMap<Long, List<ArticleMatchItem>> resultMap = new ConcurrentHashMap<>();
+        ConcurrentHashMap<Long, Map<String, List<ArticleMatchItem>>> resultMap = new ConcurrentHashMap<>();
         RankingSpec ranking = buildRankingSpec();
 
         int totalVideos = videoTitleMap.size();
@@ -394,7 +396,7 @@ public class VideoArticleMatchJob {
         for (Map.Entry<Long, String> entry : videoTitleMap.entrySet()) {
             Long videoId = entry.getKey();
             String title = entry.getValue();
-            String channelLevel3 = videoChannelMap.get(videoId);
+            Set<String> channelLevel3s = videoChannelMap.getOrDefault(videoId, Collections.emptySet());
 
             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                 try {
@@ -402,16 +404,21 @@ public class VideoArticleMatchJob {
                     RecallResultVO recallResult = vectorRecallTestService.batchByText(batchParam);
                     List<ArticleMatchItem> articles = extractArticleItems(recallResult);
 
-                    if (!articles.isEmpty()) {
-                        // 从上往下(score 降序)排除已发送的文章,保留剩余列表
-                        List<ArticleMatchItem> filtered = filterSentArticles(articles, channelLevel3, sentSourceIds);
-                        int skipped = articles.size() - filtered.size();
-                        if (skipped > 0) {
-                            skippedSentCount.addAndGet(skipped);
+                    if (!articles.isEmpty() && !channelLevel3s.isEmpty()) {
+                        Map<String, List<ArticleMatchItem>> perChannelResults = new LinkedHashMap<>();
+                        for (String channelLevel3 : channelLevel3s) {
+                            List<ArticleMatchItem> filtered = filterSentArticles(articles, channelLevel3, sentSourceIds);
+                            int skipped = articles.size() - filtered.size();
+                            if (skipped > 0) {
+                                skippedSentCount.addAndGet(skipped);
+                            }
+                            if (!filtered.isEmpty()) {
+                                perChannelResults.put(channelLevel3, filtered);
+                                matchedCount.incrementAndGet();
+                            }
                         }
-                        if (!filtered.isEmpty()) {
-                            resultMap.put(videoId, filtered);
-                            matchedCount.incrementAndGet();
+                        if (!perChannelResults.isEmpty()) {
+                            resultMap.put(videoId, perChannelResults);
                         }
                     }
                 } catch (Exception e) {
@@ -436,7 +443,7 @@ public class VideoArticleMatchJob {
             log.warn("长文匹配未完全完成: 预期 {} 个, 实际完成 {} 个", totalVideos, finalProcessed);
         }
 
-        Map<Long, List<ArticleMatchItem>> result = new LinkedHashMap<>(resultMap);
+        Map<Long, Map<String, List<ArticleMatchItem>>> result = new LinkedHashMap<>(resultMap);
         log.info("长文匹配完成: {}/{} 个视频命中长文, 跳过已发送 {} 篇",
                 matchedCount.get(), totalVideos, skippedSentCount.get());
         return result;
@@ -504,50 +511,62 @@ public class VideoArticleMatchJob {
     /**
      * Performs 1v1 deduplication on the many-to-many video→article match results.
      *
-     * <p>Two rounds of constraints:
+     * <p>Two rounds of constraints (global, across accounts):
      * <ol>
-     *   <li><b>Best article per video</b>: each video keeps only its highest-score article (video→article N→1)</li>
-     *   <li><b>Best video per article</b>: each article keeps only its highest-score video (article→video N→1)</li>
+     *   <li><b>Best article per (video, account)</b>: each (video, channelLevel3) keeps only its highest-score article</li>
+     *   <li><b>Best (video, account) per article</b>: each article keeps only its highest-score (video, channelLevel3)</li>
      * </ol>
-     * In the final result each video maps to at most one article, and each article maps to at most one video.
+     * In the final result each (video, channelLevel3) maps to at most one article, and each article maps to at most one (video, channelLevel3).
      *
-     * @param videoArticleMatches videoId → article list (already sorted by score, descending)
-     * @return the final pairing videoId → unique best article
+     * @param videoArticleMatches videoId → (channelLevel3 → article list) (already sorted by score, descending)
+     * @return the final pairing videoId → (channelLevel3 → unique best article); the outer map's size is the number of videos, not the number of pairs
      */
-    private Map<Long, ArticleMatchItem> dedupOneToOne(
-            Map<Long, List<ArticleMatchItem>> videoArticleMatches) {
-
-        // Round 1: take the best article for each video (top 1 by score)
-        Map<Long, ArticleMatchItem> videoBestArticle = new LinkedHashMap<>();
-        for (Map.Entry<Long, List<ArticleMatchItem>> entry : videoArticleMatches.entrySet()) {
-            videoBestArticle.put(entry.getKey(), entry.getValue().get(0));
+    private Map<Long, Map<String, ArticleMatchItem>> dedupOneToOne(
+            Map<Long, Map<String, List<ArticleMatchItem>>> videoArticleMatches) {
+
+        // Round 1: take the best article for each (video, channelLevel3) (top 1 by score; relies on the list already being sorted)
+        // Uses the composite key "videoId|channelLevel3" as a flattened identifier (a null channelLevel3 is concatenated as the text "null")
+        Map<String, ArticleMatchItem> videoChannelBestArticle = new LinkedHashMap<>();
+        for (Map.Entry<Long, Map<String, List<ArticleMatchItem>>> videoEntry : videoArticleMatches.entrySet()) {
+            Long videoId = videoEntry.getKey();
+            for (Map.Entry<String, List<ArticleMatchItem>> channelEntry : videoEntry.getValue().entrySet()) {
+                String channelLevel3 = channelEntry.getKey();
+                List<ArticleMatchItem> articles = channelEntry.getValue();
+                if (!articles.isEmpty()) {
+                    videoChannelBestArticle.put(videoId + "|" + channelLevel3, articles.get(0));
+                }
+            }
         }
 
-        // Round 2: for each article keep the best video (highest score)
-        Map<String, Long> articleBestVideo = new LinkedHashMap<>();
+        // Round 2: for each article keep the best (video, channelLevel3) (highest score); the strict '>' below keeps the first-seen key on ties
+        Map<String, String> articleBestVideoChannel = new LinkedHashMap<>();
         Map<String, Double> articleBestScore = new LinkedHashMap<>();
-        for (Map.Entry<Long, ArticleMatchItem> entry : videoBestArticle.entrySet()) {
-            Long videoId = entry.getKey();
+        for (Map.Entry<String, ArticleMatchItem> entry : videoChannelBestArticle.entrySet()) {
+            String vcKey = entry.getKey();
             ArticleMatchItem article = entry.getValue();
             Double currentBest = articleBestScore.get(article.articleId);
             if (currentBest == null || article.score > currentBest) {
-                articleBestVideo.put(article.articleId, videoId);
+                articleBestVideoChannel.put(article.articleId, vcKey);
                 articleBestScore.put(article.articleId, article.score);
             }
         }
 
-        // Keep only the videos selected by an article (a video is kept only if it is that article's best match)
-        Map<Long, ArticleMatchItem> finalPairs = new LinkedHashMap<>();
-        for (Map.Entry<Long, ArticleMatchItem> entry : videoBestArticle.entrySet()) {
-            Long videoId = entry.getKey();
+        // Keep only the (video, channelLevel3) pairs selected by an article; the key is split at the first '|' to recover both parts
+        Map<Long, Map<String, ArticleMatchItem>> finalPairs = new LinkedHashMap<>();
+        for (Map.Entry<String, ArticleMatchItem> entry : videoChannelBestArticle.entrySet()) {
+            String vcKey = entry.getKey();
             ArticleMatchItem article = entry.getValue();
-            if (videoId.equals(articleBestVideo.get(article.articleId))) {
-                finalPairs.put(videoId, article);
+            if (vcKey.equals(articleBestVideoChannel.get(article.articleId))) {
+                int sepIdx = vcKey.indexOf('|');
+                Long videoId = Long.parseLong(vcKey.substring(0, sepIdx));
+                String channelLevel3 = vcKey.substring(sepIdx + 1);
+                finalPairs.computeIfAbsent(videoId, k -> new LinkedHashMap<>())
+                        .put(channelLevel3, article);
             }
         }
 
-        log.info("1v1 去重完成: {} 个视频 → {} 篇唯一文章 → {} 对最终配对",
-                videoBestArticle.size(), articleBestVideo.size(), finalPairs.size());
+        log.info("1v1 去重完成: {} 个 (视频,账号) → {} 篇唯一文章 → {} 对最终配对",
+                videoChannelBestArticle.size(), articleBestVideoChannel.size(), finalPairs.size());
         return finalPairs;
     }
 
@@ -558,39 +577,45 @@ public class VideoArticleMatchJob {
     /**
      * 将最终配对结果写入 video_article_match_result 表。
      *
-     * <p>每对 (video, article) 产生一条记录(不关联需求维度),
+     * <p>每对 (video, channelLevel3, article) 产生一条记录(不关联需求维度),
      * channelLevel3 / account 取自原始匹配记录。
+     * 同一视频可能对应多个账号,为每个账号各生成一行。
      * 先清理同日 dt 旧数据(幂等重跑),再分批插入新结果。
      */
     private void saveResults(String dt,
                              List<ChannelDemandMatchResult> records,
                              Map<Long, String> videoTitleMap,
-                             Map<Long, ArticleMatchItem> finalPairs) {
+                             Map<Long, Map<String, ArticleMatchItem>> finalPairs) {
 
-        // 构建 videoId → (channelLevel3, account) 映射,取第一条匹配记录的值
-        Map<Long, ChannelDemandMatchResult> videoRecordMap = new LinkedHashMap<>();
+        // 构建 "videoId|channelLevel3" → (channelLevel3, account) 映射
+        Map<String, ChannelDemandMatchResult> videoChannelRecordMap = new LinkedHashMap<>();
         for (ChannelDemandMatchResult r : records) {
-            videoRecordMap.putIfAbsent(r.getMatchVideoId(), r);
+            String key = r.getMatchVideoId() + "|" + r.getChannelLevel3();
+            videoChannelRecordMap.putIfAbsent(key, r);
         }
 
         // 序列化排名参数(所有行共享)
         String rankingJson = JSON.toJSONString(buildRankingSpec());
         String configCodesStr = String.join(",", ARTICLE_CONFIG_CODES);
 
-        // 每对 (video, article)
+        // 每对 (video, channelLevel3, article)
         List<VideoArticleMatchResult> resultList = new ArrayList<>();
-        for (Map.Entry<Long, ArticleMatchItem> pair : finalPairs.entrySet()) {
-            Long videoId = pair.getKey();
-            ArticleMatchItem article = pair.getValue();
+        for (Map.Entry<Long, Map<String, ArticleMatchItem>> videoEntry : finalPairs.entrySet()) {
+            Long videoId = videoEntry.getKey();
             String videoTitle = videoTitleMap.get(videoId);
-            ChannelDemandMatchResult record = videoRecordMap.get(videoId);
-            if (record == null) {
-                continue;
-            }
+            for (Map.Entry<String, ArticleMatchItem> channelEntry : videoEntry.getValue().entrySet()) {
+                String channelLevel3 = channelEntry.getKey();
+                ArticleMatchItem article = channelEntry.getValue();
+                String vcKey = videoId + "|" + channelLevel3;
+                ChannelDemandMatchResult record = videoChannelRecordMap.get(vcKey);
+                if (record == null) {
+                    continue;
+                }
 
-            resultList.add(buildResultRow(dt, videoId, videoTitle, article,
-                    record.getChannelLevel3(), record.getAccount(),
-                    rankingJson, configCodesStr));
+                resultList.add(buildResultRow(dt, videoId, videoTitle, article,
+                        channelLevel3, record.getAccount(),
+                        rankingJson, configCodesStr));
+            }
         }
 
         log.info("构建完成: {} 条结果记录", resultList.size());
@@ -656,9 +681,9 @@ public class VideoArticleMatchJob {
     private void logSummary(String dt,
                             List<ChannelDemandMatchResult> records,
                             Map<Long, String> videoTitleMap,
-                            Map<Long, ArticleMatchItem> finalPairs) {
+                            Map<Long, Map<String, ArticleMatchItem>> finalPairs) {
 
-        // 按 channelLevel3 分组聚合原始记录
+        // 按 channelLevel3 分组聚合原始记录(需求点+视频数)
         Map<String, ChannelLevel3Stats> statsMap = new LinkedHashMap<>();
         for (ChannelDemandMatchResult r : records) {
             String cl3 = r.getChannelLevel3() != null ? r.getChannelLevel3() : "未知";
@@ -671,16 +696,20 @@ public class VideoArticleMatchJob {
             }
         }
 
-        // 计算每组的匹配情况
+        // 遍历 finalPairs 计算每组的匹配情况
         Set<String> allMatchedArticles = new HashSet<>();
-        for (ChannelLevel3Stats stats : statsMap.values()) {
-            for (Long videoId : stats.videoIds) {
-                ArticleMatchItem matched = finalPairs.get(videoId);
-                if (matched != null) {
-                    stats.matchedVideoCount++;
-                    stats.matchedArticleIds.add(matched.articleId);
-                }
+        int totalResultRows = 0;
+        for (Map.Entry<Long, Map<String, ArticleMatchItem>> videoEntry : finalPairs.entrySet()) {
+            for (Map.Entry<String, ArticleMatchItem> channelEntry : videoEntry.getValue().entrySet()) {
+                String cl3 = channelEntry.getKey() != null ? channelEntry.getKey() : "未知";
+                ArticleMatchItem article = channelEntry.getValue();
+                ChannelLevel3Stats stats = statsMap.computeIfAbsent(cl3, k -> new ChannelLevel3Stats());
+                stats.matchedVideoCount++;
+                stats.matchedArticleIds.add(article.articleId);
+                totalResultRows++;
             }
+        }
+        for (ChannelLevel3Stats stats : statsMap.values()) {
             allMatchedArticles.addAll(stats.matchedArticleIds);
         }
 
@@ -714,10 +743,10 @@ public class VideoArticleMatchJob {
         log.info(String.format("  %-25s %8d %8d %8d %8d",
                 "【合计】", totalDemands, totalVideos, totalMatched, allMatchedArticles.size()));
         log.info("============================================================");
-        log.info("  查询记录: {} | 唯一视频: {} | 有效标题: {} | 1v1配对: {} | 结果行数: {}",
+        log.info("  查询记录: {} | 唯一视频: {} | 有效标题: {} | 1v1配对: {} 个 (video,账号) | 结果行数: {}",
                 records.size(), allVideoIds.size(), videoTitleMap.size(),
-                finalPairs.size(),
-                finalPairs.size() * (records.size() / Math.max(allVideoIds.size(), 1)));
+                finalPairs.values().stream().mapToInt(Map::size).sum(),
+                totalResultRows);
         log.info("============================================================\n");
     }