Prechádzať zdrojové kódy

feat:漏斗日志升级8阶段 (召回/过滤/截断/合并/排序/排序截断/冷启/输出)

- 中间结构 FunnelContext + RecallVideoEntry + RankVideoEntry + ColdStartAction + SelectKind
- step 1-3: FilterService 写召回 raw + 过滤 reasons; RecallUtils 写截断 select=self/other
- step 4-6: RankService 写合并/打分/排序截断 (Video.sortScore 直接回写)
- step 7: RankStrategy4RegionMergeModelBasic.mergeAndSort 冷启 INSERTED 标记
- step 8: RecommendService 写最终下发列表
- RecallService 末尾 syncScoresToFunnel: 把 Video.rovScore 反向同步到 entry.score
  (解决部分 strategy filter 前 scoresMap 为空导致 score=0)
- FunnelAggregator 序列化为单条 SLS 记录 (8 个 step JSON 字段 + filter_reasons),
  index 1-based, score 保留 6 位小数
- FunnelLogConfig Apollo 配置 (funnel.log.config): enabled / sampleRate / appTypes
  默认全关; 关闭时 ctx=null 各路径短路, 零性能开销
yangxiaohui 1 týždeň pred
rodič
commit
2d23efc762
20 zmenil súbory, kde vykonal 657 pridanie a 198 odobranie
  1. 51 14
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java
  2. 3 4
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java
  3. 67 17
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterService.java
  4. 13 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/ColdStartAction.java
  5. 225 110
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java
  6. 54 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelContext.java
  7. 19 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogConfig.java
  8. 8 15
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogService.java
  9. 0 18
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelRawItem.java
  10. 18 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RankVideoEntry.java
  11. 40 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RecallVideoEntry.java
  12. 10 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/SelectKind.java
  13. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java
  14. 28 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java
  15. 18 3
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelBasic.java
  16. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/FilterParamFactory.java
  17. 3 4
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java
  18. 0 7
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java
  19. 25 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java
  20. 71 5
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

+ 51 - 14
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -25,8 +25,9 @@ import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
 import com.tzld.piaoquan.recommend.server.service.rank.RankRouter;
 import com.tzld.piaoquan.recommend.server.service.funnel.FunnelAggregator;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelLogConfig;
 import com.tzld.piaoquan.recommend.server.service.funnel.FunnelLogService;
