Kaynağa Gözat

feat:漏斗 step_6 改为纯排序截断, step_5 加 rank_index, 删 step_8

- step_5 (排序) 加 rank_index 字段, 按 rovRecallRank 顺序 1-based
- step_6 (排序截断) 改为 rovRecallRank 的前 size 条 (不含冷启), 作为"不冷启会是谁"的参考快照
- step_7 (冷启融合) 改用 step8OutputVideoIds (实际下发列表), 等于最终输出
- 删除 step_8_output 字段 (与 step_7 冗余, 冷启融合后即下发)
- recalls 字段包含 self + other 所有命中路, 带 select 标记
yangxiaohui 1 hafta önce
ebeveyn
işleme
2632ebe321

+ 9 - 33
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java

@@ -40,12 +40,11 @@ public class FunnelAggregator {
         row.put("step_2_filter_reasons", JSON.toJSONString(buildStep2Reasons(ctx)));
         row.put("step_3_truncated", JSON.toJSONString(buildStep3(ctx)));
 
-        // step 4-8
+        // step 4-7 (step_8 已并入 step_7 — 冷启融合后即最终输出)
         row.put("step_4_merged", JSON.toJSONString(buildStep4(ctx)));
         row.put("step_5_ranked", JSON.toJSONString(buildStep5(ctx)));
         row.put("step_6_rank_truncated", JSON.toJSONString(buildStep6(ctx)));
         row.put("step_7_cold_start", JSON.toJSONString(buildStep7(ctx)));
-        row.put("step_8_output", JSON.toJSONString(buildStep8(ctx)));
 
         return row;
     }
