Переглянути джерело

feat: V562 将 dk_elements 召回改为保送 (独立配额, 不进个性化/非个性化)

- dk_elements (YearShareDkElements) 从 V562 个性化白名单移除, 改为末尾保送一路
- 新增 RecallUtils.extractRecallGuaranteed: 按粗排分选 topN, 不回填/不加重复,
  覆盖 Video.rovScore + 漏斗 entry.score, 标 self/other; 跑在两路截断之后, 不扰动前两路
- 新增 COARSE_RANK_FETCH_PUSH_FROMS, 让粗排分拉取覆盖 dk 保送路
- 现有 coarse_rank_summary 日志不变, 保送观测单独打 baosong_summary
- 3 个 Apollo 开关 (默认): dkElementsBaosongTopN=10 / dkBaosongUseCoarseScore=1 / dkBaosongLog=0

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
yangxiaohui 1 тиждень тому
батько
коміт
955c54df85

+ 39 - 8
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV562.java

@@ -40,10 +40,10 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
     private FeatureService featureService;
 
     /**
-     * V562 个性化召回白名单 (7 路: V566 基础 6 路 + 1 路 dk_elements 行为路实验):召回 key 含 mid/uid,
-     * 依赖该用户行为信号。
-     * V562 实验路径: YearShareDkElements (用户近期 share 行为 join dk_elements)
+     * V562 个性化召回白名单 (6 路: V566 基础 6 路):召回 key 含 mid/uid,依赖该用户行为信号。
      * 注:YearReturnCate2 因线上效果不佳, 2026-06-04 起移到非个性化白名单。
+     * 注:YearShareDkElements (dk_elements 行为路实验) 2026-06-18 起改为保送 (独立配额, 不参与粗排截断),
+     *     不再属于个性化/非个性化白名单, 见 mergeAndRankRovRecall 里 dkElementsBaosongTopN 段。
      */
     private static final Set<String> PERSONAL_RECALL_PUSH_FROMS = new HashSet<>(Arrays.asList(
             UserCate1RecallStrategy.PUSH_FORM,
@@ -51,8 +51,7 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
             Return1Cate2RosRecallStrategy.PUSH_FORM,
             Return1Cate2StrRecallStrategy.PUSH_FORM,
             YearShareCate1RecallStrategy.PUSH_FROM,
-            YearShareCate2RecallStrategy.PUSH_FROM,
-            YearShareDkElementsRecallStrategy.PUSH_FROM
+            YearShareCate2RecallStrategy.PUSH_FROM
     ));
 
     /**
@@ -79,7 +78,7 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
             YearReturnCate2RecallStrategy.PUSH_FROM
     ));
 
-    /** PERSONAL ∪ NON_PERSONAL = 23 路。用于 fetchCoarseRankScores 跳过流量池等不参与截断的 vid。 */
+    /** PERSONAL ∪ NON_PERSONAL = 22 路(参与粗排截断竞争的路)。 */
     private static final Set<String> ALL_ROV_PUSH_FROMS;
     static {
         Set<String> all = new HashSet<>(PERSONAL_RECALL_PUSH_FROMS);
@@ -87,10 +86,22 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
         ALL_ROV_PUSH_FROMS = Collections.unmodifiableSet(all);
     }
 
+    /**
+     * 粗排分拉取范围 = 22 路截断 + dk 保送路。dk 保送虽不参与截断竞争,但其 top-10 也按粗排分挑,
+     * 故需为它拉粗排分。仍跳过流量池 3 路(独立通道,不需要粗排分)。
+     */
+    private static final Set<String> COARSE_RANK_FETCH_PUSH_FROMS;
+    static {
+        Set<String> s = new HashSet<>(ALL_ROV_PUSH_FROMS);
+        s.add(YearShareDkElementsRecallStrategy.PUSH_FROM);
+        COARSE_RANK_FETCH_PUSH_FROMS = Collections.unmodifiableSet(s);
+    }
+
     /*
      * 设计要点:
      *   - fail-closed 白名单:RecallService 未来加新路不会自动进 V562,避免污染 vs V568 AB 对比
      *   - 流量池 3 路 (flow_pool / quick_flow_pool / recall_strategy_hotspot) 不在任何名单——独立通道
+     *   - dk_elements 行为路保送:独立配额 (默认 10),叠加在两路截断之后(不扰动前两路, 便于单路增益 AB),不在个性化/非个性化名单
      *   - 调用顺序 = 个性化优先:同 vid 双类命中时归个性化,保护用户兴趣信号
      */
 
