浏览代码

Merge branch 'feature_20260609_dk_elements_parsing' of algorithm/recommend-server into master

yangxiaohui 2 天之前
父节点
当前提交
8db5ce7b7b

+ 2 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -496,12 +496,12 @@ public class RecommendService {
     }
     }
 
 
     /**
     /**
-     * null uid 风控错杀修复的实验集 (V563/V566/...). 命中其中任一实验时走精准修复路径;
+     * null uid 风控错杀修复的实验集 (V562/V563/V565/V566/...). 命中其中任一实验时走精准修复路径;
      * 多实验共享同一修复, 加新实验只需扩这个 Set。
      * 多实验共享同一修复, 加新实验只需扩这个 Set。
      *
      *
      * 走 judgeHitAlgoExp, 同时覆盖 abExpCodes 通道和 rootSessionId 尾号通道.
      * 走 judgeHitAlgoExp, 同时覆盖 abExpCodes 通道和 rootSessionId 尾号通道.
      */
      */
-    private static final Set<String> NULL_UID_FIX_EXP_CODES = new HashSet<>(Arrays.asList("563", "566"));
+    private static final Set<String> NULL_UID_FIX_EXP_CODES = new HashSet<>(Arrays.asList("562", "563", "565", "566"));
 
 
     private boolean isHitNullUidFixExp(RecommendRequest request, RecommendParam param) {
     private boolean isHitNullUidFixExp(RecommendRequest request, RecommendParam param) {
         for (String code : NULL_UID_FIX_EXP_CODES) {
         for (String code : NULL_UID_FIX_EXP_CODES) {

+ 150 - 36
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV562.java

@@ -36,6 +36,61 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
     @Autowired
     @Autowired
     private FeatureService featureService;
     private FeatureService featureService;
 
 
+    /**
+     * V562 personalized-recall whitelist (7 routes: the 6 V566 base routes plus 1 dk_elements
+     * behavior-route experiment). Per the original note, these routes use recall keys that contain
+     * mid/uid and therefore depend on the user's own behavior signals.
+     * V562 experiment route: YearShareDkElements (the user's recent share behavior joined with dk_elements).
+     * Note: YearReturnCate2 was moved to the non-personalized whitelist on 2026-06-04 because of
+     * poor online performance.
+     *
+     * Wrapped as an unmodifiable set so this shared static constant cannot be mutated by accident.
+     */
+    private static final Set<String> PERSONAL_RECALL_PUSH_FROMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            UserCate1RecallStrategy.PUSH_FORM,
+            UserCate2RecallStrategy.PUSH_FORM,
+            Return1Cate2RosRecallStrategy.PUSH_FORM,
+            Return1Cate2StrRecallStrategy.PUSH_FORM,
+            YearShareCate1RecallStrategy.PUSH_FROM,
+            YearShareCate2RecallStrategy.PUSH_FROM,
+            YearShareDkElementsRecallStrategy.PUSH_FROM
+    )));
+
+    /**
+     * V562 non-personalized recall whitelist (17 routes). Per the original note, these routes depend only
+     * on headVid plus region / category / similarity signals (vid-vid CF is also counted here).
+     * Covers the 5 legacy region routes, the new region route, city, head province/cate, priori province,
+     * return-similar, scene CF and YearReturnCate2.
+     *
+     * Wrapped as an unmodifiable set so this shared static constant cannot be mutated by accident.
+     */
+    private static final Set<String> NON_PERSONAL_RECALL_PUSH_FROMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RegionHRecallStrategy.PUSH_FORM,
+            RegionHDupRecallStrategy.PUSH_FORM,
+            Region24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HDupRecallStrategy.PUSH_FORM,
+            RegionRealtimeRecallStrategyV1.PUSH_FORM,
+            CityRovnRecallStrategy.PUSH_FROM,
+            HeadProvinceCate1RecallStrategy.PUSH_FORM,
+            HeadProvinceCate2RecallStrategy.PUSH_FORM,
+            HeadCate2RovRecallStrategy.PUSH_FROM,
+            PrioriProvinceRovnRecallStrategy.PUSH_FROM,
+            PrioriProvinceStrRecallStrategy.PUSH_FROM,
+            PrioriProvinceRosRecallStrategy.PUSH_FROM,
+            ReturnVideoRecallStrategy.PUSH_FORM,
+            SceneCFRovnRecallStrategy.PUSH_FORM,
+            SceneCFRosnRecallStrategy.PUSH_FORM,
+            YearReturnCate2RecallStrategy.PUSH_FROM
+    )));
+
+    /**
+     * Union of PERSONAL_RECALL_PUSH_FROMS and NON_PERSONAL_RECALL_PUSH_FROMS, exposed as an unmodifiable set.
+     * Used by fetchCoarseRankScores to skip vids from routes (e.g. the flow pool) that do not take part
+     * in the coarse-rank truncation.
+     * NOTE(review): the two whitelists declare 7 + 17 entries, i.e. 24 routes if every push-from value is
+     * distinct; the earlier "23 routes" figure matches a 6-route personalized list — confirm.
+     */
+    private static final Set<String> ALL_ROV_PUSH_FROMS;
+    static {
+        Set<String> all = new HashSet<>(PERSONAL_RECALL_PUSH_FROMS);
+        all.addAll(NON_PERSONAL_RECALL_PUSH_FROMS);
+        ALL_ROV_PUSH_FROMS = Collections.unmodifiableSet(all);
+    }
+
+    /*
+     * 设计要点:
+     *   - fail-closed 白名单:RecallService 未来加新路不会自动进 V562,避免污染 vs V568 AB 对比
+     *   - 流量池 3 路 (flow_pool / quick_flow_pool / recall_strategy_hotspot) 不在任何名单——独立通道
+     *   - 调用顺序 = 个性化优先:同 vid 双类命中时归个性化,保护用户兴趣信号
+     */
+
     @Override
     @Override
     public List<Video> mergeAndRankRovRecall(RankParam param) {
     public List<Video> mergeAndRankRovRecall(RankParam param) {
         Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : new HashMap<>(0);
         Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : new HashMap<>(0);
@@ -49,42 +104,30 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
         Set<Long> setVideo = new HashSet<>();
         Set<Long> setVideo = new HashSet<>();
         setVideo.add(param.getHeadVid());
         setVideo.add(param.getHeadVid());
         List<Video> rovRecallRank = new ArrayList<>();
         List<Video> rovRecallRank = new ArrayList<>();
-        // -------------------5路特殊旧召回------------------
-        RecallUtils.extractOldSpecialRecall(mergeWeight.getOrDefault("oldSpecialN", (double) param.getSize()).intValue(), param, setVideo, rovRecallRank);
-        //-------------------return相似召回------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v6", 5.0).intValue(), param, ReturnVideoRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------新地域召回 (V562: all_rov, V568 base 用 V1)------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v1", 5.0).intValue(), param, RegionRealtimeRecallStrategyV1AllRov.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------scene cf rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRovn", 5.0).intValue(), param, SceneCFRovnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------scene cf rosn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRosn", 5.0).intValue(), param, SceneCFRosnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate1RecallN", 5.0).intValue(), param, UserCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate2RecallN", 5.0).intValue(), param, UserCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate1RecallN", 3.0).intValue(), param, HeadProvinceCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2RecallN", 3.0).intValue(), param, HeadProvinceCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------head cate2 of rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2Rov", 5.0).intValue(), param, HeadCate2RovRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------city rovn (V562: all_rov, V568 base 用 v1)------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cityRov", 5.0).intValue(), param, CityRovnAllRovRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceRov", 3.0).intValue(), param, PrioriProvinceRovnRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceStr", 1.0).intValue(), param, PrioriProvinceStrRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------priori province ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("prioriProvinceRos", 1.0).intValue(), param, PrioriProvinceRosRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Ros", 5.0).intValue(), param, Return1Cate2RosRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Str", 5.0).intValue(), param, Return1Cate2StrRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate1", 5.0).intValue(), param, YearShareCate1RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate2", 5.0).intValue(), param, YearShareCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearReturnCate2", 5.0).intValue(), param, YearReturnCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
+
+        // ============================================================
+        // V562 实验:统一粗排分截断 (个性化 / 非个性化 两配额, 动态补足)
+        // 总配额 coarseRankTopN,个性化占 personalRatio。先个性化按上限抢位,
+        // 个性化不足时剩余名额转给非个性化,保证精排算力满载。
+        //
+        // 粗排分 = alg_vid_recommend_exp_feature_20250212.rovn_1h / rovn_24h 平均
+        // ============================================================
+        int totalTopN = mergeWeight.getOrDefault("coarseRankTopN", 80.0).intValue();
+        double personalRatio = mergeWeight.getOrDefault("personalRatio", 0.4);
+        int personalTopN = (int) Math.round(totalTopN * personalRatio);
+        Map<Long, Double> coarseRankMap = fetchCoarseRankScores(param);
+
+        int personalCandidates = RecallUtils.countDistinctCandidates(param, setVideo, PERSONAL_RECALL_PUSH_FROMS);
+        int sizeBeforePersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(personalTopN, param, setVideo, rovRecallRank, coarseRankMap, PERSONAL_RECALL_PUSH_FROMS);
+        int personalActual = rovRecallRank.size() - sizeBeforePersonal;
+        int nonPersonalBudget = totalTopN - personalActual;  // 个性化不足时, 名额转给非个性化
+        int nonPersonalCandidates = RecallUtils.countDistinctCandidates(param, setVideo, NON_PERSONAL_RECALL_PUSH_FROMS);
+        int sizeBeforeNonPersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(nonPersonalBudget, param, setVideo, rovRecallRank, coarseRankMap, NON_PERSONAL_RECALL_PUSH_FROMS);
+        int nonPersonalActual = rovRecallRank.size() - sizeBeforeNonPersonal;
+        log.info("coarse_rank_summary exp=562 quota={} pc={} ps={} nc={} ns={}",
+                totalTopN, personalCandidates, personalActual, nonPersonalCandidates, nonPersonalActual);
 
 
         // 记录召回源中的视频
         // 记录召回源中的视频
         this.rankBeforePostProcessor(rovRecallRank);
         this.rankBeforePostProcessor(rovRecallRank);
@@ -289,6 +332,77 @@ public class RankStrategy4RegionMergeModelV562 extends RankStrategy4RegionMergeM
         return result;
         return result;
     }
     }
 
 
