Prechádzať zdrojové kódy

Merge branch 'feature_20260610_v567_stack_v566_v536' of algorithm/recommend-server into master

yangxiaohui 1 deň pred
rodič
commit
cf16634529

+ 3 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -435,7 +435,7 @@ public class RecommendService {
             boolean isNoneUserRisk = isNoneUserRisk(request, param);
             boolean hitRiskScene = riskScenes.contains(request.getHotSceneType());
             boolean hitRiskUidCache = riskUserCache.getUnchecked(RedisKeyConstants.Recommend.riskUserUid).contains(param.getUid());
-            // V563/V566 实验补丁: 命中其一时, 把 String "null" uid 的命中改回 false,
+            // V563/V566/V567 实验补丁: 命中其一时, 把 String "null" uid 的命中改回 false,
             // 修 RickUserCacheJob 将 BIGINT NULL 转 String.valueOf(null)="null" 写入
             // Redis Set, 导致 client 传 "null" 字符串 uid 的 guest user 全部错杀的 bug.
             if (hitRiskUidCache && "null".equals(param.getUid()) && isHitNullUidFixExp(request, param)) {
@@ -496,12 +496,12 @@ public class RecommendService {
     }
 
     /**
-     * null uid 风控错杀修复的实验集 (V562/V563/V565/V566/...). 命中其中任一实验时走精准修复路径;
+     * null uid 风控错杀修复的实验集 (V562/V563/V565/V566/V567/...). 命中其中任一实验时走精准修复路径;
      * 多实验共享同一修复, 加新实验只需扩这个 Set。
      *
      * 走 judgeHitAlgoExp, 同时覆盖 abExpCodes 通道和 rootSessionId 尾号通道.
      */
-    private static final Set<String> NULL_UID_FIX_EXP_CODES = new HashSet<>(Arrays.asList("562", "563", "565", "566"));
+    private static final Set<String> NULL_UID_FIX_EXP_CODES = new HashSet<>(Arrays.asList("562", "563", "565", "566", "567"));
 
     private boolean isHitNullUidFixExp(RecommendRequest request, RecommendParam param) {
         for (String code : NULL_UID_FIX_EXP_CODES) {

+ 213 - 36
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV567.java

@@ -7,7 +7,9 @@ import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.model.MachineInfo;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.FeatureService;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
+import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import com.tzld.piaoquan.recommend.server.service.rank.extractor.ExtractVideoMergeCate;
 import com.tzld.piaoquan.recommend.server.service.rank.tansform.FeatureV6;
@@ -17,6 +19,7 @@ import com.tzld.piaoquan.recommend.server.util.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -36,6 +39,51 @@ public class RankStrategy4RegionMergeModelV567 extends RankStrategy4RegionMergeM
     @Autowired
     private FeatureService featureService;
 
+    /**
+     * V567 个性化召回白名单 (6 路):召回 key 含 mid/uid,依赖该用户行为信号。
+     * 与 V566 同源 — V567 叠加 V566 粗排截断模式,保持白名单一致便于 AB 对齐。
+     */
+    private static final Set<String> PERSONAL_RECALL_PUSH_FROMS = new HashSet<>(Arrays.asList(
+            UserCate1RecallStrategy.PUSH_FORM,
+            UserCate2RecallStrategy.PUSH_FORM,
+            Return1Cate2RosRecallStrategy.PUSH_FORM,
+            Return1Cate2StrRecallStrategy.PUSH_FORM,
+            YearShareCate1RecallStrategy.PUSH_FROM,
+            YearShareCate2RecallStrategy.PUSH_FROM
+    ));
+
+    /**
+     * V567 非个性化召回白名单 (17 路):只依赖 headVid + 地域/品类/相似度(vid-vid CF 也归此类)。
+     * 含 5 路旧地域、新地域、城市、head province/cate、先验省份、return 相似、scene CF、YearReturnCate2。
+     */
+    private static final Set<String> NON_PERSONAL_RECALL_PUSH_FROMS = new HashSet<>(Arrays.asList(
+            RegionHRecallStrategy.PUSH_FORM,
+            RegionHDupRecallStrategy.PUSH_FORM,
+            Region24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HDupRecallStrategy.PUSH_FORM,
+            RegionRealtimeRecallStrategyV1.PUSH_FORM,
+            CityRovnRecallStrategy.PUSH_FROM,
+            HeadProvinceCate1RecallStrategy.PUSH_FORM,
+            HeadProvinceCate2RecallStrategy.PUSH_FORM,
+            HeadCate2RovRecallStrategy.PUSH_FROM,
+            PrioriProvinceRovnRecallStrategy.PUSH_FROM,
+            PrioriProvinceStrRecallStrategy.PUSH_FROM,
+            PrioriProvinceRosRecallStrategy.PUSH_FROM,
+            ReturnVideoRecallStrategy.PUSH_FORM,
+            SceneCFRovnRecallStrategy.PUSH_FORM,
+            SceneCFRosnRecallStrategy.PUSH_FORM,
+            YearReturnCate2RecallStrategy.PUSH_FROM
+    ));
+
+    /** PERSONAL ∪ NON_PERSONAL = 23 路。用于 fetchCoarseRankScores 跳过流量池等不参与截断的 vid。 */
+    private static final Set<String> ALL_ROV_PUSH_FROMS;
+    static {
+        Set<String> all = new HashSet<>(PERSONAL_RECALL_PUSH_FROMS);
+        all.addAll(NON_PERSONAL_RECALL_PUSH_FROMS);
+        ALL_ROV_PUSH_FROMS = Collections.unmodifiableSet(all);
+    }
+
     @Override
     public List<Video> mergeAndRankRovRecall(RankParam param) {
         Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : new HashMap<>(0);
@@ -49,42 +97,29 @@ public class RankStrategy4RegionMergeModelV567 extends RankStrategy4RegionMergeM
         Set<Long> setVideo = new HashSet<>();
         setVideo.add(param.getHeadVid());
         List<Video> rovRecallRank = new ArrayList<>();
-        // -------------------5路特殊旧召回------------------
-        RecallUtils.extractOldSpecialRecall(mergeWeight.getOrDefault("oldSpecialN", (double) param.getSize()).intValue(), param, setVideo, rovRecallRank);
-        //-------------------return相似召回------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v6", 5.0).intValue(), param, ReturnVideoRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------新地域召回------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v1", 5.0).intValue(), param, RegionRealtimeRecallStrategyV1.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------scene cf rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRovn", 5.0).intValue(), param, SceneCFRovnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------scene cf rosn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRosn", 5.0).intValue(), param, SceneCFRosnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate1RecallN", 5.0).intValue(), param, UserCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate2RecallN", 5.0).intValue(), param, UserCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate1RecallN", 3.0).intValue(), param, HeadProvinceCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2RecallN", 3.0).intValue(), param, HeadProvinceCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------head cate2 of rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2Rov", 5.0).intValue(), param, HeadCate2RovRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------city rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cityRov", 5.0).intValue(), param, CityRovnRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceRov", 3.0).intValue(), param, PrioriProvinceRovnRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceStr", 1.0).intValue(), param, PrioriProvinceStrRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceRos", 1.0).intValue(), param, PrioriProvinceRosRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Ros", 5.0).intValue(), param, Return1Cate2RosRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Str", 5.0).intValue(), param, Return1Cate2StrRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate1", 5.0).intValue(), param, YearShareCate1RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate2", 5.0).intValue(), param, YearShareCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearReturnCate2", 5.0).intValue(), param, YearReturnCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
+
+        // ============================================================
+        // V567 叠加 V566 粗排截断:个性化 / 非个性化 两配额,动态补足
+        // 总配额 coarseRankTopN,个性化占 personalRatio。先个性化按上限抢位,
+        // 个性化不足时剩余名额转给非个性化,保证精排算力满载。
+        // 粗排分 = alg_vid_recommend_exp_feature_20250212.rovn_1h / rovn_24h 平均
+        // ============================================================
+        int totalTopN = mergeWeight.getOrDefault("coarseRankTopN", 80.0).intValue();
+        double personalRatio = mergeWeight.getOrDefault("personalRatio", 0.4);
+        int personalTopN = (int) Math.round(totalTopN * personalRatio);
+        Map<Long, Double> coarseRankMap = fetchCoarseRankScores(param);
+
+        int personalCandidates = RecallUtils.countDistinctCandidates(param, setVideo, PERSONAL_RECALL_PUSH_FROMS);
+        int sizeBeforePersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(personalTopN, param, setVideo, rovRecallRank, coarseRankMap, PERSONAL_RECALL_PUSH_FROMS);
+        int personalActual = rovRecallRank.size() - sizeBeforePersonal;
+        int nonPersonalBudget = totalTopN - personalActual;
+        int nonPersonalCandidates = RecallUtils.countDistinctCandidates(param, setVideo, NON_PERSONAL_RECALL_PUSH_FROMS);
+        int sizeBeforeNonPersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(nonPersonalBudget, param, setVideo, rovRecallRank, coarseRankMap, NON_PERSONAL_RECALL_PUSH_FROMS);
+        int nonPersonalActual = rovRecallRank.size() - sizeBeforeNonPersonal;
+        log.info("coarse_rank_summary exp=567 quota={} pc={} ps={} nc={} ns={}",
+                totalTopN, personalCandidates, personalActual, nonPersonalCandidates, nonPersonalActual);
 
         // 记录召回源中的视频
         this.rankBeforePostProcessor(rovRecallRank);
@@ -289,6 +324,148 @@ public class RankStrategy4RegionMergeModelV567 extends RankStrategy4RegionMergeM
         return result;
     }
 
+    /**
+     * V567 叠加 V536 fusion:只保留"流量池相关 + 兜底相关"逻辑
+     *   1. rov 空兜底:rov 池为空时流量池直接顶上 (Basic 段 1)
+     *   7. 流量池按比例强插:topK 头部锁 rov + topK..size 按 flowPoolP / newFlowPoolSelectRate 概率门
+     *      混入 flowVideos / douHotFlowPoolVideos,否则用 rov 中段;一侧用光时另一侧兜底回填 (Basic 段 7)
+     *
+     * 删除(相对 Basic):标签 filter / rov boost / 强插 / 品类降权 / 节日降权 / 密度控制
+     */
+    @Override
+    public RankResult mergeAndSort(RankParam param, List<Video> rovVideos, List<Video> flowVideos, List<Video> douHotFlowPoolVideos) {
+
+        // 1 兜底策略,rov池子不足时,用冷启池填补。直接返回。
+        if (CollectionUtils.isEmpty(rovVideos)) {
+            if (param.getSize() < flowVideos.size()) {
+                return new RankResult(flowVideos.subList(0, param.getSize()));
+            } else {
+                return new RankResult(flowVideos);
+            }
+        }
+
+        // 7 流量池按比例强插
+        FunnelContext funnelCtx = param.getFunnelContext();
+        List<Video> result = new ArrayList<>();
+        for (int i = 0; i < param.getTopK() && i < rovVideos.size(); i++) {
+            result.add(rovVideos.get(i));
+        }
+        double flowPoolP = getFlowPoolP(param);
+        int flowPoolIndex = 0;
+        int rovPoolIndex = param.getTopK();
+        for (int i = 0; i < param.getSize() - param.getTopK(); i++) {
+            double rand = RandomUtils.nextDouble(0, 1);
+            if (rand < flowPoolP) {
+                if (flowPoolIndex < flowVideos.size()) {
+                    Video v = flowVideos.get(flowPoolIndex++);
+                    result.add(v);
+                    markColdStartInserted(funnelCtx, v);
+                } else {
+                    break;
+                }
+            } else if (this.isInsertDouHotFlowPoolVideo()) {
+                if (flowPoolIndex < douHotFlowPoolVideos.size()) {
+                    Video v = douHotFlowPoolVideos.get(flowPoolIndex++);
+                    result.add(v);
+                    markColdStartInserted(funnelCtx, v);
+                } else {
+                    break;
+                }
+            } else {
+                if (rovPoolIndex < rovVideos.size()) {
+                    result.add(rovVideos.get(rovPoolIndex++));
+                } else {
+                    break;
+                }
+            }
+        }
+        if (rovPoolIndex >= rovVideos.size()) {
+            for (int i = flowPoolIndex; i < flowVideos.size() && result.size() < param.getSize(); i++) {
+                Video v = flowVideos.get(i);
+                result.add(v);
+                markColdStartInserted(funnelCtx, v);
+            }
+        }
+        if (flowPoolIndex >= flowVideos.size()) {
+            for (int i = rovPoolIndex; i < rovVideos.size() && result.size() < param.getSize(); i++) {
+                result.add(rovVideos.get(i));
+            }
+        }
+
+        return new RankResult(result);
+    }
+
+    /**
+     * V567 叠加 V566 粗排截断:拉取粗排分(按 vid → score 返回)。
+     *
+     * 数据源:alg_vid_recommend_exp_feature_20250212。
+     * 表里没有现成 rovn 字段,需要从原子字段 (return_n_uv_*, exp_*) 用 plusSmooth 算出来。
+     * 公式 = FeatureV6.oneTypeStatFeature 同口径:rovn = plusSmooth(return_n_uv, exp, plus, 1)
+     * 默认 plus=30 与 FeatureV6.largerSmoothPlus 对齐,AB 对比不会因口径不同污染结论。
+     *
+     * Apollo 可调维度:
+     *   - coarseRovn1hW / coarseRovn24hW:1h 和 24h 的加权(默认 0.5/0.5)
+     *   - coarseRovn1hSmoothPlus / coarseRovn24hSmoothPlus:贝叶斯平滑系数(默认 30/30)
+     *
+     * 缺失自动归一化:单值缺失时剩下的撑起全部权重;两值都缺失则 caller 兜底 RovScore。
+     */
+    private Map<Long, Double> fetchCoarseRankScores(RankParam param) {
+        if (param == null || param.getRecallResult() == null
+                || CollectionUtils.isEmpty(param.getRecallResult().getData())) {
+            return Collections.emptyMap();
+        }
+        Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : Collections.emptyMap();
+        double w1h = mergeWeight.getOrDefault("coarseRovn1hW", 0.5);
+        double w24h = mergeWeight.getOrDefault("coarseRovn24hW", 0.5);
+        double plus1h = mergeWeight.getOrDefault("coarseRovn1hSmoothPlus", 30.0);
+        double plus24h = mergeWeight.getOrDefault("coarseRovn24hSmoothPlus", 30.0);
+        // 只对参与统一截断的 23 路 vid 拉粗排分(跳过流量池 3 路,省 proto + RPC 延迟)
+        List<String> vids = param.getRecallResult().getData().stream()
+                .filter(d -> d != null && CollectionUtils.isNotEmpty(d.getVideos()))
+                .filter(d -> ALL_ROV_PUSH_FROMS.contains(d.getPushFrom()))
+                .flatMap(d -> d.getVideos().stream())
+                .map(v -> String.valueOf(v.getVideoId()))
+                .distinct()
+                .collect(Collectors.toList());
+        if (vids.isEmpty()) return Collections.emptyMap();
+
+        Map<String, Map<String, Map<String, String>>> feats = featureService.getVideoCoarseRankFeature(vids);
+        Map<Long, Double> result = new HashMap<>(vids.size());
+        for (String vid : vids) {
+            Map<String, String> row = feats.getOrDefault(vid, Collections.emptyMap())
+                    .getOrDefault("alg_vid_recommend_exp_feature_20250212", Collections.emptyMap());
+            Double rovn1h = computeRovn(row, "1h", plus1h);
+            Double rovn24h = computeRovn(row, "24h", plus24h);
+            // 加权平均,缺失自动归一化
+            double sumW = (rovn1h != null ? w1h : 0) + (rovn24h != null ? w24h : 0);
+            if (sumW <= 0) continue;
+            double sumWS = (rovn1h != null ? rovn1h * w1h : 0) + (rovn24h != null ? rovn24h * w24h : 0);
+            try {
+                result.put(Long.parseLong(vid), sumWS / sumW);
+            } catch (NumberFormatException ignore) { }
+        }
+        return result;
+    }
+
+    /**
+     * 与 FeatureV6.oneTypeStatFeature 同口径:rovn = plusSmooth(return_n_uv, exp, plus, 1)
+     *
+     * 字段语义(区分 0 vs null):
+     *   - exp 是 period 有效性 anchor:null 或 ≤0 → 整个 period 无效(return null)
+     *   - return_n_uv 缺失视为 0(真实信号"无回访"):rovn=0,参与加权(不会让另一时段兜底)
+     */
+    private static Double computeRovn(Map<String, String> row, String period, double smoothPlus) {
+        Double exp = parseDoubleOrNull(row.get("exp_" + period));
+        if (exp == null || exp <= 0) return null;
+        Double returnNuv = parseDoubleOrNull(row.get("return_n_uv_" + period));
+        return FeatureUtils.plusSmooth(returnNuv != null ? returnNuv : 0, exp, smoothPlus, 1);
+    }
+
+    private static Double parseDoubleOrNull(String s) {
+        if (StringUtils.isBlank(s)) return null;
+        try { return Double.parseDouble(s); } catch (NumberFormatException e) { return null; }
+    }
+
     private UserShareReturnProfile parseUserProfile(Map<String, Map<String, String>> userOriginInfo) {
         if (null != userOriginInfo) {
             Map<String, String> c9 = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");

+ 4 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -175,7 +175,7 @@ public class RecallService implements ApplicationContextAware {
             strategies.add(strategyMap.get(UserDeconstructionKeywordsRecallStrategy.class.getSimpleName()));
         }
 
-        // V562/V565 同步成 V566 模式 (rank 类完全 cp 自 V566, 粗排分统一截断 + 老 region/city 白名单),
+        // V562/V565/V567 同步成 V566 模式 (rank 类粗排分统一截断 + 老 region/city 白名单),
         // 召回侧不再做"追加 all_rov + removeIf 老 region/city"的对调 —— 与 V564/V566 一致行为, 让公共池
         // 22 路召回都跑, rank 类 extractAllAndTruncateByCoarseRank 按白名单 + 粗排分挑选.
         //
@@ -195,7 +195,7 @@ public class RecallService implements ApplicationContextAware {
         // 由 V564 rank 类 (mergeAndRankRovRecall) 在 extractAllAndTruncateByCoarseRank
         // 里按全局粗排分统一截断。
 
-        // V562/V563/V565/V566 命中且 uid="null" 的 guest user 不走流量池: 这几个实验修了
+        // V562/V563/V565/V566/V567 命中且 uid="null" 的 guest user 不走流量池: 这几个实验修了
         // risk uid "null" 错杀, 这批 guest 不再 setRiskUser(true), 不在此处隔离会涌进流量池
         // 稀释真实有效 uid 的曝光。非这几个实验的用户继续走 bug 路径 (riskUser=true), 已被
         // 第一个条件挡掉, 此 gate 与风控修复 AB 边界严格对齐。
@@ -203,7 +203,8 @@ public class RecallService implements ApplicationContextAware {
                 && (experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "562")
                     || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "563")
                     || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "565")
-                    || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "566"));
+                    || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "566")
+                    || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "567"));
 
         // 命中用户黑名单不走流量池
         // 命中安全测试风险地域不走流量池