@@ -132,6 +143,26 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
         log.info("coarse_rank_summary exp=562 quota={} pc={} ps={} nc={} ns={}",
                 totalTopN, personalCandidates, personalActual, nonPersonalCandidates, nonPersonalActual);
 
+        // ============================================================
+        // V562 实验:dk_elements 行为路保送 (独立配额, 叠加在两路截断之后)。
+        // 跑在最后面:个性化/非个性化先按基线行为选完, 不被这一路扰动;
+        // 现有日志/过程数据/归因均不改, 只把这一路纯叠加进来。
+        // top-N 选择口径由 Apollo 开关 dkBaosongUseCoarseScore 控制 (默认 1=粗排分; 0=dk 自身 rovn 分),
+        // 选择不受 setVideo 影响, 插入时仅跳过已在池中的 vid 以避免重复视频, 不回填后续名次。
+        // 观测单独打一条新日志, 不污染上面的 coarse_rank_summary。
+        // ============================================================
+        int dkElementsBaosongTopN = mergeWeight.getOrDefault("dkElementsBaosongTopN", 10.0).intValue();
+        // 1=按粗排分挑 (与两路截断同口径);0=按 dk 自身召回 rovn 分。传空 map 即回退到 rovScore 排序且不覆盖。
+        boolean dkBaosongUseCoarse = mergeWeight.getOrDefault("dkBaosongUseCoarseScore", 1.0) > 0.5;
+        Map<Long, Double> baosongScoreMap = dkBaosongUseCoarse ? coarseRankMap : Collections.emptyMap();
+        int sizeBeforeBaosong = rovRecallRank.size();
+        RecallUtils.extractRecallGuaranteed(dkElementsBaosongTopN, param, YearShareDkElementsRecallStrategy.PUSH_FROM, setVideo, rovRecallRank, baosongScoreMap);
+        int baosongActual = rovRecallRank.size() - sizeBeforeBaosong;
+        // 保送观测日志开关: 默认 0=不打; 1=打 (Apollo dkBaosongLog)
+        if (mergeWeight.getOrDefault("dkBaosongLog", 0.0) > 0.5) {
+            log.info("baosong_summary exp=562 dk_quota={} dk_actual={} use_coarse={}", dkElementsBaosongTopN, baosongActual, dkBaosongUseCoarse);
+        }
+
         // 记录召回源中的视频
         this.rankBeforePostProcessor(rovRecallRank);
 
@@ -430,10 +461,10 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
         double w24h = mergeWeight.getOrDefault("coarseRovn24hW", 0.5);
         double plus1h = mergeWeight.getOrDefault("coarseRovn1hSmoothPlus", 30.0);
         double plus24h = mergeWeight.getOrDefault("coarseRovn24hSmoothPlus", 30.0);
-        // 只对参与统一截断的 23 路 vid 拉粗排分(跳过流量池 3 路,省 proto + RPC 延迟)
+        // 对参与统一截断的 22 路 + dk 保送路 vid 拉粗排分(跳过流量池 3 路,省 proto + RPC 延迟)
         List<String> vids = param.getRecallResult().getData().stream()
                 .filter(d -> d != null && CollectionUtils.isNotEmpty(d.getVideos()))
