Преглед изворни кода

feat: V564 实验改为召回侧统一粗排分截断

替代每路 extractRecall(n) 配额截断, 改成 23 路 (个性化 7 + 非个性化 16)
按粗排分加权排序后统一截断 topN, 个性化优先 + 不足时非个性化动态补足。

粗排分 = alg_vid_recommend_exp_feature_20250212 表 rovn_1h/24h 加权平均,
公式 plusSmooth(return_n_uv, exp, 30, 1) 与 FeatureV6 同口径。

Apollo 可调 6 维: coarseRankTopN/personalRatio/coarseRovn(1h|24h)(W|SmoothPlus)。

附带清理: RecallService 删除 if (isHit564Exp) 老 9 路剔除 (V564 现在需要全量
召回参与粗排 PK), 不再剔除 + 不再额外加载 provinceRovn 路。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
yangxiaohui пре 1 недеља
родитељ
комит
91c74a78f5

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/FeatureService.java

@@ -47,6 +47,19 @@ public class FeatureService {
         return feature.getVideoFeature();
     }
 
+    /** Coarse-rank experiment (labelled V570 here, but called from RankStrategy4RegionMergeModelV564.fetchCoarseRankScores — NOTE(review): confirm the label): batch-fetches, before truncation, the coarse-rank features of the given vids from the single table alg_vid_recommend_exp_feature_20250212. */
+    public Map<String, Map<String, Map<String, String>>> getVideoCoarseRankFeature(List<String> vids) {
+        if (CollectionUtils.isEmpty(vids)) {
+            return new HashMap<>();
+        }
+        List<FeatureKeyProto> protos = new ArrayList<>(vids.size());
+        for (String vid : vids) {
+            protos.add(genWithKeyMap("alg_vid_recommend_exp_feature_20250212", vid, ImmutableMap.of("vid", vid)));
+        }
+        Feature feature = getFeatureByProto(protos);
+        return feature.getVideoFeature();
+    }
+
     public Map<String, Map<String, Map<String, String>>> getVideoBaseInfo(String headVid, List<String> vidList) {
         List<FeatureKeyProto> protos = new ArrayList<>();
         if (null != headVid && !headVid.isEmpty()) {

+ 134 - 36
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV564.java

@@ -36,6 +36,57 @@ public class RankStrategy4RegionMergeModelV564 extends RankStrategy4RegionMergeM
     @Autowired
     private FeatureService featureService;
 
+    /**
+     * V564 personalized-recall whitelist (7 routes): per the original author, these recalls are keyed by mid/uid and depend on that user's behavior signals.
+     */
+    private static final Set<String> PERSONAL_RECALL_PUSH_FROMS = new HashSet<>(Arrays.asList(
+            UserCate1RecallStrategy.PUSH_FORM,
+            UserCate2RecallStrategy.PUSH_FORM,
+            Return1Cate2RosRecallStrategy.PUSH_FORM,
+            Return1Cate2StrRecallStrategy.PUSH_FORM,
+            YearShareCate1RecallStrategy.PUSH_FROM,
+            YearShareCate2RecallStrategy.PUSH_FROM,
+            YearReturnCate2RecallStrategy.PUSH_FROM
+    ));
+
+    /**
+     * V564 non-personalized-recall whitelist (16 routes): per the original author, these depend only on headVid + region/category/similarity (vid-vid CF is also counted here).
+     * Covers the 5 legacy region routes, new region, city, head province/cate, priori province, return-similar and scene CF.
+     */
+    private static final Set<String> NON_PERSONAL_RECALL_PUSH_FROMS = new HashSet<>(Arrays.asList(
+            RegionHRecallStrategy.PUSH_FORM,
+            RegionHDupRecallStrategy.PUSH_FORM,
+            Region24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HDupRecallStrategy.PUSH_FORM,
+            RegionRealtimeRecallStrategyV1.PUSH_FORM,
+            CityRovnRecallStrategy.PUSH_FROM,
+            HeadProvinceCate1RecallStrategy.PUSH_FORM,
+            HeadProvinceCate2RecallStrategy.PUSH_FORM,
+            HeadCate2RovRecallStrategy.PUSH_FROM,
+            PrioriProvinceRovnRecallStrategy.PUSH_FROM,
+            PrioriProvinceStrRecallStrategy.PUSH_FROM,
+            PrioriProvinceRosRecallStrategy.PUSH_FROM,
+            ReturnVideoRecallStrategy.PUSH_FORM,
+            SceneCFRovnRecallStrategy.PUSH_FORM,
+            SceneCFRosnRecallStrategy.PUSH_FORM
+    ));
+
+    /** PERSONAL ∪ NON_PERSONAL = 23 routes. Used by fetchCoarseRankScores to skip vids (e.g. flow-pool ones) that do not take part in the truncation. */
+    private static final Set<String> ALL_ROV_PUSH_FROMS;
+    static {
+        Set<String> all = new HashSet<>(PERSONAL_RECALL_PUSH_FROMS);
+        all.addAll(NON_PERSONAL_RECALL_PUSH_FROMS);
+        ALL_ROV_PUSH_FROMS = Collections.unmodifiableSet(all);
+    }
+
+    /*
+     * Design notes:
+     *   - fail-closed whitelist: routes added to RecallService later do not automatically enter V564, which avoids polluting the AB comparison against V568
+     *   - the 3 flow-pool routes (flow_pool / quick_flow_pool / recall_strategy_hotspot) are in neither list — they are an independent channel
+     *   - call order = personalized first: a vid hit by both categories is attributed to personalized, protecting the user-interest signal
+     */
+
     @Override
     public List<Video> mergeAndRankRovRecall(RankParam param) {
         Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : new HashMap<>(0);
@@ -49,42 +100,24 @@ public class RankStrategy4RegionMergeModelV564 extends RankStrategy4RegionMergeM
         Set<Long> setVideo = new HashSet<>();
         setVideo.add(param.getHeadVid());
         List<Video> rovRecallRank = new ArrayList<>();
-        // -------------------5路特殊旧召回------------------
-        RecallUtils.extractOldSpecialRecall(mergeWeight.getOrDefault("oldSpecialN", (double) param.getSize()).intValue(), param, setVideo, rovRecallRank);
-        //-------------------return相似召回------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v6", 5.0).intValue(), param, ReturnVideoRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------新地域召回------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v1", 5.0).intValue(), param, RegionRealtimeRecallStrategyV1.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------scene cf rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRovn", 5.0).intValue(), param, SceneCFRovnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------scene cf rosn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRosn", 5.0).intValue(), param, SceneCFRosnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate1RecallN", 5.0).intValue(), param, UserCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate2RecallN", 5.0).intValue(), param, UserCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate1RecallN", 3.0).intValue(), param, HeadProvinceCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2RecallN", 3.0).intValue(), param, HeadProvinceCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------head cate2 of rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2Rov", 5.0).intValue(), param, HeadCate2RovRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------city rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cityRov", 5.0).intValue(), param, CityRovnRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceRov", 3.0).intValue(), param, PrioriProvinceRovnRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceStr", 1.0).intValue(), param, PrioriProvinceStrRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceRos", 1.0).intValue(), param, PrioriProvinceRosRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Ros", 5.0).intValue(), param, Return1Cate2RosRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Str", 5.0).intValue(), param, Return1Cate2StrRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate1", 5.0).intValue(), param, YearShareCate1RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate2", 5.0).intValue(), param, YearShareCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearReturnCate2", 5.0).intValue(), param, YearReturnCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
+
+        // ============================================================
+        // V564 实验:统一粗排分截断 (个性化 / 非个性化 两配额, 动态补足)
+        // 总配额 coarseRankTopN,个性化占 personalRatio。先个性化按上限抢位,
+        // 个性化不足时剩余名额转给非个性化,保证精排算力满载。
+        //
+        // 粗排分 = alg_vid_recommend_exp_feature_20250212.rovn_1h / rovn_24h 平均
+        // ============================================================
+        int totalTopN = mergeWeight.getOrDefault("coarseRankTopN", 100.0).intValue();
+        double personalRatio = mergeWeight.getOrDefault("personalRatio", 0.4);
+        int personalTopN = (int) Math.round(totalTopN * personalRatio);
+        Map<Long, Double> coarseRankMap = fetchCoarseRankScores(param);
+
+        int sizeBeforePersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(personalTopN, param, setVideo, rovRecallRank, coarseRankMap, PERSONAL_RECALL_PUSH_FROMS);
+        int personalActual = rovRecallRank.size() - sizeBeforePersonal;
+        int nonPersonalBudget = totalTopN - personalActual;  // 个性化不足时, 名额转给非个性化
+        RecallUtils.extractAllAndTruncateByCoarseRank(nonPersonalBudget, param, setVideo, rovRecallRank, coarseRankMap, NON_PERSONAL_RECALL_PUSH_FROMS);
 
         // 记录召回源中的视频
         this.rankBeforePostProcessor(rovRecallRank);
@@ -289,6 +322,71 @@ public class RankStrategy4RegionMergeModelV564 extends RankStrategy4RegionMergeM
         return result;
     }
 
+    /**
+     * V564 experiment: fetches the coarse-rank scores, returned as a vid → score map.
+     *
+     * Data source: alg_vid_recommend_exp_feature_20250212.
+     * The table has no ready-made rovn field; it is computed from the atomic fields (return_n_uv_*, exp_*) via plusSmooth.
+     * Formula, same definition as FeatureV6.oneTypeStatFeature: rovn = plusSmooth(return_n_uv, exp, plus, 1)
+     * Per the original author, the default plus=30 aligns with FeatureV6.largerSmoothPlus so the AB comparison is not skewed by a differing definition.
+     *
+     * Dimensions tunable via Apollo:
+     *   - coarseRovn1hW / coarseRovn24hW: weights of the 1h and 24h values (default 0.5/0.5)
+     *   - coarseRovn1hSmoothPlus / coarseRovn24hSmoothPlus: smoothing constant passed to plusSmooth (default 30/30)
+     *
+     * Missing values renormalize automatically: if one value is missing the other carries the full weight; if both are missing the vid gets no entry and the caller falls back to RovScore.
+     */
+    private Map<Long, Double> fetchCoarseRankScores(RankParam param) {
+        if (param == null || param.getRecallResult() == null
+                || CollectionUtils.isEmpty(param.getRecallResult().getData())) {
+            return Collections.emptyMap();
+        }
+        Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : Collections.emptyMap();
+        double w1h = mergeWeight.getOrDefault("coarseRovn1hW", 0.5);
+        double w24h = mergeWeight.getOrDefault("coarseRovn24hW", 0.5);
+        double plus1h = mergeWeight.getOrDefault("coarseRovn1hSmoothPlus", 30.0);
+        double plus24h = mergeWeight.getOrDefault("coarseRovn24hSmoothPlus", 30.0);
+        // Only fetch scores for vids of the 23 routes that take part in the unified truncation (skips the 3 flow-pool routes, saving protos + RPC latency)
+        List<String> vids = param.getRecallResult().getData().stream()
+                .filter(d -> d != null && CollectionUtils.isNotEmpty(d.getVideos()))
+                .filter(d -> ALL_ROV_PUSH_FROMS.contains(d.getPushFrom()))
+                .flatMap(d -> d.getVideos().stream())
+                .map(v -> String.valueOf(v.getVideoId()))
+                .distinct()
+                .collect(Collectors.toList());
+        if (vids.isEmpty()) return Collections.emptyMap();
+
+        Map<String, Map<String, Map<String, String>>> feats = featureService.getVideoCoarseRankFeature(vids);
+        Map<Long, Double> result = new HashMap<>(vids.size());
+        for (String vid : vids) {
+            Map<String, String> row = feats.getOrDefault(vid, Collections.emptyMap())
+                    .getOrDefault("alg_vid_recommend_exp_feature_20250212", Collections.emptyMap());
+            Double rovn1h = computeRovn(row, "1h", plus1h);
+            Double rovn24h = computeRovn(row, "24h", plus24h);
+            // Weighted average; a missing component drops out and the remaining weight is renormalized
+            double sumW = (rovn1h != null ? w1h : 0) + (rovn24h != null ? w24h : 0);
+            if (sumW <= 0) continue;
+            double sumWS = (rovn1h != null ? rovn1h * w1h : 0) + (rovn24h != null ? rovn24h * w24h : 0);
+            try {
+                result.put(Long.parseLong(vid), sumWS / sumW);
+            } catch (NumberFormatException ignore) { }
+        }
+        return result;
+    }
+
+    /** Same definition as FeatureV6.oneTypeStatFeature: rovn = plusSmooth(return_n_uv, exp, plus, 1); returns null when exp_{period} or return_n_uv_{period} is blank, unparsable or <= 0. */
+    private static Double computeRovn(Map<String, String> row, String period, double smoothPlus) {
+        Double exp = parseDoubleOrNull(row.get("exp_" + period));
+        Double returnNuv = parseDoubleOrNull(row.get("return_n_uv_" + period));
+        if (exp == null || exp <= 0 || returnNuv == null || returnNuv <= 0) return null;
+        return FeatureUtils.plusSmooth(returnNuv, exp, smoothPlus, 1);
+    }
+
+    private static Double parseDoubleOrNull(String s) { // Parses s with Double.parseDouble; yields null for blank input or a NumberFormatException.
+        if (StringUtils.isBlank(s)) return null;
+        try { return Double.parseDouble(s); } catch (NumberFormatException e) { return null; }
+    }
+
     private UserShareReturnProfile parseUserProfile(Map<String, Map<String, String>> userOriginInfo) {
         if (null != userOriginInfo) {
             Map<String, String> c9 = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");

+ 3 - 17
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -207,23 +207,9 @@ public class RecallService implements ApplicationContextAware {
             strategies.removeIf(s -> s != null && v565RemoveSet.contains(s.getClass().getSimpleName()));
         }
 
-        boolean isHit564Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "564");
-        if (isHit564Exp) {
-            strategies.add(strategyMap.get(ProvinceRovnRecallStrategy.class.getSimpleName()));
-            // V564: rank 侧不再 extract 以下 9 路召回,这里直接剔除避免无效 Redis 调用
-            Set<String> v564RemoveSet = new HashSet<>(Arrays.asList(
-                    RegionRealtimeRecallStrategyV1.class.getSimpleName(),
-                    RegionHRecallStrategy.class.getSimpleName(),
-                    Region24HRecallStrategy.class.getSimpleName(),
-                    RegionHDupRecallStrategy.class.getSimpleName(),
-                    RegionRelative24HRecallStrategy.class.getSimpleName(),
-                    RegionRelative24HDupRecallStrategy.class.getSimpleName(),
-                    PrioriProvinceRovnRecallStrategy.class.getSimpleName(),
-                    PrioriProvinceStrRecallStrategy.class.getSimpleName(),
-                    PrioriProvinceRosRecallStrategy.class.getSimpleName()
-            ));
-            strategies.removeIf(s -> s != null && v564RemoveSet.contains(s.getClass().getSimpleName()));
-        }
+        // V564 实验:召回侧不做任何剔除/新增——让所有公共池召回都跑,
+        // 由 V564 rank 类 (mergeAndRankRovRecall) 在 extractAllAndTruncateByCoarseRank
+        // 里按全局粗排分统一截断。
 
         // 命中用户黑名单不走流量池
         // 命中安全测试风险地域不走流量池

+ 85 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

@@ -66,6 +66,91 @@ public class RecallUtils {
         markSelfMixed(param, v0, sizeReturn);
     }
 
+    /**
+     * V564 experiment: unified truncation by coarse-rank score.
+     * Merges (deduped) the RecallResult routes whose pushFrom is in includedPushFroms, sorts by the coarseRankMap score, and keeps the top N.
+     * There is no per-route quota: a high-scoring route may contribute many videos, a low-scoring route possibly none.
+     *
+     * Whitelist (fail-closed): recall routes added to RecallService in the future do not automatically enter V564; they must be added to the whitelist explicitly.
+     * Per the original author, this keeps the recall sources strictly identical in the V564 vs V568 AB comparison.
+     *
+     * @param topN              total number kept after truncation (nothing is done when <= 0)
+     * @param param             RankParam (provides RecallResult / FunnelContext)
+     * @param setVideo          already-selected vids used as an exclusion set (includes headVid); picked vids are added to it
+     * @param rovRecallRank     output list; picked videos are appended
+     * @param coarseRankMap     full vid → coarse-rank score map (caller owns the data source; vids missing from it fall back to RovScore when sorting)
+     * @param includedPushFroms whitelist of pushFroms taking part in the unified truncation (null/empty → no route is picked)
+     */
+    public static void extractAllAndTruncateByCoarseRank(int topN,
+                                                          RankParam param,
+                                                          Set<Long> setVideo,
+                                                          List<Video> rovRecallRank,
+                                                          Map<Long, Double> coarseRankMap,
+                                                          Set<String> includedPushFroms) {
+        if (topN <= 0 || param == null || param.getRecallResult() == null
+                || CollectionUtils.isEmpty(param.getRecallResult().getData())
+                || CollectionUtils.isEmpty(includedPushFroms)) {
+            return;
+        }
+        Map<Long, Double> coarseMap = coarseRankMap != null ? coarseRankMap : Collections.emptyMap();
+        // 1. Merge everything + dedupe (first occurrence wins; only routes listed in includedPushFroms are considered)
+        Map<Long, Video> mergedById = new LinkedHashMap<>();
+        for (RecallResult.RecallData data : param.getRecallResult().getData()) {
+            if (data == null || CollectionUtils.isEmpty(data.getVideos())) continue;
+            if (!includedPushFroms.contains(data.getPushFrom())) continue;
+            for (Video v : data.getVideos()) {
+                if (v == null) continue;
+                long vid = v.getVideoId();
+                if (setVideo.contains(vid)) continue;
+                mergedById.putIfAbsent(vid, v);
+            }
+        }
+        if (mergedById.isEmpty()) {
+            // Still attribute the entries within includedPushFroms: no vid was added by this call, but they must be marked OTHER (taken by an earlier pool + no hit in this category)
+            markCoarseRankAttribution(param, includedPushFroms, Collections.emptySet(), topN, coarseMap);
+            return;
+        }
+
+        // 2. Sort by coarse-rank score descending (RovScore when the score is missing), keep the top N
+        List<Video> all = new ArrayList<>(mergedById.values());
+        all.sort(Comparator.comparingDouble((Video v) -> coarseMap.getOrDefault(v.getVideoId(), v.getRovScore())).reversed());
+        List<Video> picked = all.size() <= topN ? all : all.subList(0, topN);
+
+        Set<Long> pickedIds = picked.stream().map(Video::getVideoId).collect(Collectors.toSet());
+        rovRecallRank.addAll(picked);
+        setVideo.addAll(pickedIds);
+
+        // 3. Funnel attribution (only entries within includedPushFroms):
+        //    - entry.score is overwritten with the coarse-rank score (the deciding score; every entry of the same vid across all hit routes gets the same value)
+        //    - entry.select = SELF/OTHER reuses V568's binary semantics: vid won this truncation → SELF, otherwise OTHER
+        //      (cross-category preemption: when a vid is hit by both personalized and non-personalized routes and the personalized call takes it first, the non-personalized entry
+        //       is not in pickedIds during the second call → marked OTHER automatically, meaning "vid taken by an earlier pool", consistent with V568)
+        //    - entry.truncate = the topN of this call
+        markCoarseRankAttribution(param, includedPushFroms, pickedIds, topN, coarseMap);
+    }
+
+    /**
+     * V564: overwrites entry.score and writes SELF/OTHER + truncate, only for funnel entries of the strategies in includedPushFroms.
+     * Entries outside includedPushFroms (e.g. the 3 flow-pool routes) are not touched here (per the original note they keep select=null and are not part of the V564 truncation).
+     */
+    private static void markCoarseRankAttribution(RankParam param, Set<String> includedPushFroms,
+                                                   Set<Long> pickedIds, int truncate,
+                                                   Map<Long, Double> coarseRankMap) {
+        if (param == null || param.getFunnelContext() == null
+                || CollectionUtils.isEmpty(includedPushFroms)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (String pf : includedPushFroms) {
+            List<RecallVideoEntry> entries = ctx.getStages123RecallByStrategy().get(pf);
+            if (CollectionUtils.isEmpty(entries)) continue;
+            for (RecallVideoEntry e : entries) {
+                Double coarse = coarseRankMap.get(e.getVideoId());
+                if (coarse != null) e.setScore(coarse); // the existing score is kept when no coarse-rank score is available
+                e.setSelect(pickedIds.contains(e.getVideoId()) ? SelectKind.SELF : SelectKind.OTHER);
+                e.setTruncate(truncate);
+            }
+        }
+    }
+
     public static void extractRecall(int recallNum, RankParam param, String pushFrom, Set<Long> setVideo, List<Video> rovRecallRank) {
         if (recallNum > 0) {
             List<Video> rawFiltered = extractAndSort(param, pushFrom);