@@ -138,19 +137,23 @@ public class FunnelAggregator {
 
     private static JSONArray buildStep5(FunnelContext ctx) {  // Builds the step-5 (ranked) JSON array: one object per step-4 merged vid that has a step-5 rank entry.
         JSONArray arr = new JSONArray();
+        int rankIdx = 0;  // running position; incremented before it is written, so emitted rank_index values start at 1
         for (Long vid : ctx.getStep4MergedVideoIds()) {
             RankVideoEntry r = ctx.getStep5RankedData().get(vid);
             if (r == null) continue;  // vids without a rank entry are omitted and do not consume a rank_index
+            rankIdx++;
             JSONObject o = new JSONObject();
             o.put("vid", vid);
             o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
             o.put("recalls", buildRecallsForVid(ctx, vid));
             o.put("rank", buildRankBlock(r));
+            o.put("rank_index", rankIdx);  // follows getStep4MergedVideoIds() iteration order; NOTE(review): presumably that list is stored in rovRecallRank order - confirm against writeFunnelMergedStage
             arr.add(o);
         }
         return arr;
     }
 
+    /** 排序截断: rovRecallRank 的前 size 条(不含冷启)— 给 BI 做"不冷启会是谁"的参考 */
     private static JSONArray buildStep6(FunnelContext ctx) {
         JSONArray arr = new JSONArray();
         for (int i = 0; i < ctx.getStep6RankTruncatedVideoIds().size(); i++) {
@@ -167,37 +170,8 @@ public class FunnelAggregator {
         return arr;
     }
 
+    /** 冷启融合后的最终输出(实际下发列表)— 用 step8OutputVideoIds */
     private static JSONArray buildStep7(FunnelContext ctx) {
-        JSONArray arr = new JSONArray();
-        // step 7 = step 6 + if_cold_start
-        for (int i = 0; i < ctx.getStep6RankTruncatedVideoIds().size(); i++) {
-            long vid = ctx.getStep6RankTruncatedVideoIds().get(i);
-            JSONObject o = new JSONObject();
-            o.put("vid", vid);
-            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
-            o.put("recalls", buildRecallsForVid(ctx, vid));
-            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
-            if (r != null) o.put("rank", buildRankBlock(r));
-            o.put("rank_index", i + 1);
-            ColdStartAction action = ctx.getStep7ColdStartActions().getOrDefault(vid, ColdStartAction.NONE);
-            o.put("if_cold_start", action != ColdStartAction.NONE);
-            o.put("cold_start_action", action.name());
-            arr.add(o);
-        }
-        // 冷启 INSERTED 的视频可能不在 step6RankTruncatedVideoIds 里,补
-        ctx.getStep7ColdStartActions().forEach((vid, action) -> {
-            if (action == ColdStartAction.INSERTED && !ctx.getStep6RankTruncatedVideoIds().contains(vid)) {
-                JSONObject o = new JSONObject();
-                o.put("vid", vid);
-                o.put("if_cold_start", true);
-                o.put("cold_start_action", action.name());
-                arr.add(o);
-            }
-        });
-        return arr;
-    }
-
-    private static JSONArray buildStep8(FunnelContext ctx) {
         JSONArray arr = new JSONArray();
         for (int i = 0; i < ctx.getStep8OutputVideoIds().size(); i++) {
             long vid = ctx.getStep8OutputVideoIds().get(i);
@@ -209,6 +183,7 @@ public class FunnelAggregator {
             if (r != null) o.put("rank", buildRankBlock(r));
             ColdStartAction action = ctx.getStep7ColdStartActions().getOrDefault(vid, ColdStartAction.NONE);
             o.put("if_cold_start", action != ColdStartAction.NONE);
+            o.put("cold_start_action", action.name());
             o.put("rank_index", i + 1);
             arr.add(o);
         }
@@ -221,12 +196,13 @@ public class FunnelAggregator {
         ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
             for (RecallVideoEntry e : entries) {
                 if (e.getVideoId() != vid) continue;
-                if (e.getSelect() != SelectKind.SELF) continue;   // 只列该路实际贡献到合并的视频
+                if (e.getSelect() == null) continue;   // 列出该 vid 在该路 top-recallNum 名次内的命中(self + other)
                 JSONObject r = new JSONObject();
                 r.put("strategy", pf);
                 r.put("index", displayIndex(e.getIndex()));
                 r.put("score", round6(e.getScore()));
                 r.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                r.put("select", e.getSelect().name().toLowerCase());
                 r.put("truncate", e.getTruncate());
                 recalls.add(r);
                 break;

+ 8 - 7
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -44,9 +44,9 @@ public abstract class RankService {
             tagDuplicateVideos(param);
             List<Video> rovRecallRank = mergeAndRankRovRecall(param);
             writeFunnelMergedStage(param, rovRecallRank);
+            writeFunnelRankTruncatedStage(param, rovRecallRank, param.getSize());
             RankResult result = new RankResult(rovRecallRank);
             result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
-            writeFunnelRankTruncatedStage(param, result.getVideos());
             return result;
         }
 
@@ -97,6 +97,7 @@ public abstract class RankService {
         // 2 正常走分发 排序+冷启动
         List<Video> rovRecallRank = mergeAndRankRovRecall(param);
         writeFunnelMergedStage(param, rovRecallRank);
+        writeFunnelRankTruncatedStage(param, rovRecallRank, param.getSize());
         List<Video> flowPoolRank = mergeAndRankFlowPoolRecall(param);
 
         List<Video> douHotFlowPoolRank = extractAndSort(param, DouHotFlowPoolRecallStrategy.PUSH_FROM);
@@ -107,7 +108,6 @@ public abstract class RankService {
         RankResult result = mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
         if (result != null) {
             result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
-            writeFunnelRankTruncatedStage(param, result.getVideos());
         }
         return result;
     }
@@ -125,12 +125,13 @@ public abstract class RankService {
         }
     }
 
-    /** 阶段 6: 排序截断 — rankResult.videos 顺序 = rank_index */
-    private static void writeFunnelRankTruncatedStage(RankParam param, List<Video> rankedVideos) {
-        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(rankedVideos)) return;
+    /** Stage 6: rank truncation — the first {@code size} entries of the pure rank-sorted list (cold start not included), kept as a snapshot taken before cold-start replacement. */
+    private static void writeFunnelRankTruncatedStage(RankParam param, List<Video> rovRecallRank, int size) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(rovRecallRank)) return;  // nothing is recorded without a funnel context or when CollectionUtils.isEmpty reports no ranked videos
         FunnelContext ctx = param.getFunnelContext();
-        for (Video v : rankedVideos) {
-            ctx.getStep6RankTruncatedVideoIds().add(v.getVideoId());
+        int n = Math.min(rovRecallRank.size(), size);  // cap at the list length; a size <= 0 leaves the loop below with zero iterations
+        for (int i = 0; i < n; i++) {
+            ctx.getStep6RankTruncatedVideoIds().add(rovRecallRank.get(i).getVideoId());  // appended in list order, so position in the step-6 list mirrors position in rovRecallRank
         }
     }