-                .filter(d -> ALL_ROV_PUSH_FROMS.contains(d.getPushFrom()))
+                .filter(d -> COARSE_RANK_FETCH_PUSH_FROMS.contains(d.getPushFrom()))
                 .flatMap(d -> d.getVideos().stream())
                 .map(v -> String.valueOf(v.getVideoId()))
                 .distinct()

+ 46 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

@@ -216,6 +216,52 @@ public class RecallUtils {
         }
     }
 
+    /**
+     * 保送式抽取:用于"在不扰动其它召回路的前提下,验证单路召回增益"的实验。
+     *
+     * 选择口径:按粗排分 coarseRankMap 对该 pushFrom 的候选排序取 topN(与 {@link #extractAllAndTruncateByCoarseRank}
+     * 同口径;coarseRankMap 缺失的 vid 兜底用召回阶段 rovScore)。选择不受 setVideo 影响(对该路公平)。
+     *
+     * 与 {@link #extractRecall} 的区别:
+     *   - extractRecall: 先按 setVideo 去重再取 topN —— 撞了已选 vid 会回填后续名次,凑满 topN 个净新增;
+     *   - 本方法: 先按粗排分定出该路 topN,插入时才跳过已在池中的 vid 以避免同一视频重复,但**不回填**后续名次。
+     *
+     * 因此本方法的净新增数 = topN - (topN 内与已选池重叠的数量),反映"这一路 topN 真实带来的增量视频"。
+     * 命中的 vid 同截断路一样把 Video.rovScore 覆盖为粗排分;漏斗 entry.score 也一并覆盖
+     * (对齐 {@link #markCoarseRankAttribution}),让后续阶段与漏斗看到统一粗排信号。
+     * 调用方应放在其它召回路之后(保送跑最后),让前序路与基线行为完全一致。
+     */
+    public static void extractRecallGuaranteed(int recallNum, RankParam param, String pushFrom, Set<Long> setVideo,
+                                               List<Video> rovRecallRank, Map<Long, Double> coarseRankMap) {
+        if (recallNum <= 0) return;
+        Map<Long, Double> coarseMap = coarseRankMap != null ? coarseRankMap : Collections.emptyMap();
+        List<Video> sorted = new ArrayList<>(extractAndSort(param, pushFrom));
+        sorted.sort(Comparator.comparingDouble((Video v) -> coarseMap.getOrDefault(v.getVideoId(), v.getRovScore())).reversed());
+        List<Video> topN = sorted.subList(0, Math.min(recallNum, sorted.size()));
+        List<Video> added = new ArrayList<>();
+        for (Video v : topN) {
+            if (setVideo.add(v.getVideoId())) {  // 仅插入未在池中的,避免重复视频;不回填
+                Double coarse = coarseMap.get(v.getVideoId());
+                if (coarse != null) v.setRovScore(coarse);  // 与截断路一致,写回统一粗排信号
+                rovRecallRank.add(v);
+                added.add(v);
+            }
+        }
+        // 漏斗 entry.score 也覆盖成粗排分(markStep3Select 只标 select 不动 score,这里补上,与截断路一致)。
+        // coarseMap 为空(保送切到自身分数口径)时不覆盖,entry 保留召回阶段原分。
+        if (param != null && param.getFunnelContext() != null) {
+            List<RecallVideoEntry> entries = param.getFunnelContext().getStages123RecallByStrategy().get(pushFrom);
+            if (CollectionUtils.isNotEmpty(entries)) {
+                for (RecallVideoEntry e : entries) {
+                    if (!e.isFilteredIn()) continue;
+                    Double coarse = coarseMap.get(e.getVideoId());
+                    if (coarse != null) e.setScore(coarse);
+                }
+            }
+        }
+        markStep3Select(param, pushFrom, recallNum, sorted, added);
+    }
+
     /**
      * 阶段 3 截断标记 (单 pushFrom)。业务行为不变:
      * - SELF  = selfList 全集(业务实际贡献给合并的视频)