-import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
@@ -53,8 +54,6 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.ZoneId;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -104,6 +103,10 @@ public class RecommendService {
     @ApolloJsonValue("${none.user.risk.exclude.apptype:[]}")
     private Set<Integer> noneUserRiskExcludeAppTypes;
 
+    /** 漏斗日志配置 (Apollo 热调,JSON: {enabled, sampleRate, appTypes}) */
+    @ApolloJsonValue("${funnel.log.config:{}}")
+    private FunnelLogConfig funnelLogConfig = new FunnelLogConfig();
+
     @ApolloJsonValue("${testing.risk.province:[]}")
     private Set<String> testingRiskProvince;
     @ApolloJsonValue("${testing.risk.city:[]}")
@@ -581,8 +584,8 @@ public class RecommendService {
         recallParam.setUserSocialRecallInfo(userSocialRecallInfo);
         recallParam.setUserNetworkSeqFeature(userNetworkSeqFeature);
         recallParam.setUserNetworkSeqVideoInfoMap(userNetworkSeqVideoInfoMap);
-        ConcurrentMap<String, List<FunnelRawItem>> funnelSink = new ConcurrentHashMap<>();
-        recallParam.setFunnelSink(funnelSink);
+        FunnelContext funnelContext = shouldSampleFunnel(request) ? buildFunnelContext(request, param) : null;
+        recallParam.setFunnelContext(funnelContext);
         RecallResult recallResult = recallService.recall(recallParam);
 
         long recallTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -595,6 +598,7 @@ public class RecommendService {
         rankParam.setHeadInfo(headVideoInfo);
         rankParam.setUserRTShareList(param.getUserRTShareList());
         rankParam.setBehaviorVideos(behaviorVideos);
+        rankParam.setFunnelContext(funnelContext);
         RankResult rankResult = rankRouter.rank(rankParam);
 
         long rankTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -603,7 +607,7 @@ public class RecommendService {
 
 
         if (rankResult == null || CollectionUtils.isEmpty(rankResult.getVideos())) {
-            logFunnel(request, param, recallParam, recallResult, rankResult, Collections.emptyList());
+            if (funnelContext != null) logFunnel(funnelContext, Collections.emptyList());
             return Collections.emptyList();
         }
 
@@ -612,18 +616,51 @@ public class RecommendService {
         if (param.getSize() < rankResult.getVideos().size()) {
             videos = rankResult.getVideos().subList(0, param.getSize());
         }
-        logFunnel(request, param, recallParam, recallResult, rankResult, videos);
+        if (funnelContext != null) logFunnel(funnelContext, videos);
         return videos;
     }
 
-    private void logFunnel(RecommendRequest request, RecommendParam param, RecallParam recallParam,
-                           RecallResult recallResult, RankResult rankResult, List<Video> returnedVideos) {
+    private boolean shouldSampleFunnel(RecommendRequest request) {
+        if (request == null || funnelLogConfig == null || !funnelLogConfig.isEnabled()) return false;
+        int rate = funnelLogConfig.getSampleRate();
+        if (rate <= 0) return false;
+        Set<Integer> appTypes = funnelLogConfig.getAppTypes();
+        if (CollectionUtils.isNotEmpty(appTypes) && !appTypes.contains(request.getAppType())) {
+            return false;
+        }
+        if (rate >= 100) return true;
+        String mid = StringUtils.defaultString(request.getMid());
+        int hash = (mid.hashCode() & 0x7FFFFFFF) % 100;
+        return hash < rate;
+    }
+
+    private FunnelContext buildFunnelContext(RecommendRequest request, RecommendParam param) {
+        FunnelContext ctx = new FunnelContext();
+        ctx.setTraceId(String.valueOf(TraceUtils.currentTraceId()));
+        if (request != null) {
+            ctx.setRecommendTraceId(Strings.nullToEmpty(request.getRecommendTraceId()));
+            ctx.setSessionId(Strings.nullToEmpty(request.getSessionId()));
+            ctx.setSubSessionId(Strings.nullToEmpty(request.getSubSessionId()));
+            ctx.setRootSessionId(Strings.nullToEmpty(request.getRootSessionId()));
+            ctx.setMid(Strings.nullToEmpty(request.getMid()));
+            ctx.setAppType(request.getAppType());
+            ctx.setNewExpGroup(Strings.nullToEmpty(request.getNewExpGroup()));
+        }
+        if (param != null) {
+            ctx.setAbExpCodes(param.getAbExpCodes());
+        }
+        return ctx;
+    }
+
+    private void logFunnel(FunnelContext ctx, List<Video> returnedVideos) {
         try {
-            String traceId = String.valueOf(TraceUtils.currentTraceId());
-            List<Map<String, String>> rows = FunnelAggregator.build(
-                    traceId, request, param, recallParam.getFunnelSink(),
-                    recallResult, rankResult, returnedVideos);
-            funnelLogService.log(rows);
+            if (CollectionUtils.isNotEmpty(returnedVideos)) {
+                for (Video v : returnedVideos) {
+                    ctx.getStep8OutputVideoIds().add(v.getVideoId());
+                }
+            }
+            Map<String, String> row = FunnelAggregator.toLogItem(ctx);
+            funnelLogService.log(row);
         } catch (Exception e) {
             log.error("logFunnel error", e);
         }

+ 3 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java

@@ -1,12 +1,11 @@
 package com.tzld.piaoquan.recommend.server.service.filter;
 
-import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import lombok.Data;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * @author dyp
@@ -22,8 +21,8 @@ public class FilterParam {
     private String pushFrom;
     /** 漏斗:videoId -> 召回 score (可空) */
     private Map<Long, Double> scoresMap;
-    /** 漏斗:raw 输入 sink (可空,pushFrom -> List<FunnelRawItem>) */
-    private ConcurrentMap<String, List<FunnelRawItem>> funnelSink;
+    /** 漏斗上下文 (可空) */
+    private FunnelContext funnelContext;
 
     // 风险过滤
     private String regionCode;

+ 67 - 17
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterService.java

@@ -5,7 +5,8 @@ import com.google.common.collect.Lists;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory;
 import com.tzld.piaoquan.recommend.server.service.filter.strategy.*;
-import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RecallVideoEntry;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.RandomUtils;
@@ -14,8 +15,10 @@ import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -33,31 +36,36 @@ public class FilterService {
     private Map<String, Integer> filterExpConfig;
 
     public FilterResult filter(FilterParam param) {
-        recordRawToFunnelSink(param);
-        List<Long> videoIds = viewFilter(param);
+        // 阶段 1: filter 前 entries(按 funnel 开关创建并挂到 context)
+        List<RecallVideoEntry> entries = recordRecallStageToContext(param);
+        // 阶段 2: 跑 filter 策略,entries 非空时同步回写 filteredIn/indexNewAfterFilter/filterReasons
+        List<Long> videoIds = viewFilter(param, entries);
         return new FilterResult(videoIds);
     }
 
-    private void recordRawToFunnelSink(FilterParam param) {
-        if (param == null || param.getFunnelSink() == null || StringUtils.isBlank(param.getPushFrom())
+    private List<RecallVideoEntry> recordRecallStageToContext(FilterParam param) {
+        if (param == null
+                || param.getFunnelContext() == null
+                || StringUtils.isBlank(param.getPushFrom())
                 || CollectionUtils.isEmpty(param.getVideoIds())) {
-            return;
+            return null;
         }
         List<Long> ids = param.getVideoIds();
         Map<Long, Double> scoresMap = param.getScoresMap();
-        List<FunnelRawItem> rawList = new ArrayList<>(ids.size());
+        List<RecallVideoEntry> entries = new ArrayList<>(ids.size());
         for (int i = 0; i < ids.size(); i++) {
             long id = ids.get(i);
             double score = scoresMap != null ? scoresMap.getOrDefault(id, 0.0) : 0.0;
-            rawList.add(new FunnelRawItem(id, score, i));
+            entries.add(new RecallVideoEntry(id, score, i));
         }
-        param.getFunnelSink().merge(param.getPushFrom(), rawList, (oldList, newList) -> {
+        param.getFunnelContext().getStages123RecallByStrategy().merge(param.getPushFrom(), entries, (oldList, newList) -> {
             oldList.addAll(newList);
             return oldList;
         });
+        return entries;
     }
 
-    private List<Long> viewFilter(FilterParam param) {
+    private List<Long> viewFilter(FilterParam param, List<RecallVideoEntry> entries) {
 
         List<FilterStrategy> strategies = getStrategies(param);
         CountDownLatch cdl = new CountDownLatch(strategies.size());
@@ -78,24 +86,66 @@ public class FilterService {
             return Collections.emptyList();
         }
 
-        List<List<Long>> videoIds = new ArrayList<>();
-        for (Future<List<Long>> f : futures) {
+        int n = strategies.size();
+        List<Set<Long>> keeps = new ArrayList<>(n);
+        List<String> reasonKeys = new ArrayList<>(n);
+        for (int i = 0; i < n; i++) {
             try {
-                videoIds.add(f.get());
+                keeps.add(new HashSet<>(futures.get(i).get()));
             } catch (Exception e) {
                 log.error("future get error ", e);
+                keeps.add(Collections.emptySet());
             }
+            reasonKeys.add(reasonKey(strategies.get(i)));
         }
-        if (CollectionUtils.isEmpty(videoIds)) {
+        if (keeps.isEmpty()) {
             return Collections.emptyList();
         }
-        List<Long> result = Lists.newArrayList(param.getVideoIds());
-        for (int i = 0; i < videoIds.size(); ++i) {
-            result.retainAll(videoIds.get(i));
+
+        // 漏斗未开 → 旧 retainAll 行为
+        if (entries == null) {
+            List<Long> result = Lists.newArrayList(param.getVideoIds());
+            for (Set<Long> keep : keeps) {
+                result.retainAll(keep);
+            }
+            return result;
+        }
+
+        // 漏斗开了 → 同步回写 entry 阶段 2 字段
+        List<Long> result = new ArrayList<>(entries.size());
+        int newPos = 0;
+        for (RecallVideoEntry e : entries) {
+            long id = e.getVideoId();
+            List<String> reasons = null;
+            for (int i = 0; i < keeps.size(); i++) {
+                if (!keeps.get(i).contains(id)) {
+                    if (reasons == null) reasons = new ArrayList<>(2);
+                    reasons.add(reasonKeys.get(i));
+                }
+            }
+            if (reasons == null) {
+                e.setFilteredIn(true);
+                e.setIndexNewAfterFilter(newPos++);
+                result.add(id);
+            } else {
+                e.setFilteredIn(false);
+                e.setFilterReasons(reasons);
+            }
         }
         return result;
     }
 
+    private static String reasonKey(FilterStrategy s) {
+        if (s instanceof PreViewedStrategy) return "previewed";
+        if (s instanceof ViewedStrategy) return "viewed";
+        if (s instanceof RecommendStatusStrategy) return "recommend_status";
+        if (s instanceof AppletVideoStatusStrategy) return "applet_video_status";
+        if (s instanceof RiskVideoStrategy) return "risk";
+        if (s instanceof VideoSourceTypeStrategy) return "ugc";
+        if (s instanceof GeneralSpiderStrategy) return "spider";
+        return s.getClass().getSimpleName();
+    }
+
     private List<FilterStrategy> getStrategies(FilterParam param) {
         List<FilterStrategy> strategies = new ArrayList<>();
         strategies.add(ServiceBeanFactory.getBean(PreViewedStrategy.class));

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/ColdStartAction.java

@@ -0,0 +1,13 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+/**
+ * 漏斗 阶段 7 冷启替换动作。
+ * NONE     = 未被冷启替换/插入
+ * REPLACED = 该 video 被冷启视频替换掉(从结果列表里消失)
+ * INSERTED = 该 video 是冷启加入的(原本不在召回链路)
+ */
+public enum ColdStartAction {
+    NONE,
+    REPLACED,
+    INSERTED
+}

+ 225 - 110
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java

@@ -1,141 +1,256 @@
 package com.tzld.piaoquan.recommend.server.service.funnel;
 
-import com.google.common.base.Strings;
-import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
-import com.tzld.piaoquan.recommend.server.model.RecommendParam;
-import com.tzld.piaoquan.recommend.server.model.Video;
-import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
-import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
-import com.tzld.piaoquan.recommend.server.util.JSONUtils;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 
-import java.util.ArrayList;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
- * 把 4 阶段(raw / filtered / ranked / preview)合并成每 (videoId, pushFrom) 一行。
- * 下游 BI: group by videoId max(各 flag) 出整体漏斗;group by pushFrom 出分策略漏斗。
+ * 把 FunnelContext 序列化成一条 SLS 日志(8 个 step JSON 字段 + filter reasons)。
  */
 public class FunnelAggregator {
 
-    private static class Row {
-        boolean inRecall;
-        boolean inFiltered;
-        double recallScore;
-        int strategyRank;
+    public static Map<String, String> toLogItem(FunnelContext ctx) {
+        Map<String, String> row = new LinkedHashMap<>();
+        if (ctx == null) return row;
+
+        // base
+        row.put("traceId", StringUtils.defaultString(ctx.getTraceId()));
+        row.put("recommendTraceId", StringUtils.defaultString(ctx.getRecommendTraceId()));
+        row.put("sessionId", StringUtils.defaultString(ctx.getSessionId()));
+        row.put("subSessionId", StringUtils.defaultString(ctx.getSubSessionId()));
+        row.put("rootSessionId", StringUtils.defaultString(ctx.getRootSessionId()));
+        row.put("mid", StringUtils.defaultString(ctx.getMid()));
+        row.put("appType", String.valueOf(ctx.getAppType()));
+        row.put("newExpGroup", StringUtils.defaultString(ctx.getNewExpGroup()));
+        row.put("abExpCode", JSON.toJSONString(ctx.getAbExpCodes()));
+
+        // step 1-3 + filter reasons
+        row.put("step_1_recall", JSON.toJSONString(buildStep1(ctx)));
+        row.put("step_2_filtered", JSON.toJSONString(buildStep2(ctx)));
+        row.put("step_2_filter_reasons", JSON.toJSONString(buildStep2Reasons(ctx)));
+        row.put("step_3_truncated", JSON.toJSONString(buildStep3(ctx)));
+
+        // step 4-8
+        row.put("step_4_merged", JSON.toJSONString(buildStep4(ctx)));
+        row.put("step_5_ranked", JSON.toJSONString(buildStep5(ctx)));
+        row.put("step_6_rank_truncated", JSON.toJSONString(buildStep6(ctx)));
+        row.put("step_7_cold_start", JSON.toJSONString(buildStep7(ctx)));
+        row.put("step_8_output", JSON.toJSONString(buildStep8(ctx)));
+
+        return row;
     }
 
-    public static List<Map<String, String>> build(
-            String traceId,
-            RecommendRequest request,
-            RecommendParam param,
-            Map<String, List<FunnelRawItem>> funnelSink,
-            RecallResult recallResult,
-            RankResult rankResult,
-            List<Video> returnedVideos) {
-
-        // rank 阶段:被 mergeAndRankRovRecall 挑中进打分的视频 id 集合
-        Set<Long> rankedSet = new HashSet<>();
-        if (rankResult != null && rankResult.getCandidateVideoIds() != null) {
-            rankedSet.addAll(rankResult.getCandidateVideoIds());
-        }
+    // ===== step 1-3: {pushFrom: [...]} =====
+    private static Map<String, JSONArray> buildStep1(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                arr.add(o);
+            }
+            out.put(pf, arr);
+        });
+        return out;
+    }
 
-        // rank 最终位置 + 最终 score (rankResult.videos 是 rank 截断前的整序列)
-        Map<Long, Integer> rankPosMap = new HashMap<>();
-        Map<Long, Double> rankScoreMap = new HashMap<>();
-        if (rankResult != null && CollectionUtils.isNotEmpty(rankResult.getVideos())) {
-            List<Video> rv = rankResult.getVideos();
-            for (int i = 0; i < rv.size(); i++) {
-                Video v = rv.get(i);
-                rankPosMap.put(v.getVideoId(), i + 1);
-                rankScoreMap.put(v.getVideoId(), v.getSortScore());
+    private static Map<String, JSONArray> buildStep2(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                if (!e.isFilteredIn()) continue;
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                o.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                arr.add(o);
             }
-        }
+            out.put(pf, arr);
+        });
+        return out;
+    }
+
+    private static Map<String, JSONArray> buildStep2Reasons(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                if (e.isFilteredIn()) continue;
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                o.put("reasons", e.getFilterReasons());
+                arr.add(o);
+            }
+            if (!arr.isEmpty()) out.put(pf, arr);
+        });
+        return out;
+    }
 
-        // preview = 截断后下发
-        Map<Long, Integer> previewPosMap = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(returnedVideos)) {
-            for (int i = 0; i < returnedVideos.size(); i++) {
-                previewPosMap.put(returnedVideos.get(i).getVideoId(), i + 1);
+    private static Map<String, JSONArray> buildStep3(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                if (e.getSelect() == null) continue;
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                o.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                o.put("select", e.getSelect().name().toLowerCase());
+                o.put("truncate", e.getTruncate());
+                arr.add(o);
             }
+            out.put(pf, arr);
+        });
+        return out;
+    }
+
+    // ===== step 4-8: 按 vid 组织,每条带完整 recalls 历史 =====
+    private static JSONArray buildStep4(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (Long vid : ctx.getStep4MergedVideoIds()) {
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            arr.add(o);
         }
+        return arr;
+    }
 
-        // (pushFrom -> videoId -> Row)
-        Map<String, Map<Long, Row>> table = new LinkedHashMap<>();
-
-        // raw
-        if (funnelSink != null) {
-            funnelSink.forEach((pushFrom, items) -> {
-                Map<Long, Row> byVid = table.computeIfAbsent(pushFrom, k -> new LinkedHashMap<>());
-                for (FunnelRawItem item : items) {
-                    Row r = byVid.computeIfAbsent(item.getVideoId(), k -> new Row());
-                    r.inRecall = true;
-                    r.recallScore = item.getScore();
-                    r.strategyRank = item.getStrategyRank();
-                }
-            });
+    private static JSONArray buildStep5(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (Long vid : ctx.getStep4MergedVideoIds()) {
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r == null) continue;
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            o.put("rank", buildRankBlock(r));
+            arr.add(o);
         }
+        return arr;
+    }
 
-        // filtered
-        if (recallResult != null && CollectionUtils.isNotEmpty(recallResult.getData())) {
-            for (RecallResult.RecallData d : recallResult.getData()) {
-                if (CollectionUtils.isEmpty(d.getVideos())) {
-                    continue;
-                }
-                String pushFrom = d.getPushFrom();
-                Map<Long, Row> byVid = table.computeIfAbsent(pushFrom, k -> new LinkedHashMap<>());
-                for (int i = 0; i < d.getVideos().size(); i++) {
-                    Video v = d.getVideos().get(i);
-                    Row r = byVid.computeIfAbsent(v.getVideoId(), k -> new Row());
-                    r.inFiltered = true;
-                    // 兜底:strategy 不走 FilterService 时 raw 没记,从 Video 补
-                    if (!r.inRecall) {
-                        r.recallScore = v.getRovScore();
-                        r.strategyRank = i;
-                    }
-                }
-            }
+    private static JSONArray buildStep6(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (int i = 0; i < ctx.getStep6RankTruncatedVideoIds().size(); i++) {
+            long vid = ctx.getStep6RankTruncatedVideoIds().get(i);
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r != null) o.put("rank", buildRankBlock(r));
+            o.put("rank_index", i + 1);
+            arr.add(o);
         }
+        return arr;
+    }
 
-        List<Map<String, String>> rows = new ArrayList<>();
-        Map<String, String> base = buildBase(traceId, request, param);
-        table.forEach((pushFrom, byVid) -> byVid.forEach((vid, r) -> {
-            Map<String, String> row = new HashMap<>(base);
-            row.put("videoId", String.valueOf(vid));
-            row.put("pushFrom", pushFrom);
-            row.put("recallScore", String.valueOf(r.recallScore));
-            row.put("strategyRank", String.valueOf(r.strategyRank));
-            row.put("inRecall", r.inRecall ? "1" : "0");
-            row.put("inFiltered", r.inFiltered ? "1" : "0");
-            row.put("inRanked", rankedSet.contains(vid) ? "1" : "0");
-            row.put("inPreview", previewPosMap.containsKey(vid) ? "1" : "0");
-            row.put("rankScore", String.valueOf(rankScoreMap.getOrDefault(vid, 0.0)));
-            row.put("rankPos", String.valueOf(rankPosMap.getOrDefault(vid, 0)));
-            row.put("previewPos", String.valueOf(previewPosMap.getOrDefault(vid, 0)));
-            rows.add(row);
-        }));
-        return rows;
+    private static JSONArray buildStep7(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        // step 7 = step 6 + if_cold_start
+        for (int i = 0; i < ctx.getStep6RankTruncatedVideoIds().size(); i++) {
+            long vid = ctx.getStep6RankTruncatedVideoIds().get(i);
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r != null) o.put("rank", buildRankBlock(r));
+            o.put("rank_index", i + 1);
+            ColdStartAction action = ctx.getStep7ColdStartActions().getOrDefault(vid, ColdStartAction.NONE);
+            o.put("if_cold_start", action != ColdStartAction.NONE);
+            o.put("cold_start_action", action.name());
+            arr.add(o);
+        }
+        // 冷启 INSERTED 的视频可能不在 step6RankTruncatedVideoIds 里,补
+        ctx.getStep7ColdStartActions().forEach((vid, action) -> {
+            if (action == ColdStartAction.INSERTED && !ctx.getStep6RankTruncatedVideoIds().contains(vid)) {
+                JSONObject o = new JSONObject();
+                o.put("vid", vid);
+                o.put("if_cold_start", true);
+                o.put("cold_start_action", action.name());
+                arr.add(o);
+            }
+        });
+        return arr;
     }
 
-    private static Map<String, String> buildBase(String traceId, RecommendRequest request, RecommendParam param) {
-        Map<String, String> base = new HashMap<>();
-        base.put("traceId", Strings.nullToEmpty(traceId));
-        if (request != null) {
-            base.put("recommendTraceId", Strings.nullToEmpty(request.getRecommendTraceId()));
-            base.put("sessionId", Strings.nullToEmpty(request.getSessionId()));
-            base.put("rootSessionId", Strings.nullToEmpty(request.getRootSessionId()));
-            base.put("mid", Strings.nullToEmpty(request.getMid()));
-            base.put("appType", String.valueOf(request.getAppType()));
-            base.put("newexpgroup", Strings.nullToEmpty(request.getNewExpGroup()));
+    private static JSONArray buildStep8(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (int i = 0; i < ctx.getStep8OutputVideoIds().size(); i++) {
+            long vid = ctx.getStep8OutputVideoIds().get(i);
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r != null) o.put("rank", buildRankBlock(r));
+            ColdStartAction action = ctx.getStep7ColdStartActions().getOrDefault(vid, ColdStartAction.NONE);
+            o.put("if_cold_start", action != ColdStartAction.NONE);
+            o.put("rank_index", i + 1);
+            arr.add(o);
         }
-        if (param != null) {
-            base.put("abExpCode", JSONUtils.toJson(param.getAbExpCodes()));
+        return arr;
+    }
+
+    // ===== helpers =====
+    private static JSONArray buildRecallsForVid(FunnelContext ctx, long vid) {
+        JSONArray recalls = new JSONArray();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            for (RecallVideoEntry e : entries) {
+                if (e.getVideoId() != vid) continue;
+                if (e.getSelect() != SelectKind.SELF) continue;   // 只列该路实际贡献到合并的视频
+                JSONObject r = new JSONObject();
+                r.put("strategy", pf);
+                r.put("index", displayIndex(e.getIndex()));
+                r.put("score", round6(e.getScore()));
+                r.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                r.put("truncate", e.getTruncate());
+                recalls.add(r);
+                break;
+            }
+        });
+        return recalls;
+    }
+
+    private static JSONObject buildRankBlock(RankVideoEntry r) {
+        JSONObject o = new JSONObject();
+        o.put("score", round6(r.getRankScore()));
+        if (MapUtils.isNotEmpty(r.getSubScores())) {
+            r.getSubScores().forEach((k, v) -> o.put(k, round6(v)));
         }
-        return base;
+        return o;
+    }
+
+    /** double → 保留 6 位小数 */
+    private static BigDecimal round6(double v) {
+        return BigDecimal.valueOf(v).setScale(6, RoundingMode.HALF_UP);
+    }
+
+    /** 内部 0-based 位置 → 对外展示 1-based。负值(如 -1 表"未通过")原样返回 */
+    private static int displayIndex(int zeroBased) {
+        return zeroBased < 0 ? zeroBased : zeroBased + 1;
     }
 }

+ 54 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelContext.java

@@ -0,0 +1,54 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * 单次推荐请求的漏斗中间态容器,挂在 RecallParam + RankParam 上跨服务传递。
+ * 末尾由 FunnelAggregator 序列化成一条 SLS 日志(8 个 step JSON 字段)。
+ *
+ * 并发安全说明:
+ * - stages123RecallByStrategy 被 30+ 召回 strategy 子线程并发写 → ConcurrentMap
+ * - 同 pushFrom 的 List 由 FilterService 单次写入,不会并发追加
+ * - 阶段 4-8 由主线程顺序写
+ */
+@Data
+public class FunnelContext {
+    // ===== base =====
+    private String traceId;
+    private String recommendTraceId;
+    private String sessionId;
+    private String subSessionId;
+    private String rootSessionId;
+    private String mid;
+    private int    appType;
+    private String newExpGroup;
+    private Set<String> abExpCodes;
+
+    /** Step 1-3: 召回 / 过滤 / 截断(per pushFrom,字段在 RecallVideoEntry 上分组) */
+    private ConcurrentMap<String, List<RecallVideoEntry>> stages123RecallByStrategy = new ConcurrentHashMap<>();
+
+    /** Step 4: 合并去重后顺序 */
+    private List<Long>         step4MergedVideoIds = new ArrayList<>();
+    /** Step 4: vid -> 合并时归因到哪条 pushFrom */
+    private Map<Long, String>  step4MergedAttribution = new HashMap<>();
+
+    /** Step 5: rank 模型打分结果 */
+    private Map<Long, RankVideoEntry> step5RankedData = new HashMap<>();
+
+    /** Step 6: 排序截断后顺序(rank_index = list 中位置 + 1) */
+    private List<Long>         step6RankTruncatedVideoIds = new ArrayList<>();
+
+    /** Step 7: 冷启替换标记 */
+    private Map<Long, ColdStartAction> step7ColdStartActions = new HashMap<>();
+
+    /** Step 8: 最终下发顺序(output_index = list 中位置 + 1) */
+    private List<Long>         step8OutputVideoIds = new ArrayList<>();
+}

+ 19 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogConfig.java

@@ -0,0 +1,19 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * 漏斗日志配置(Apollo key: funnel.log.config,JSON 整体配置)。
+ */
+@Data
+public class FunnelLogConfig {
+    /** 总开关 */
+    private boolean enabled = false;
+    /** 采样率 0-100。0=全关,100=全开。按 mid hash 决定单请求是否记录 */
+    private int sampleRate = 0;
+    /** 生效 appType 白名单。空集 = 不限制 */
+    private Set<Integer> appTypes = Collections.emptySet();
+}

+ 8 - 15
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogService.java

@@ -6,17 +6,15 @@ import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
 import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
 import com.aliyun.openservices.log.common.LogItem;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
- * 推荐漏斗日志 (video × pushFrom 粒度,4 阶段 flag)
+ * 推荐漏斗日志:一次请求一条记录,包含 8 个 step JSON 字段
  */
 @Service
 @Slf4j
@@ -42,22 +40,17 @@ public class FunnelLogService {
         producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
     }
 
-    public void log(List<Map<String, String>> data) {
-        if (CollectionUtils.isEmpty(data)) {
-            return;
-        }
+    public void log(Map<String, String> row) {
+        if (MapUtils.isEmpty(row)) return;
         try {
-            List<LogItem> items = data.stream().map(d -> {
-                LogItem logItem = new LogItem();
-                d.entrySet().forEach(e -> logItem.PushBack(e.getKey(), e.getValue()));
-                return logItem;
-            }).collect(Collectors.toList());
-            producer.send(project, logStore, items);
+            LogItem logItem = new LogItem();
+            row.forEach(logItem::PushBack);
+            producer.send(project, logStore, logItem);
         } catch (InterruptedException e) {
             log.warn("funnel log interrupted");
             Thread.currentThread().interrupt();
         } catch (Exception e) {
-            log.error("Failed to send funnel logs", e);
+            log.error("Failed to send funnel log", e);
         }
     }
 }

+ 0 - 18
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelRawItem.java

@@ -1,18 +0,0 @@
-package com.tzld.piaoquan.recommend.server.service.funnel;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * 漏斗 raw 阶段(filter 前)单条记录。
- * strategyRank 为该 video 在所属召回策略内部的位置(0-based,按策略原始排序)。
- */
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class FunnelRawItem {
-    private long videoId;
-    private double score;
-    private int strategyRank;
-}

+ 18 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RankVideoEntry.java

@@ -0,0 +1,18 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/**
+ * 漏斗 阶段 5 单条记录(rank 模型打分后)。
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class RankVideoEntry {
+    private double rankScore;
+    private Map<String, Double> subScores;  // 预留:rank 各分项 / 中间特征
+}

+ 40 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RecallVideoEntry.java

@@ -0,0 +1,40 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 漏斗 阶段 1-3 单条记录(per pushFrom × videoId)。
+ * - 阶段 1 召回: index / score
+ * - 阶段 2 过滤: filteredIn / indexNewAfterFilter / filterReasons
+ * - 阶段 3 截断: select / truncate
+ */
+@Data
+@NoArgsConstructor
+public class RecallVideoEntry {
+    private long videoId;
+    private double score;
+    private int index;                  // 召回原始位置 (0-based)
+
+    private boolean filteredIn;
+    private int indexNewAfterFilter = -1;
+    private List<String> filterReasons;  // filteredIn=false 时被哪些 filter 干掉
+
+    /**
+     * 阶段 3 截断标记:
+     * - SELF  = 该 vid 在该路 top-recallNum 名次内 + 被该路实际贡献给合并列表
+     * - OTHER = 在该路 top-recallNum 名次内但被前面 pushFrom 抢走
+     * - null  = 未参与截断或在 top-N 之外
+     */
+    private SelectKind select;
+    /** 该路 recallNum 配额 (truncate) */
+    private int truncate;
+
+    public RecallVideoEntry(long videoId, double score, int index) {
+        this.videoId = videoId;
+        this.score = score;
+        this.index = index;
+    }
+}

+ 10 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/SelectKind.java

@@ -0,0 +1,10 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+/**
+ * 阶段 3 截断:该 vid 在该 pushFrom 的 top-recallNum 名次内被该路实际选中(SELF)
+ * 还是被前面其他 pushFrom 抢走(OTHER)。
+ */
+public enum SelectKind {
+    SELF,
+    OTHER
+}

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.recommend.server.service.rank;
 
 import com.tzld.piaoquan.recommend.server.model.MachineInfo;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
@@ -16,6 +17,8 @@ import java.util.Set;
 @Data
 public class RankParam {
     private RecallResult recallResult;
+    /** 漏斗上下文 (可空) */
+    private FunnelContext funnelContext;
     private int size;
     private int topK;
     private String rankKeyPrefix;

+ 28 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -6,6 +6,8 @@ import com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum;
 import com.tzld.piaoquan.recommend.server.model.RankVideoInfo;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RankVideoEntry;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
 import lombok.extern.slf4j.Slf4j;
@@ -41,8 +43,10 @@ public abstract class RankService {
         if (2 == param.getRecommendType()) {
             tagDuplicateVideos(param);
             List<Video> rovRecallRank = mergeAndRankRovRecall(param);
+            writeFunnelMergedStage(param, rovRecallRank);
             RankResult result = new RankResult(rovRecallRank);
             result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
+            writeFunnelRankTruncatedStage(param, result.getVideos());
             return result;
         }
 
@@ -92,6 +96,7 @@ public abstract class RankService {
 
         // 2 正常走分发 排序+冷启动
         List<Video> rovRecallRank = mergeAndRankRovRecall(param);
+        writeFunnelMergedStage(param, rovRecallRank);
         List<Video> flowPoolRank = mergeAndRankFlowPoolRecall(param);
 
         List<Video> douHotFlowPoolRank = extractAndSort(param, DouHotFlowPoolRecallStrategy.PUSH_FROM);
@@ -102,10 +107,33 @@ public abstract class RankService {
         RankResult result = mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
         if (result != null) {
             result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
+            writeFunnelRankTruncatedStage(param, result.getVideos());
         }
         return result;
     }
 
+    /** 阶段 4 + 阶段 5: 合并 + 排序打分 — rovRecallRank 顺序即合并顺序,Video.sortScore 是 rank 模型最终分 */
+    private static void writeFunnelMergedStage(RankParam param, List<Video> rovRecallRank) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(rovRecallRank)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (Video v : rovRecallRank) {
+            ctx.getStep4MergedVideoIds().add(v.getVideoId());
+            if (v.getPushFrom() != null) {
+                ctx.getStep4MergedAttribution().put(v.getVideoId(), v.getPushFrom());
+            }
+            ctx.getStep5RankedData().put(v.getVideoId(), new RankVideoEntry(v.getSortScore(), null));
+        }
+    }
+
+    /** 阶段 6: 排序截断 — rankResult.videos 顺序 = rank_index */
+    private static void writeFunnelRankTruncatedStage(RankParam param, List<Video> rankedVideos) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(rankedVideos)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (Video v : rankedVideos) {
+            ctx.getStep6RankTruncatedVideoIds().add(v.getVideoId());
+        }
+    }
+
     private static List<Long> toCandidateIds(List<Video> videos) {
         if (CollectionUtils.isEmpty(videos)) {
             return Collections.emptyList();

+ 18 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelBasic.java

@@ -6,6 +6,8 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.reflect.TypeToken;
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.funnel.ColdStartAction;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
 import com.tzld.piaoquan.recommend.server.service.rank.RankService;
@@ -60,6 +62,12 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
 
     String CLASS_NAME = this.getClass().getSimpleName();
 
+    /** 阶段 7: 冷启替换 — 来自 flowVideos / douHotFlowPoolVideos 的视频标记为 INSERTED */
+    private static void markColdStartInserted(FunnelContext ctx, Video v) {
+        if (ctx == null || v == null) return;
+        ctx.getStep7ColdStartActions().put(v.getVideoId(), ColdStartAction.INSERTED);
+    }
+
     public void duplicate(Set<Long> setVideo, List<Video> videos) {
         Iterator<Video> iterator = videos.iterator();
         while (iterator.hasNext()) {
@@ -135,6 +143,7 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
         RankProcessorBoost.boostByFestive(param, rovVideos, rankReduceByFestiveConfig);
 
         // 7 流量池按比例强插
+        FunnelContext funnelCtx = param.getFunnelContext();
         List<Video> result = new ArrayList<>();
         for (int i = 0; i < param.getTopK() && i < rovVideos.size(); i++) {
             result.add(rovVideos.get(i));
@@ -146,13 +155,17 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
             double rand = RandomUtils.nextDouble(0, 1);
             if (rand < flowPoolP) {
                 if (flowPoolIndex < flowVideos.size()) {
-                    result.add(flowVideos.get(flowPoolIndex++));
+                    Video v = flowVideos.get(flowPoolIndex++);
+                    result.add(v);
+                    markColdStartInserted(funnelCtx, v);
                 } else {
                     break;
                 }
             } else if (this.isInsertDouHotFlowPoolVideo()) {
                 if (flowPoolIndex < douHotFlowPoolVideos.size()) {
-                    result.add(douHotFlowPoolVideos.get(flowPoolIndex++));
+                    Video v = douHotFlowPoolVideos.get(flowPoolIndex++);
+                    result.add(v);
+                    markColdStartInserted(funnelCtx, v);
                 } else {
                     break;
                 }
@@ -166,7 +179,9 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
         }
         if (rovPoolIndex >= rovVideos.size()) {
             for (int i = flowPoolIndex; i < flowVideos.size() && result.size() < param.getSize(); i++) {
-                result.add(flowVideos.get(i));
+                Video v = flowVideos.get(i);
+                result.add(v);
+                markColdStartInserted(funnelCtx, v);
             }
         }
         if (flowPoolIndex >= flowVideos.size()) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/FilterParamFactory.java

@@ -37,7 +37,7 @@ public class FilterParamFactory {
 
         filterParam.setPushFrom(pushFrom);
         filterParam.setScoresMap(scoresMap);
-        filterParam.setFunnelSink(param.getFunnelSink());
+        filterParam.setFunnelContext(param.getFunnelContext());
         return filterParam;
     }
 }

+ 3 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java

@@ -1,6 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
-import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import lombok.Data;
@@ -8,15 +8,14 @@ import lombok.Data;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * @author dyp
  */
 @Data
 public class RecallParam {
-    /** 漏斗 raw sink (可空):每路召回策略在 filter 前把 (id, score, strategyRank) 写入 sink[pushFrom] */
-    private ConcurrentMap<String, List<FunnelRawItem>> funnelSink;
+    /** 漏斗上下文 (可空):填了才记漏斗 */
+    private FunnelContext funnelContext;
 
     private String regionCode;
     private String cityCode;

+ 0 - 7
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java

@@ -1,7 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
 import com.tzld.piaoquan.recommend.server.model.Video;
-import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -27,12 +26,6 @@ public class RecallResult {
     public static class RecallData {
         private String pushFrom;
         private List<Video> videos;
-        private List<FunnelRawItem> rawList;
-
-        public RecallData(String pushFrom, List<Video> videos) {
-            this.pushFrom = pushFrom;
-            this.videos = videos;
-        }
     }
 
     public List<Video> mergeRecallVideos() {

+ 25 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -5,6 +5,8 @@ import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.ExperimentService;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RecallVideoEntry;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -84,9 +86,32 @@ public class RecallService implements ApplicationContextAware {
             }
         }
 
+        syncScoresToFunnel(param.getFunnelContext(), results);
         return new RecallResult(results);
     }
 
+    /**
+     * 漏斗:把 filter 后 Video.rovScore 同步回 funnel entry。
+     * 部分 strategy 在 filter 前没传 scoresMap (entry.score 默认 0),
+     * filter 后 strategy 才把 rovScore set 到 Video 上。这里统一回写。
+     */
+    private static void syncScoresToFunnel(FunnelContext ctx, List<RecallResult.RecallData> results) {
+        if (ctx == null || results == null) return;
+        for (RecallResult.RecallData data : results) {
+            if (data == null || data.getVideos() == null) continue;
+            List<RecallVideoEntry> entries = ctx.getStages123RecallByStrategy().get(data.getPushFrom());
+            if (entries == null) continue;
+            java.util.Map<Long, Double> videoScores = new java.util.HashMap<>(data.getVideos().size());
+            for (Video v : data.getVideos()) {
+                videoScores.put(v.getVideoId(), v.getRovScore());
+            }
+            for (RecallVideoEntry e : entries) {
+                Double s = videoScores.get(e.getVideoId());
+                if (s != null) e.setScore(s);   // 覆盖:filter 通过的 entry 用 Video.rovScore(最终 score)
+            }
+        }
+    }
+
     private List<RecallStrategy> getRecallStrategy(RecallParam param) {
         List<RecallStrategy> strategies = new ArrayList<>();
 

+ 71 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

@@ -1,6 +1,9 @@
 package com.tzld.piaoquan.recommend.server.util;
 
 import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RecallVideoEntry;
+import com.tzld.piaoquan.recommend.server.service.funnel.SelectKind;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
@@ -60,15 +63,78 @@ public class RecallUtils {
 
         rovRecallRank.addAll(v0);
         setVideo.addAll(v0.stream().map(Video::getVideoId).collect(Collectors.toSet()));
+        markSelfMixed(param, v0, sizeReturn);
     }
 
     public static void extractRecall(int recallNum, RankParam param, String pushFrom, Set<Long> setVideo, List<Video> rovRecallRank) {
         if (recallNum > 0) {
-            List<Video> list = extractAndSort(param, pushFrom);
-            list = list.stream().filter(r -> !setVideo.contains(r.getVideoId())).collect(Collectors.toList());
-            list = list.subList(0, Math.min(recallNum, list.size()));
-            rovRecallRank.addAll(list);
-            setVideo.addAll(list.stream().map(Video::getVideoId).collect(Collectors.toSet()));
+            List<Video> rawFiltered = extractAndSort(param, pushFrom);
+            List<Video> dedupedList = rawFiltered.stream()
+                    .filter(r -> !setVideo.contains(r.getVideoId()))
+                    .collect(Collectors.toList());
+            List<Video> selfList = dedupedList.subList(0, Math.min(recallNum, dedupedList.size()));
+            rovRecallRank.addAll(selfList);
+            setVideo.addAll(selfList.stream().map(Video::getVideoId).collect(Collectors.toSet()));
+            markStep3Select(param, pushFrom, recallNum, rawFiltered, selfList);
+        }
+    }
+
+    /**
+     * 阶段 3 截断标记 (单 pushFrom)。业务行为不变:
+     * - SELF  = selfList 全集(业务实际贡献给合并的视频)
+     * - OTHER = rawFiltered 前 recallNum 名次内但不在 selfList 中(即被前面 pushFrom 抢走的)
+     */
+    private static void markStep3Select(RankParam param, String pushFrom, int recallNum,
+                                        List<Video> rawFiltered, List<Video> selfList) {
+        if (param == null || param.getFunnelContext() == null
+                || StringUtils.isBlank(pushFrom)) {
+            return;
+        }
+        List<RecallVideoEntry> entries = param.getFunnelContext().getStages123RecallByStrategy().get(pushFrom);
+        if (CollectionUtils.isEmpty(entries)) return;
+        Map<Long, RecallVideoEntry> byVid = new HashMap<>(entries.size());
+        for (RecallVideoEntry e : entries) byVid.put(e.getVideoId(), e);
+
+        Set<Long> selfIds = new HashSet<>();
+        for (Video v : selfList) selfIds.add(v.getVideoId());
+
+        // SELF
+        for (Video v : selfList) {
+            RecallVideoEntry e = byVid.get(v.getVideoId());
+            if (e != null) {
+                e.setSelect(SelectKind.SELF);
+                e.setTruncate(recallNum);
+            }
+        }
+        // OTHER: rawFiltered 前 recallNum 个里不在 selfList 中
+        int topN = Math.min(recallNum, rawFiltered.size());
+        for (int i = 0; i < topN; i++) {
+            Video v = rawFiltered.get(i);
+            if (selfIds.contains(v.getVideoId())) continue;
+            RecallVideoEntry e = byVid.get(v.getVideoId());
+            if (e != null && e.getSelect() == null) {
+                e.setSelect(SelectKind.OTHER);
+                e.setTruncate(recallNum);
+            }
+        }
+    }
+
+    /** 阶段 3 截断: 多路混合 (extractOldSpecialRecall) — 按 Video.pushFrom 分组归因 SELF。OTHER 这里不计算(多路合一截断的归因复杂,先 skip) */
+    private static void markSelfMixed(RankParam param, List<Video> list, int truncate) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(list)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (Video v : list) {
+            String pf = v.getPushFrom();
+            if (StringUtils.isBlank(pf)) continue;
+            List<RecallVideoEntry> entries = ctx.getStages123RecallByStrategy().get(pf);
+            if (CollectionUtils.isEmpty(entries)) continue;
+            for (RecallVideoEntry e : entries) {
+                if (e.getVideoId() == v.getVideoId()) {
+                    e.setSelect(SelectKind.SELF);
+                    e.setTruncate(truncate);
+                    break;
+                }
+            }
         }
     }