+    /**
+     * V562 experiment: fetches coarse-rank scores and returns them as a vid → score map.
+     *
+     * Data source: the alg_vid_recommend_exp_feature_20250212 feature table. Per the original note the table
+     * has no ready-made rovn column, so rovn is derived per period from the atomic fields exp_* and
+     * return_n_uv_* through computeRovn, i.e. plusSmooth(return_n_uv, exp, plus, 1), which is meant to match
+     * FeatureV6.oneTypeStatFeature so the AB comparison is not skewed by a different formula.
+     *
+     * Tunables read from mergeWeight:
+     *   - coarseRovn1hW / coarseRovn24hW: weights of the 1h and 24h rovn (default 0.5 / 0.5)
+     *   - coarseRovn1hSmoothPlus / coarseRovn24hSmoothPlus: smoothing constants (default 30 / 30)
+     *
+     * Missing periods are normalized away: when only one period is usable it carries the full weight.
+     * A vid with no usable period is left out of the map (the original note says the caller then falls
+     * back to RovScore; that fallback is not visible here).
+     *
+     * @param param rank parameters carrying the recall result; may be null
+     * @return vid → coarse score; empty when there is nothing to score or the feature lookup returned nothing
+     */
+    private Map<Long, Double> fetchCoarseRankScores(RankParam param) {
+        if (param == null || param.getRecallResult() == null
+                || CollectionUtils.isEmpty(param.getRecallResult().getData())) {
+            return Collections.emptyMap();
+        }
+        Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : Collections.emptyMap();
+        double w1h = mergeWeight.getOrDefault("coarseRovn1hW", 0.5);
+        double w24h = mergeWeight.getOrDefault("coarseRovn24hW", 0.5);
+        double plus1h = mergeWeight.getOrDefault("coarseRovn1hSmoothPlus", 30.0);
+        double plus24h = mergeWeight.getOrDefault("coarseRovn24hSmoothPlus", 30.0);
+        // Only vids from the whitelisted routes (ALL_ROV_PUSH_FROMS, 7 + 17 = 24 declared entries) are scored;
+        // routes outside the whitelist such as the flow pool are skipped to keep the feature request small.
+        List<String> vids = param.getRecallResult().getData().stream()
+                .filter(d -> d != null && CollectionUtils.isNotEmpty(d.getVideos()))
+                .filter(d -> ALL_ROV_PUSH_FROMS.contains(d.getPushFrom()))
+                .flatMap(d -> d.getVideos().stream())
+                // A null entry inside a recall list would otherwise throw on getVideoId().
+                .filter(v -> v != null)
+                .map(v -> String.valueOf(v.getVideoId()))
+                .distinct()
+                .collect(Collectors.toList());
+        if (vids.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, Map<String, Map<String, String>>> feats = featureService.getVideoCoarseRankFeature(vids);
+        if (feats == null) {
+            // No feature response: return no scores instead of failing the whole ranking request.
+            return Collections.emptyMap();
+        }
+        Map<Long, Double> result = new HashMap<>(vids.size());
+        for (String vid : vids) {
+            // Plain get() + null checks: getOrDefault returns null when the key is present with a null value.
+            Map<String, Map<String, String>> tables = feats.get(vid);
+            Map<String, String> row = tables == null ? null : tables.get("alg_vid_recommend_exp_feature_20250212");
+            if (row == null) {
+                continue;
+            }
+            Double rovn1h = computeRovn(row, "1h", plus1h);
+            Double rovn24h = computeRovn(row, "24h", plus24h);
+            // Weighted average over the periods that are present; a missing period contributes no weight.
+            double sumW = (rovn1h != null ? w1h : 0) + (rovn24h != null ? w24h : 0);
+            if (sumW <= 0) {
+                continue;
+            }
+            double sumWS = (rovn1h != null ? rovn1h * w1h : 0) + (rovn24h != null ? rovn24h * w24h : 0);
+            double score = sumWS / sumW;
+            if (Double.isNaN(score) || Double.isInfinite(score)) {
+                // A non-finite score cannot be ordered meaningfully; treat the vid as unscored.
+                continue;
+            }
+            try {
+                result.put(Long.parseLong(vid), score);
+            } catch (NumberFormatException ignored) {
+                // The vid string is not a valid long (e.g. "null" from a missing id): leave it unscored.
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Computes the smoothed rovn of one time window as plusSmooth(return_n_uv, exp, smoothPlus, 1)
+     * (per the original note, the same formula as FeatureV6.oneTypeStatFeature).
+     *
+     * Field semantics (0 and "missing" are treated differently):
+     *   - exp_&lt;period&gt; anchors the validity of the window: missing, unparsable or &lt;= 0 yields null
+     *   - return_n_uv_&lt;period&gt; missing or unparsable counts as 0 returns, so the window still
+     *     produces a value and takes part in the weighting
+     *
+     * @param row        feature row holding the exp_* and return_n_uv_* fields
+     * @param period     window suffix appended to the field names, e.g. "1h" or "24h"
+     * @param smoothPlus smoothing constant passed through to plusSmooth
+     * @return the smoothed rovn, or null when the window has no valid exposure
+     */
+    private static Double computeRovn(Map<String, String> row, String period, double smoothPlus) {
+        String exposureKey = "exp_" + period;
+        String returnKey = "return_n_uv_" + period;
+
+        Double exposure = parseDoubleOrNull(row.get(exposureKey));
+        if (exposure == null || exposure <= 0) {
+            return null;
+        }
+
+        Double returnUv = parseDoubleOrNull(row.get(returnKey));
+        double returns = (returnUv == null) ? 0 : returnUv;
+        return FeatureUtils.plusSmooth(returns, exposure, smoothPlus, 1);
+    }
+
+    /**
+     * Parses a raw feature string into a Double.
+     *
+     * @param s raw feature value; may be null or blank
+     * @return the parsed value, or null when s is blank, is not a valid number, or parses to NaN / ±Infinity
+     */
+    private static Double parseDoubleOrNull(String s) {
+        if (StringUtils.isBlank(s)) {
+            return null;
+        }
+        try {
+            double value = Double.parseDouble(s);
+            // Double.parseDouble accepts "NaN" and "Infinity"; such values would slip past a "<= 0" check
+            // and end up as a non-finite score, so they are reported as missing instead.
+            if (Double.isNaN(value) || Double.isInfinite(value)) {
+                return null;
+            }
+            return value;
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
     private UserShareReturnProfile parseUserProfile(Map<String, Map<String, String>> userOriginInfo) {
     private UserShareReturnProfile parseUserProfile(Map<String, Map<String, String>> userOriginInfo) {
         if (null != userOriginInfo) {
         if (null != userOriginInfo) {
             Map<String, String> c9 = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");
             Map<String, String> c9 = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");

+ 150 - 28
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV565.java

@@ -36,6 +36,61 @@ public class RankStrategy4RegionMergeModelV565 extends RankStrategy4RegionMergeM
     @Autowired
     @Autowired
     private FeatureService featureService;
     private FeatureService featureService;
 
 
+    /**
+     * V565 personalized-recall whitelist (7 routes: the 6 V566 base routes plus 1 dk_elements
+     * profile-route experiment). Per the original note, these routes use recall keys that contain
+     * mid/uid and therefore depend on the user's own behavior signals.
+     * V565 experiment route: UserProfileDkElements (user element profile, s_z_y_s / zt_gyf).
+     * Note: YearReturnCate2 was moved to the non-personalized whitelist on 2026-06-04 because of
+     * poor online performance.
+     *
+     * Wrapped as an unmodifiable set so this shared static constant cannot be mutated by accident.
+     */
+    private static final Set<String> PERSONAL_RECALL_PUSH_FROMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            UserCate1RecallStrategy.PUSH_FORM,
+            UserCate2RecallStrategy.PUSH_FORM,
+            Return1Cate2RosRecallStrategy.PUSH_FORM,
+            Return1Cate2StrRecallStrategy.PUSH_FORM,
+            YearShareCate1RecallStrategy.PUSH_FROM,
+            YearShareCate2RecallStrategy.PUSH_FROM,
+            UserProfileDkElementsRecallStrategy.PUSH_FROM
+    )));
+
+    /**
+     * V565 non-personalized recall whitelist (17 routes). Per the original note, these routes depend only
+     * on headVid plus region / category / similarity signals (vid-vid CF is also counted here).
+     * Covers the 5 legacy region routes, the new region route, city, head province/cate, priori province,
+     * return-similar, scene CF and YearReturnCate2.
+     *
+     * Wrapped as an unmodifiable set so this shared static constant cannot be mutated by accident.
+     */
+    private static final Set<String> NON_PERSONAL_RECALL_PUSH_FROMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RegionHRecallStrategy.PUSH_FORM,
+            RegionHDupRecallStrategy.PUSH_FORM,
+            Region24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HRecallStrategy.PUSH_FORM,
+            RegionRelative24HDupRecallStrategy.PUSH_FORM,
+            RegionRealtimeRecallStrategyV1.PUSH_FORM,
+            CityRovnRecallStrategy.PUSH_FROM,
+            HeadProvinceCate1RecallStrategy.PUSH_FORM,
+            HeadProvinceCate2RecallStrategy.PUSH_FORM,
+            HeadCate2RovRecallStrategy.PUSH_FROM,
+            PrioriProvinceRovnRecallStrategy.PUSH_FROM,
+            PrioriProvinceStrRecallStrategy.PUSH_FROM,
+            PrioriProvinceRosRecallStrategy.PUSH_FROM,
+            ReturnVideoRecallStrategy.PUSH_FORM,
+            SceneCFRovnRecallStrategy.PUSH_FORM,
+            SceneCFRosnRecallStrategy.PUSH_FORM,
+            YearReturnCate2RecallStrategy.PUSH_FROM
+    )));
+
+    /**
+     * Union of PERSONAL_RECALL_PUSH_FROMS and NON_PERSONAL_RECALL_PUSH_FROMS, exposed as an unmodifiable set.
+     * Used by fetchCoarseRankScores to skip vids from routes (e.g. the flow pool) that do not take part
+     * in the coarse-rank truncation.
+     * NOTE(review): the two whitelists declare 7 + 17 entries, i.e. 24 routes if every push-from value is
+     * distinct; the earlier "23 routes" figure matches a 6-route personalized list — confirm.
+     */
+    private static final Set<String> ALL_ROV_PUSH_FROMS;
+    static {
+        Set<String> all = new HashSet<>(PERSONAL_RECALL_PUSH_FROMS);
+        all.addAll(NON_PERSONAL_RECALL_PUSH_FROMS);
+        ALL_ROV_PUSH_FROMS = Collections.unmodifiableSet(all);
+    }
+
+    /*
+     * 设计要点:
+     *   - fail-closed 白名单:RecallService 未来加新路不会自动进 V565,避免污染 vs V568 AB 对比
+     *   - 流量池 3 路 (flow_pool / quick_flow_pool / recall_strategy_hotspot) 不在任何名单——独立通道
+     *   - 调用顺序 = 个性化优先:同 vid 双类命中时归个性化,保护用户兴趣信号
+     */
+
     @Override
     @Override
     public List<Video> mergeAndRankRovRecall(RankParam param) {
     public List<Video> mergeAndRankRovRecall(RankParam param) {
         Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : new HashMap<>(0);
         Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : new HashMap<>(0);
@@ -49,34 +104,30 @@ public class RankStrategy4RegionMergeModelV565 extends RankStrategy4RegionMergeM
         Set<Long> setVideo = new HashSet<>();
         Set<Long> setVideo = new HashSet<>();
         setVideo.add(param.getHeadVid());
         setVideo.add(param.getHeadVid());
         List<Video> rovRecallRank = new ArrayList<>();
         List<Video> rovRecallRank = new ArrayList<>();
-        //-------------------return相似召回------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v6", 5.0).intValue(), param, ReturnVideoRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------新地域召回 (V565: all_rov, V568 base 用 V1)------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("v1", 5.0).intValue(), param, RegionRealtimeRecallStrategyV1AllRov.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------scene cf rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRovn", 5.0).intValue(), param, SceneCFRovnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------scene cf rosn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("sceneCFRosn", 5.0).intValue(), param, SceneCFRosnRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate1RecallN", 5.0).intValue(), param, UserCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------user cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cate2RecallN", 5.0).intValue(), param, UserCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate1------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate1RecallN", 3.0).intValue(), param, HeadProvinceCate1RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        // -------------------head province cate2------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2RecallN", 3.0).intValue(), param, HeadProvinceCate2RecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------head cate2 of rovn------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("headCate2Rov", 5.0).intValue(), param, HeadCate2RovRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------city rovn (V565: all_rov, V568 base 用 v1)------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("cityRov", 5.0).intValue(), param, CityRovnAllRovRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 ros------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Ros", 5.0).intValue(), param, Return1Cate2RosRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-        //-------------------return1 cate2 str------------------
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("return1Cate2Str", 5.0).intValue(), param, Return1Cate2StrRecallStrategy.PUSH_FORM, setVideo, rovRecallRank);
-
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate1", 5.0).intValue(), param, YearShareCate1RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearShareCate2", 5.0).intValue(), param, YearShareCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
-        RecallUtils.extractRecall(mergeWeight.getOrDefault("yearReturnCate2", 5.0).intValue(), param, YearReturnCate2RecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
+
+        // ============================================================
+        // V565 实验:统一粗排分截断 (个性化 / 非个性化 两配额, 动态补足)
+        // 总配额 coarseRankTopN,个性化占 personalRatio。先个性化按上限抢位,
+        // 个性化不足时剩余名额转给非个性化,保证精排算力满载。
+        //
+        // 粗排分 = alg_vid_recommend_exp_feature_20250212.rovn_1h / rovn_24h 平均
+        // ============================================================
+        int totalTopN = mergeWeight.getOrDefault("coarseRankTopN", 80.0).intValue();
+        double personalRatio = mergeWeight.getOrDefault("personalRatio", 0.4);
+        int personalTopN = (int) Math.round(totalTopN * personalRatio);
+        Map<Long, Double> coarseRankMap = fetchCoarseRankScores(param);
+
+        int personalCandidates = RecallUtils.countDistinctCandidates(param, setVideo, PERSONAL_RECALL_PUSH_FROMS);
+        int sizeBeforePersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(personalTopN, param, setVideo, rovRecallRank, coarseRankMap, PERSONAL_RECALL_PUSH_FROMS);
+        int personalActual = rovRecallRank.size() - sizeBeforePersonal;
+        int nonPersonalBudget = totalTopN - personalActual;  // 个性化不足时, 名额转给非个性化
+        int nonPersonalCandidates = RecallUtils.countDistinctCandidates(param, setVideo, NON_PERSONAL_RECALL_PUSH_FROMS);
+        int sizeBeforeNonPersonal = rovRecallRank.size();
+        RecallUtils.extractAllAndTruncateByCoarseRank(nonPersonalBudget, param, setVideo, rovRecallRank, coarseRankMap, NON_PERSONAL_RECALL_PUSH_FROMS);
+        int nonPersonalActual = rovRecallRank.size() - sizeBeforeNonPersonal;
+        log.info("coarse_rank_summary exp=565 quota={} pc={} ps={} nc={} ns={}",
+                totalTopN, personalCandidates, personalActual, nonPersonalCandidates, nonPersonalActual);
 
 
         // 记录召回源中的视频
         // 记录召回源中的视频
         this.rankBeforePostProcessor(rovRecallRank);
         this.rankBeforePostProcessor(rovRecallRank);
@@ -281,6 +332,77 @@ public class RankStrategy4RegionMergeModelV565 extends RankStrategy4RegionMergeM
         return result;
         return result;
     }
     }
 
 
+    /**
+     * V565 experiment: fetches coarse-rank scores and returns them as a vid → score map.
+     *
+     * Data source: the alg_vid_recommend_exp_feature_20250212 feature table. Per the original note the table
+     * has no ready-made rovn column, so rovn is derived per period from the atomic fields exp_* and
+     * return_n_uv_* through computeRovn, i.e. plusSmooth(return_n_uv, exp, plus, 1), which is meant to match
+     * FeatureV6.oneTypeStatFeature so the AB comparison is not skewed by a different formula.
+     *
+     * Tunables read from mergeWeight:
+     *   - coarseRovn1hW / coarseRovn24hW: weights of the 1h and 24h rovn (default 0.5 / 0.5)
+     *   - coarseRovn1hSmoothPlus / coarseRovn24hSmoothPlus: smoothing constants (default 30 / 30)
+     *
+     * Missing periods are normalized away: when only one period is usable it carries the full weight.
+     * A vid with no usable period is left out of the map (the original note says the caller then falls
+     * back to RovScore; that fallback is not visible here).
+     *
+     * @param param rank parameters carrying the recall result; may be null
+     * @return vid → coarse score; empty when there is nothing to score or the feature lookup returned nothing
+     */
+    private Map<Long, Double> fetchCoarseRankScores(RankParam param) {
+        if (param == null || param.getRecallResult() == null
+                || CollectionUtils.isEmpty(param.getRecallResult().getData())) {
+            return Collections.emptyMap();
+        }
+        Map<String, Double> mergeWeight = this.mergeWeight != null ? this.mergeWeight : Collections.emptyMap();
+        double w1h = mergeWeight.getOrDefault("coarseRovn1hW", 0.5);
+        double w24h = mergeWeight.getOrDefault("coarseRovn24hW", 0.5);
+        double plus1h = mergeWeight.getOrDefault("coarseRovn1hSmoothPlus", 30.0);
+        double plus24h = mergeWeight.getOrDefault("coarseRovn24hSmoothPlus", 30.0);
+        // Only vids from the whitelisted routes (ALL_ROV_PUSH_FROMS, 7 + 17 = 24 declared entries) are scored;
+        // routes outside the whitelist such as the flow pool are skipped to keep the feature request small.
+        List<String> vids = param.getRecallResult().getData().stream()
+                .filter(d -> d != null && CollectionUtils.isNotEmpty(d.getVideos()))
+                .filter(d -> ALL_ROV_PUSH_FROMS.contains(d.getPushFrom()))
+                .flatMap(d -> d.getVideos().stream())
+                // A null entry inside a recall list would otherwise throw on getVideoId().
+                .filter(v -> v != null)
+                .map(v -> String.valueOf(v.getVideoId()))
+                .distinct()
+                .collect(Collectors.toList());
+        if (vids.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, Map<String, Map<String, String>>> feats = featureService.getVideoCoarseRankFeature(vids);
+        if (feats == null) {
+            // No feature response: return no scores instead of failing the whole ranking request.
+            return Collections.emptyMap();
+        }
+        Map<Long, Double> result = new HashMap<>(vids.size());
+        for (String vid : vids) {
+            // Plain get() + null checks: getOrDefault returns null when the key is present with a null value.
+            Map<String, Map<String, String>> tables = feats.get(vid);
+            Map<String, String> row = tables == null ? null : tables.get("alg_vid_recommend_exp_feature_20250212");
+            if (row == null) {
+                continue;
+            }
+            Double rovn1h = computeRovn(row, "1h", plus1h);
+            Double rovn24h = computeRovn(row, "24h", plus24h);
+            // Weighted average over the periods that are present; a missing period contributes no weight.
+            double sumW = (rovn1h != null ? w1h : 0) + (rovn24h != null ? w24h : 0);
+            if (sumW <= 0) {
+                continue;
+            }
+            double sumWS = (rovn1h != null ? rovn1h * w1h : 0) + (rovn24h != null ? rovn24h * w24h : 0);
+            double score = sumWS / sumW;
+            if (Double.isNaN(score) || Double.isInfinite(score)) {
+                // A non-finite score cannot be ordered meaningfully; treat the vid as unscored.
+                continue;
+            }
+            try {
+                result.put(Long.parseLong(vid), score);
+            } catch (NumberFormatException ignored) {
+                // The vid string is not a valid long (e.g. "null" from a missing id): leave it unscored.
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Computes the smoothed rovn of one time window as plusSmooth(return_n_uv, exp, smoothPlus, 1)
+     * (per the original note, the same formula as FeatureV6.oneTypeStatFeature).
+     *
+     * Field semantics (0 and "missing" are treated differently):
+     *   - exp_&lt;period&gt; anchors the validity of the window: missing, unparsable or &lt;= 0 yields null
+     *   - return_n_uv_&lt;period&gt; missing or unparsable counts as 0 returns, so the window still
+     *     produces a value and takes part in the weighting
+     *
+     * @param row        feature row holding the exp_* and return_n_uv_* fields
+     * @param period     window suffix appended to the field names, e.g. "1h" or "24h"
+     * @param smoothPlus smoothing constant passed through to plusSmooth
+     * @return the smoothed rovn, or null when the window has no valid exposure
+     */
+    private static Double computeRovn(Map<String, String> row, String period, double smoothPlus) {
+        String exposureKey = "exp_" + period;
+        String returnKey = "return_n_uv_" + period;
+
+        Double exposure = parseDoubleOrNull(row.get(exposureKey));
+        if (exposure == null || exposure <= 0) {
+            return null;
+        }
+
+        Double returnUv = parseDoubleOrNull(row.get(returnKey));
+        double returns = (returnUv == null) ? 0 : returnUv;
+        return FeatureUtils.plusSmooth(returns, exposure, smoothPlus, 1);
+    }
+
+    /**
+     * Parses a raw feature string into a Double.
+     *
+     * @param s raw feature value; may be null or blank
+     * @return the parsed value, or null when s is blank, is not a valid number, or parses to NaN / ±Infinity
+     */
+    private static Double parseDoubleOrNull(String s) {
+        if (StringUtils.isBlank(s)) {
+            return null;
+        }
+        try {
+            double value = Double.parseDouble(s);
+            // Double.parseDouble accepts "NaN" and "Infinity"; such values would slip past a "<= 0" check
+            // and end up as a non-finite score, so they are reported as missing instead.
+            if (Double.isNaN(value) || Double.isInfinite(value)) {
+                return null;
+            }
+            return value;
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
     private UserShareReturnProfile parseUserProfile(Map<String, Map<String, String>> userOriginInfo) {
     private UserShareReturnProfile parseUserProfile(Map<String, Map<String, String>> userOriginInfo) {
         if (null != userOriginInfo) {
         if (null != userOriginInfo) {
             Map<String, String> c9 = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");
             Map<String, String> c9 = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");

+ 15 - 29
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -175,48 +175,34 @@ public class RecallService implements ApplicationContextAware {
             strategies.add(strategyMap.get(UserDeconstructionKeywordsRecallStrategy.class.getSimpleName()));
             strategies.add(strategyMap.get(UserDeconstructionKeywordsRecallStrategy.class.getSimpleName()));
         }
         }
 
 
+        // V562/V565 同步成 V566 模式 (rank 类完全 cp 自 V566, 粗排分统一截断 + 老 region/city 白名单),
+        // 召回侧不再做"追加 all_rov + removeIf 老 region/city"的对调 —— 与 V564/V566 一致行为, 让公共池
+        // 22 路召回都跑, rank 类 extractAllAndTruncateByCoarseRank 按白名单 + 粗排分挑选.
+        //
+        // V562/V565 各承载一个 dk_elements 实验, 互相隔离便于独立归因:
+        //   V562 → YearShareDkElements:   用户近期 share 行为 join dk_elements -> elements_rovn_recall 倒排
+        //   V565 → UserProfileDkElements: 用户元素画像 (s_z_y_s/zt_gyf)     -> elements_rovn_recall 倒排
         boolean isHit562Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "562");
         boolean isHit562Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "562");
         if (isHit562Exp) {
         if (isHit562Exp) {
-            strategies.add(strategyMap.get(RegionRealtimeRecallStrategyV1AllRov.class.getSimpleName()));
-            strategies.add(strategyMap.get(CityRovnAllRovRecallStrategy.class.getSimpleName()));
-            // V562: rank 侧用 all_rov 系列替代 region_1h + city_rovn, 这里直接剔除老召回避免无效 OSS/Redis 调用
-            Set<String> v562RemoveSet = new HashSet<>(Arrays.asList(
-                    RegionRealtimeRecallStrategyV1.class.getSimpleName(),
-                    CityRovnRecallStrategy.class.getSimpleName()
-            ));
-            strategies.removeIf(s -> s != null && v562RemoveSet.contains(s.getClass().getSimpleName()));
+            strategies.add(strategyMap.get(YearShareDkElementsRecallStrategy.class.getSimpleName()));
         }
         }
-
         boolean isHit565Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "565");
         boolean isHit565Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "565");
         if (isHit565Exp) {
         if (isHit565Exp) {
-            strategies.add(strategyMap.get(RegionRealtimeRecallStrategyV1AllRov.class.getSimpleName()));
-            strategies.add(strategyMap.get(CityRovnAllRovRecallStrategy.class.getSimpleName()));
-            // V565: all_rov 替代 region_1h + city_rovn, 额外剔除 5 路 region 旧召回 (rank 侧已删 extractOldSpecial) + 3 路 priori province
-            Set<String> v565RemoveSet = new HashSet<>(Arrays.asList(
-                    RegionRealtimeRecallStrategyV1.class.getSimpleName(),
-                    CityRovnRecallStrategy.class.getSimpleName(),
-                    RegionHRecallStrategy.class.getSimpleName(),
-                    Region24HRecallStrategy.class.getSimpleName(),
-                    RegionHDupRecallStrategy.class.getSimpleName(),
-                    RegionRelative24HRecallStrategy.class.getSimpleName(),
-                    RegionRelative24HDupRecallStrategy.class.getSimpleName(),
-                    PrioriProvinceRovnRecallStrategy.class.getSimpleName(),
-                    PrioriProvinceStrRecallStrategy.class.getSimpleName(),
-                    PrioriProvinceRosRecallStrategy.class.getSimpleName()
-            ));
-            strategies.removeIf(s -> s != null && v565RemoveSet.contains(s.getClass().getSimpleName()));
+            strategies.add(strategyMap.get(UserProfileDkElementsRecallStrategy.class.getSimpleName()));
         }
         }
 
 
         // V564 实验:召回侧不做任何剔除/新增——让所有公共池召回都跑,
         // V564 实验:召回侧不做任何剔除/新增——让所有公共池召回都跑,
         // 由 V564 rank 类 (mergeAndRankRovRecall) 在 extractAllAndTruncateByCoarseRank
         // 由 V564 rank 类 (mergeAndRankRovRecall) 在 extractAllAndTruncateByCoarseRank
         // 里按全局粗排分统一截断。
         // 里按全局粗排分统一截断。
 
 
-        // V563/V566 命中且 uid="null" 的 guest user 不走流量池: V563/V566 修了 risk uid
-        // "null" 错杀, 这批 guest 不再 setRiskUser(true), 不在此处隔离会涌进流量池稀释
-        // 真实有效 uid 的曝光。非 V563/V566 用户继续走 bug 路径 (riskUser=true), 已被
+        // V562/V563/V565/V566 命中且 uid="null" 的 guest user 不走流量池: 这几个实验修了
+        // risk uid "null" 错杀, 这批 guest 不再 setRiskUser(true), 不在此处隔离会涌进流量池
+        // 稀释真实有效 uid 的曝光。非这几个实验的用户继续走 bug 路径 (riskUser=true), 已被
         // 第一个条件挡掉, 此 gate 与风控修复 AB 边界严格对齐。
         // 第一个条件挡掉, 此 gate 与风控修复 AB 边界严格对齐。
         boolean isHitNullUidFixExp = "null".equals(param.getUid())
         boolean isHitNullUidFixExp = "null".equals(param.getUid())
-                && (experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "563")
+                && (experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "562")
+                    || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "563")
+                    || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "565")
                     || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "566"));
                     || experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "566"));
 
 
         // 命中用户黑名单不走流量池
         // 命中用户黑名单不走流量池

+ 181 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserProfileDkElementsRecallStrategy.java

@@ -0,0 +1,181 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterService;
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import com.tzld.piaoquan.recommend.server.util.FeatureUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Recall strategy driven by the user's profile of "substantive elements" (dk_elements), scored by rovn.
+ *
+ * <p>Data source: {@code param.getUserNetworkSeqFeature()}, keys {@code s_z_y_s} (element list) and
+ * {@code zt_gyf} (normalized-score list). Per the original author's note these fields were added to the
+ * upstream alg_user_network_seq_feature and originate from user_element_profile_hot, where
+ * top_elements = UNION ALL(positive_ranked, negative_ranked); a normalized score may therefore be negative.
+ *
+ * <p>Flow: pair each element with its score, keep only positive scores, sort by score descending, take the
+ * first {@link #topN} distinct elements, fetch {@code elements_rovn_recall:{element}} for all of them with a
+ * single multiGet, keep the max score per video id, sort by score, then run the filter service.
+ * Only positive elements (score &gt; 0) are used so that elements the user dislikes are never recalled.
+ *
+ * <p>Uses the same Redis inverted-index key prefix as YearShareDkElementsRecallStrategy; only the source of
+ * user interest and the way elements are picked differ.
+ */
+@Slf4j
+@Component
+public class UserProfileDkElementsRecallStrategy implements RecallStrategy {
+
+    @Autowired
+    @Qualifier("redisTemplate")
+    private RedisTemplate<String, String> redisTemplate;
+
+    @Autowired
+    private FilterService filterService;
+
+    private final String CLASS_NAME = this.getClass().getSimpleName();
+
+    /** Maximum number of distinct positive elements looked up in Redis per request. */
+    public static final int topN = 30;
+    public static final String PUSH_FROM = "recall_user_profile_dk_elements";
+    public static final String redisKeyPrefix = "elements_rovn_recall";
+
+    /** Feature key holding the element list. */
+    public static final String KEY_ELEMENTS = "s_z_y_s";
+    /** Feature key holding the normalized scores; must be index-aligned with {@link #KEY_ELEMENTS}. */
+    public static final String KEY_SCORES = "zt_gyf";
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FROM;
+    }
+
+    /**
+     * Recalls videos for the user's top positive profile elements.
+     *
+     * @param param recall request; its user-network sequence feature map supplies elements and scores
+     * @return filtered videos carrying the Redis rovn score in {@code rovScore}; never null, empty when the
+     *         feature is missing/misaligned, no positive element exists, or any exception occurs
+     */
+    @Override
+    public List<Video> recall(RecallParam param) {
+        List<Video> videosResult = new ArrayList<>();
+        try {
+            if (MapUtils.isEmpty(param.getUserNetworkSeqFeature())) {
+                return videosResult;
+            }
+
+            List<String> elements = FeatureUtils.extractVidsFromUserNetworkSeqFeature(param.getUserNetworkSeqFeature(), KEY_ELEMENTS);
+            List<String> scores = FeatureUtils.extractVidsFromUserNetworkSeqFeature(param.getUserNetworkSeqFeature(), KEY_SCORES);
+            // The two lists are consumed pairwise, so a missing or misaligned score list makes the feature unusable.
+            if (CollectionUtils.isEmpty(elements) || scores == null || elements.size() != scores.size()) {
+                return videosResult;
+            }
+
+            List<String> topElements = pickTopPositiveElements(elements, scores);
+            if (CollectionUtils.isEmpty(topElements)) {
+                return videosResult;
+            }
+
+            List<String> keys = getRedisKey(topElements);
+            List<String> values = redisTemplate.opsForValue().multiGet(keys);
+
+            // Keep the real rovn score from the Redis inverted index (not a positional score): it is written
+            // to Video.rovScore, which coarse-rank truncation falls back to for vids missing from the coarse map.
+            Map<Long, Double> scoresMap = recall(param.getVideoId(), values);
+            List<Long> ids = scoresMap.entrySet().stream()
+                    .sorted(Comparator.comparingDouble((Map.Entry<Long, Double> e) -> e.getValue()).reversed())
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
+
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
+            FilterResult filterResult = filterService.filter(filterParam);
+            if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
+                for (Long vid : filterResult.getVideoIds()) {
+                    Video video = new Video();
+                    video.setVideoId(vid);
+                    video.setRovScore(scoresMap.getOrDefault(vid, 0.0));
+                    video.setPushFrom(pushFrom());
+                    videosResult.add(video);
+                }
+            }
+        } catch (Exception e) {
+            // The throwable is passed last WITHOUT a placeholder: SLF4J always strips a trailing Throwable
+            // from the format arguments, so a matching "{}" would be printed literally.
+            log.error("recall is wrong in {}", CLASS_NAME, e);
+        }
+        return videosResult;
+    }
+
+    /**
+     * Pairs elements with scores, drops blank elements and non-positive scores, sorts by score descending
+     * and returns the first {@link #topN} distinct elements.
+     *
+     * @param elements element names
+     * @param scores   normalized scores as strings, same length as {@code elements}
+     * @return up to {@link #topN} elements, highest score first; empty when nothing qualifies
+     */
+    private List<String> pickTopPositiveElements(List<String> elements, List<String> scores) {
+        List<Pair<String, Double>> pairs = new ArrayList<>();
+        for (int i = 0; i < elements.size(); i++) {
+            String element = elements.get(i);
+            if (StringUtils.isBlank(element)) {
+                continue;
+            }
+            double score = NumberUtils.toDouble(scores.get(i), 0.0);
+            // Written as !(score > 0) so that NaN (which "NaN" parses to) is rejected too; with
+            // "score <= 0" a NaN would pass and then sort ahead of every real score.
+            if (!(score > 0)) {
+                continue;
+            }
+            pairs.add(Pair.of(element, score));
+        }
+        if (pairs.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return pairs.stream()
+                .sorted(Comparator.comparingDouble((Pair<String, Double> p) -> p.getValue()).reversed())
+                .map(Pair::getKey)
+                .distinct()
+                .limit(topN)
+                .collect(Collectors.toList());
+    }
+
+    /** Builds one Redis key per element in the form {@code elements_rovn_recall:{element}}. */
+    private List<String> getRedisKey(List<String> elementList) {
+        List<String> keys = new ArrayList<>(elementList.size());
+        for (String element : elementList) {
+            keys.add(String.format("%s:%s", redisKeyPrefix, element));
+        }
+        return keys;
+    }
+
+    /**
+     * Parses the values returned by multiGet into a {@code vid -> score} map.
+     *
+     * <p>Value format: {@code vid1,vid2,...\tscore1,score2,...} (real rovn scores). Blank, malformed or
+     * misaligned values are skipped as a whole. When a vid occurs under several elements the max score wins
+     * (the original author notes this matches AbstractRedisRecallStrategy).
+     *
+     * @param headVid video id to exclude from the result; may be null (nothing excluded)
+     * @param values  raw Redis values, may be null or contain nulls for missing keys
+     * @return mutable map of video id to best score; empty when nothing could be parsed
+     */
+    private Map<Long, Double> recall(Long headVid, List<String> values) {
+        Map<Long, Double> scoresMap = new HashMap<>();
+        if (CollectionUtils.isEmpty(values)) {
+            return scoresMap;
+        }
+        for (String value : values) {
+            if (StringUtils.isBlank(value)) {
+                continue;
+            }
+            String[] cells = value.split("\t");
+            if (cells.length != 2) {
+                continue;
+            }
+            List<Long> ids;
+            List<Double> scores;
+            try {
+                ids = Arrays.stream(cells[0].split(",")).map(Long::valueOf).collect(Collectors.toList());
+                scores = Arrays.stream(cells[1].split(",")).map(Double::valueOf).collect(Collectors.toList());
+            } catch (NumberFormatException nfe) {
+                continue;
+            }
+            if (ids.isEmpty() || ids.size() != scores.size()) {
+                continue;
+            }
+            for (int i = 0; i < ids.size(); i++) {
+                long id = ids.get(i);
+                if (headVid != null && headVid == id) {
+                    continue;
+                }
+                Double score = scores.get(i);
+                // Double.valueOf accepts "NaN"; Math.max would propagate it and NaN sorts first downstream.
+                if (score.isNaN()) {
+                    continue;
+                }
+                scoresMap.merge(id, score, Math::max);
+            }
+        }
+        return scoresMap;
+    }
+}

+ 197 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareDkElementsRecallStrategy.java

@@ -0,0 +1,197 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterService;
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import com.tzld.piaoquan.recommend.server.util.DkElementsUtils;
+import com.tzld.piaoquan.recommend.server.util.FeatureUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Recall strategy based on video-deconstruction "substantive elements" (dk_elements), scored by rovn:
+ * the user's recent share actions are mapped to the dk_elements of the shared videos.
+ *
+ * <p>A shared vid usually has several elements; {@code parseUserActionVideoAndElements} returns a flat list
+ * of (vid, element) pairs. Elements whose contribution score {@code c} is below
+ * {@link #MIN_CONTRIB_SCORE} are already dropped during parsing, so noisy elements never reach recall.
+ *
+ * <p>Element selection: walk the flattened elements in the order of the user's action sequence, take the
+ * first {@link #topN} distinct ones and fetch {@code elements_rovn_recall:{kw}} for all of them with a
+ * single multiGet. (Per the original author's note, the "most recent + most frequent" union is
+ * intentionally not used here because elements are finer-grained than cate2.)
+ *
+ * <p>Upstream ODPS table (per the original author's note): alg_recsys_recall_elements_rovn
+ * (raw element -> top-50 vids + rovn scores).
+ * Redis key: {@code elements_rovn_recall:{raw element}}; value: {@code vid1,vid2,...\tscore1,score2,...}
+ */
+@Slf4j
+@Component
+public class YearShareDkElementsRecallStrategy implements RecallStrategy {
+
+    @Autowired
+    @Qualifier("redisTemplate")
+    private RedisTemplate<String, String> redisTemplate;
+
+    @Autowired
+    private FilterService filterService;
+
+    private final String CLASS_NAME = this.getClass().getSimpleName();
+
+    public static final String PUSH_FROM = "recall_user_year_share_dk_elements";
+    public static final String redisKeyPrefix = "elements_rovn_recall";
+
+    /** Contribution-score threshold: elements with c &lt; 0.8 are discarded while parsing. */
+    public static final double MIN_CONTRIB_SCORE = 0.8;
+    /** Number of distinct elements (in action-sequence order) used for the Redis lookup. */
+    public static final int topN = 30;
+
+    /**
+     * Recalls videos for the elements of the user's recently shared videos.
+     *
+     * @param param recall request; supplies the action sequence feature and the per-video info map
+     * @return filtered videos carrying the Redis rovn score in {@code rovScore}; never null, empty when
+     *         inputs are missing, no element qualifies, or any exception occurs
+     */
+    @Override
+    public List<Video> recall(RecallParam param) {
+
+        List<Video> videosResult = new ArrayList<>();
+        try {
+
+            // Both inputs are required: the sequence feature yields the shared vids, the info map their elements.
+            if (MapUtils.isEmpty(param.getUserNetworkSeqFeature())
+                    || MapUtils.isEmpty(param.getUserNetworkSeqVideoInfoMap())) {
+                return videosResult;
+            }
+
+            List<Pair<Long, String>> userNetworkVideoElement = this.parseUserActionVideoAndElements(param.getUserNetworkSeqFeature(), param.getUserNetworkSeqVideoInfoMap());
+            if (CollectionUtils.isEmpty(userNetworkVideoElement)) {
+                return videosResult;
+            }
+            // Walk in action-sequence order and take the first topN distinct elements
+            // (contribution-score filtering already happened inside parseUserActionVideoAndElements).
+            List<String> allElements = userNetworkVideoElement.stream()
+                    .map(Pair::getValue)
+                    .filter(StringUtils::isNotBlank)
+                    .distinct()
+                    .limit(topN)
+                    .collect(Collectors.toList());
+            if (allElements.isEmpty()) {
+                // Nothing to look up; avoid a pointless Redis round trip.
+                return videosResult;
+            }
+
+            List<String> keys = this.getRedisKey(allElements);
+            List<String> values = redisTemplate.opsForValue().multiGet(keys);
+
+            // Keep the real rovn score from the Redis inverted index (not a positional score): it is written
+            // to Video.rovScore, which coarse-rank truncation falls back to for vids missing from the coarse map.
+            Map<Long, Double> scoresMap = recall(param.getVideoId(), values);
+            List<Long> ids = scoresMap.entrySet().stream()
+                    .sorted(Comparator.comparingDouble((Map.Entry<Long, Double> e) -> e.getValue()).reversed())
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
+
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
+            FilterResult filterResult = filterService.filter(filterParam);
+            if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
+                for (Long vid : filterResult.getVideoIds()) {
+                    Video video = new Video();
+                    video.setVideoId(vid);
+                    video.setRovScore(scoresMap.getOrDefault(vid, 0.0));
+                    video.setPushFrom(pushFrom());
+                    videosResult.add(video);
+                }
+            }
+        } catch (Exception e) {
+            // The throwable is passed last WITHOUT a placeholder: SLF4J always strips a trailing Throwable
+            // from the format arguments, so a matching "{}" would be printed literally.
+            log.error("recall is wrong in {}", CLASS_NAME, e);
+        }
+
+        return videosResult;
+    }
+
+    /**
+     * Flattens the user's share actions into (vid, element) pairs, preserving the order of the action
+     * sequence. Only actions of type {@code "share"} with a positive vid and a non-blank
+     * {@code dk_elements} entry contribute.
+     *
+     * @param userNetworkSeqFeature      feature map holding {@code a_v_s} (action vids) and {@code a_t_s} (action types)
+     * @param userNetworkSeqVideoInfoMap per-vid base info; the {@code dk_elements} field is read from it
+     * @return flat pair list, empty when the sequences are missing or misaligned
+     */
+    private List<Pair<Long, String>> parseUserActionVideoAndElements(Map<String, String> userNetworkSeqFeature, Map<Long, Map<String, String>> userNetworkSeqVideoInfoMap) {
+        List<Pair<Long, String>> result = new ArrayList<>();
+        List<String> actVidSeq = FeatureUtils.extractVidsFromUserNetworkSeqFeature(userNetworkSeqFeature, "a_v_s");
+        List<String> actTypeSeq = FeatureUtils.extractVidsFromUserNetworkSeqFeature(userNetworkSeqFeature, "a_t_s");
+        // The two sequences are read pairwise by index, so they must both exist and have equal length.
+        if (CollectionUtils.isEmpty(actVidSeq) || actTypeSeq == null || actVidSeq.size() != actTypeSeq.size()) {
+            return result;
+        }
+
+        for (int i = 0; i < actVidSeq.size(); i++) {
+            long videoIdL = NumberUtils.toLong(actVidSeq.get(i), -1);
+            if (videoIdL <= 0) {
+                continue;
+            }
+            String type = actTypeSeq.get(i);
+            if (!"share".equals(type)) {
+                continue;
+            }
+
+            // Plain get + null check: getOrDefault still returns null when the key maps to a null value,
+            // and it would allocate a throwaway map on every iteration.
+            Map<String, String> videoBaseInfo = userNetworkSeqVideoInfoMap.get(videoIdL);
+            if (videoBaseInfo == null) {
+                continue;
+            }
+            String dkElementsStr = videoBaseInfo.get("dk_elements");
+            if (StringUtils.isBlank(dkElementsStr)) {
+                continue;
+            }
+            List<String> kws = DkElementsUtils.parseElementKws(dkElementsStr, MIN_CONTRIB_SCORE);
+            for (String kw : kws) {
+                result.add(Pair.of(videoIdL, kw));
+            }
+        }
+        return result;
+    }
+
+    /** Builds one Redis key per element in the form {@code elements_rovn_recall:{element}}. */
+    private List<String> getRedisKey(List<String> elementList) {
+        List<String> keys = new ArrayList<>(elementList.size());
+        for (String element : elementList) {
+            keys.add(String.format("%s:%s", redisKeyPrefix, element));
+        }
+        return keys;
+    }
+
+    /**
+     * Parses the values returned by multiGet into a {@code vid -> score} map.
+     *
+     * <p>Value format: {@code vid1,vid2,...\tscore1,score2,...} (real rovn scores). Blank, malformed or
+     * misaligned values are skipped as a whole. When a vid occurs under several elements the max score wins
+     * (the original author notes this matches AbstractRedisRecallStrategy).
+     *
+     * @param headVid video id to exclude from the result; may be null (nothing excluded)
+     * @param values  raw Redis values, may be null or contain nulls for missing keys
+     * @return mutable map of video id to best score; empty when nothing could be parsed
+     */
+    private Map<Long, Double> recall(Long headVid, List<String> values) {
+        Map<Long, Double> scoresMap = new HashMap<>();
+        if (CollectionUtils.isEmpty(values)) {
+            return scoresMap;
+        }
+        for (String value : values) {
+            if (StringUtils.isBlank(value)) {
+                continue;
+            }
+            String[] cells = value.split("\t");
+            if (cells.length != 2) {
+                continue;
+            }
+            List<Long> ids;
+            List<Double> scores;
+            try {
+                ids = Arrays.stream(cells[0].split(",")).map(Long::valueOf).collect(Collectors.toList());
+                scores = Arrays.stream(cells[1].split(",")).map(Double::valueOf).collect(Collectors.toList());
+            } catch (NumberFormatException nfe) {
+                continue;
+            }
+            if (ids.isEmpty() || ids.size() != scores.size()) {
+                continue;
+            }
+            for (int i = 0; i < ids.size(); i++) {
+                long id = ids.get(i);
+                if (headVid != null && headVid == id) {
+                    continue;
+                }
+                Double score = scores.get(i);
+                // Double.valueOf accepts "NaN"; Math.max would propagate it and NaN sorts first downstream.
+                if (score.isNaN()) {
+                    continue;
+                }
+                scoresMap.merge(id, score, Math::max);
+            }
+        }
+        return scoresMap;
+    }
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FROM;
+    }
+}

+ 74 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/DkElementsUtils.java

@@ -0,0 +1,74 @@
+package com.tzld.piaoquan.recommend.server.util;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.parser.Feature;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Parser for the dk_elements JSON string.
+ *
+ * <p>Value shape: {"党的二十大":{"d":"事件","p":"主题","c":2.1},"民生":{"d":"事件","p":"主题","c":1.4}}
+ *
+ * <p>Only the list of element keywords is produced for now, in the order the keys appear in the JSON text;
+ * weighting by {@code c} can be added later if needed. The text is parsed with
+ * {@code Feature.OrderedField} because fastjson's default {@code JSONObject} is backed by a HashMap and
+ * would not keep that order.
+ *
+ * <p>Keyword cleaning follows the older dk_keywords logic (per the original author's note): whitespace,
+ * tabs and colons are removed.
+ */
+@Slf4j
+public class DkElementsUtils {
+
+    /** Characters stripped from a raw keyword; compiled once instead of on every call. */
+    private static final Pattern KW_NOISE = Pattern.compile("(\\s+|\\t|:)");
+
+    /**
+     * Extracts all element keywords without any contribution-score filtering.
+     *
+     * @param dkElementsStr dk_elements JSON text; may be null or blank
+     * @return cleaned, non-empty keywords in JSON order; empty list on blank input or parse failure
+     */
+    public static List<String> parseElementKws(String dkElementsStr) {
+        return parseElementKws(dkElementsStr, Double.NEGATIVE_INFINITY);
+    }
+
+    /**
+     * Extracts element keywords whose {@code "c"} field is {@code >= minScore}, in JSON order.
+     * When filtering is active, elements whose {@code c} is missing or unparsable are dropped as well.
+     * Filtering is only applied when {@code minScore} is finite.
+     *
+     * @param dkElementsStr dk_elements JSON text; may be null or blank
+     * @param minScore      minimum contribution score; a non-finite value disables filtering
+     * @return cleaned, non-empty keywords; empty list on blank input, empty object or parse failure
+     */
+    public static List<String> parseElementKws(String dkElementsStr, double minScore) {
+        if (StringUtils.isBlank(dkElementsStr)) {
+            return Collections.emptyList();
+        }
+        try {
+            // OrderedField makes the resulting JSONObject keep key insertion (i.e. JSON text) order.
+            JSONObject obj = JSONObject.parseObject(dkElementsStr, Feature.OrderedField);
+            if (obj == null || obj.isEmpty()) {
+                return Collections.emptyList();
+            }
+            boolean filterByScore = Double.isFinite(minScore);
+            List<String> kws = new ArrayList<>(obj.size());
+            for (String raw : obj.keySet()) {
+                if (filterByScore && !passContribFilter(obj, raw, minScore)) {
+                    continue;
+                }
+                String kw = cleanKw(raw);
+                if (!kw.isEmpty()) {
+                    kws.add(kw);
+                }
+            }
+            return kws;
+        } catch (Exception e) {
+            log.error("parseElementKws error, value=[{}]", dkElementsStr, e);
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * Checks whether the element stored under {@code rawKey} has a contribution score {@code c >= minScore}.
+     * Any failure to read the element or its score counts as "not passed".
+     */
+    private static boolean passContribFilter(JSONObject obj, String rawKey, double minScore) {
+        try {
+            JSONObject element = obj.getJSONObject(rawKey);
+            if (element == null) {
+                return false;
+            }
+            Double c = element.getDouble("c");
+            return c != null && c >= minScore;
+        } catch (Exception e) {
+            // Deliberately treated as "filtered out": a malformed element must not break the whole parse.
+            return false;
+        }
+    }
+
+    /** Removes whitespace and ASCII colons from a raw keyword; null becomes the empty string. */
+    private static String cleanKw(String kw) {
+        if (kw == null) {
+            return "";
+        }
+        return KW_NOISE.matcher(kw).replaceAll("");
+    }
+}

+ 10 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

@@ -145,6 +145,16 @@ public class RecallUtils {
         all.sort(Comparator.comparingDouble((Video v) -> coarseMap.getOrDefault(v.getVideoId(), v.getRovScore())).reversed());
         all.sort(Comparator.comparingDouble((Video v) -> coarseMap.getOrDefault(v.getVideoId(), v.getRovScore())).reversed());
         List<Video> picked = all.size() <= topN ? all : all.subList(0, topN);
         List<Video> picked = all.size() <= topN ? all : all.subList(0, topN);
 
 
+        // 把粗排分写回 picked Video.rovScore, 让后续阶段 (精排公式 / 重排 / funnel)
+        // 看到统一的粗排信号, 不再受召回阶段位置分/真实分差异影响.
+        // 粗排 map miss 的 vid 保留召回阶段 rovScore 作为兜底.
+        for (Video v : picked) {
+            Double coarseScore = coarseMap.get(v.getVideoId());
+            if (coarseScore != null) {
+                v.setRovScore(coarseScore);
+            }
+        }
+
         Set<Long> pickedIds = picked.stream().map(Video::getVideoId).collect(Collectors.toSet());
         Set<Long> pickedIds = picked.stream().map(Video::getVideoId).collect(Collectors.toSet());
         rovRecallRank.addAll(picked);
         rovRecallRank.addAll(picked);
         setVideo.addAll(pickedIds);
         setVideo.addAll(pickedIds);