Bladeren bron

Merge branch 'feature_20260525_video_funnel_log' of algorithm/recommend-server into master

yangxiaohui 1 week geleden
bovenliggende
commit
77b75bf175
56 gewijzigde bestanden met toevoegingen van 843 en 62 verwijderingen
  1. 63 2
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java
  2. 8 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java
  3. 81 9
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterService.java
  4. 13 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/ColdStartAction.java
  5. 256 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java
  6. 54 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelContext.java
  7. 19 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogConfig.java
  8. 56 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogService.java
  9. 18 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RankVideoEntry.java
  10. 40 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RecallVideoEntry.java
  11. 10 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/SelectKind.java
  12. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java
  13. 5 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankResult.java
  14. 47 2
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java
  15. 18 3
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelBasic.java
  16. 13 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/FilterParamFactory.java
  17. 4 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java
  18. 25 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java
  19. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java
  20. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRedisRecallStrategy.java
  21. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java
  22. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractVideoRecallStrategy.java
  23. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AppFallbackRecallStrategy.java
  24. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/BlessRecallStrategy.java
  25. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerHeadRovnRecallStrategy.java
  26. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerRovnRecallStrategy.java
  27. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnAllRovRecallStrategy.java
  28. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnRecallStrategy.java
  29. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/DouHotFlowPoolRecallStrategy.java
  30. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyTomson.java
  31. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate1STRRecallStrategy.java
  32. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2AndChannelRovRecallStrategy.java
  33. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2RovRecallStrategy.java
  34. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2STRRecallStrategy.java
  35. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate1RecallStrategy.java
  36. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate2RecallStrategy.java
  37. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HotReturnUvRecallStrategy.java
  38. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRosRecallStrategy.java
  39. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRovnRecallStrategy.java
  40. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceStrRecallStrategy.java
  41. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceRovnRecallStrategy.java
  42. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceSTRRecallStrategy.java
  43. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyROS.java
  44. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1.java
  45. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1AllRov.java
  46. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV7LongTermV1.java
  47. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRosnRecallStrategy.java
  48. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRovnRecallStrategy.java
  49. 3 5
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SocialI2IBasicRecallStrategy.java
  50. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate1RecallStrategy.java
  51. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate2RecallStrategy.java
  52. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserDeconstructionKeywordsRecallStrategy.java
  53. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearReturnCate2RecallStrategy.java
  54. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate1RecallStrategy.java
  55. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate2RecallStrategy.java
  56. 71 5
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

+ 63 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -24,6 +24,10 @@ import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolService;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
 import com.tzld.piaoquan.recommend.server.service.rank.RankRouter;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelAggregator;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelLogConfig;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelLogService;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
@@ -81,6 +85,8 @@ public class RecommendService {
     @Autowired
     private TimerLogService timerLogService;
     @Autowired
+    private FunnelLogService funnelLogService;
+    @Autowired
     private FeatureService featureService;
 
     @Autowired
@@ -97,6 +103,10 @@ public class RecommendService {
     @ApolloJsonValue("${none.user.risk.exclude.apptype:[]}")
     private Set<Integer> noneUserRiskExcludeAppTypes;
 
+    /** 漏斗日志配置 (Apollo 热调,JSON: {enabled, sampleRate, appTypes}) */
+    @ApolloJsonValue("${funnel.log.config:{}}")
+    private FunnelLogConfig funnelLogConfig = new FunnelLogConfig();
+
     @ApolloJsonValue("${testing.risk.province:[]}")
     private Set<String> testingRiskProvince;
     @ApolloJsonValue("${testing.risk.city:[]}")
@@ -187,7 +197,7 @@ public class RecommendService {
             timerLogMapTL.get().put("genRecommendParamTime", genRecommendParamTime);
             // log.info("genRecommendParam={},genRecommendParam cost={}", JSONUtils.toJson(param),
             //         genRecommendParamTime);
-            List<Video> videos = videoRecommend(param);
+            List<Video> videos = videoRecommend(param, request);
             stopwatch.reset().start();
             updateCache(request, param, videos);
             long updateCacheTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -525,7 +535,7 @@ public class RecommendService {
         return null;
     }
 
-    private List<Video> videoRecommend(RecommendParam param) {
+    private List<Video> videoRecommend(RecommendParam param, RecommendRequest request) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         String vid = String.valueOf(param.getVideoId());
         UserShareReturnProfile userProfile = featureService.getUserProfile(param.getUid(), param.getMid());
@@ -574,6 +584,8 @@ public class RecommendService {
         recallParam.setUserSocialRecallInfo(userSocialRecallInfo);
         recallParam.setUserNetworkSeqFeature(userNetworkSeqFeature);
         recallParam.setUserNetworkSeqVideoInfoMap(userNetworkSeqVideoInfoMap);
+        FunnelContext funnelContext = shouldSampleFunnel(request) ? buildFunnelContext(request, param) : null;
+        recallParam.setFunnelContext(funnelContext);
         RecallResult recallResult = recallService.recall(recallParam);
 
         long recallTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -586,6 +598,7 @@ public class RecommendService {
         rankParam.setHeadInfo(headVideoInfo);
         rankParam.setUserRTShareList(param.getUserRTShareList());
         rankParam.setBehaviorVideos(behaviorVideos);
+        rankParam.setFunnelContext(funnelContext);
         RankResult rankResult = rankRouter.rank(rankParam);
 
         long rankTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -594,6 +607,7 @@ public class RecommendService {
 
 
         if (rankResult == null || CollectionUtils.isEmpty(rankResult.getVideos())) {
+            if (funnelContext != null) logFunnel(funnelContext, Collections.emptyList());
             return Collections.emptyList();
         }
 
@@ -602,9 +616,56 @@ public class RecommendService {
         if (param.getSize() < rankResult.getVideos().size()) {
             videos = rankResult.getVideos().subList(0, param.getSize());
         }
+        if (funnelContext != null) logFunnel(funnelContext, videos);
         return videos;
     }
 
+    private boolean shouldSampleFunnel(RecommendRequest request) {
+        if (request == null || funnelLogConfig == null || !funnelLogConfig.isEnabled()) return false;
+        int rate = funnelLogConfig.getSampleRate();
+        if (rate <= 0) return false;
+        Set<Integer> appTypes = funnelLogConfig.getAppTypes();
+        if (CollectionUtils.isNotEmpty(appTypes) && !appTypes.contains(request.getAppType())) {
+            return false;
+        }
+        if (rate >= 100) return true;
+        String mid = StringUtils.defaultString(request.getMid());
+        int hash = (mid.hashCode() & 0x7FFFFFFF) % 100;
+        return hash < rate;
+    }
+
+    private FunnelContext buildFunnelContext(RecommendRequest request, RecommendParam param) {
+        FunnelContext ctx = new FunnelContext();
+        ctx.setTraceId(String.valueOf(TraceUtils.currentTraceId()));
+        if (request != null) {
+            ctx.setRecommendTraceId(Strings.nullToEmpty(request.getRecommendTraceId()));
+            ctx.setSessionId(Strings.nullToEmpty(request.getSessionId()));
+            ctx.setSubSessionId(Strings.nullToEmpty(request.getSubSessionId()));
+            ctx.setRootSessionId(Strings.nullToEmpty(request.getRootSessionId()));
+            ctx.setMid(Strings.nullToEmpty(request.getMid()));
+            ctx.setAppType(request.getAppType());
+            ctx.setNewExpGroup(Strings.nullToEmpty(request.getNewExpGroup()));
+        }
+        if (param != null) {
+            ctx.setAbExpCodes(param.getAbExpCodes());
+        }
+        return ctx;
+    }
+
+    private void logFunnel(FunnelContext ctx, List<Video> returnedVideos) {
+        try {
+            if (CollectionUtils.isNotEmpty(returnedVideos)) {
+                for (Video v : returnedVideos) {
+                    ctx.getStep8OutputVideoIds().add(v.getVideoId());
+                }
+            }
+            Map<String, String> row = FunnelAggregator.toLogItem(ctx);
+            funnelLogService.log(row);
+        } catch (Exception e) {
+            log.error("logFunnel error", e);
+        }
+    }
+
     public RecallParam convertToRecallParam(RecommendParam param) {
         RecallParam recallParam = new RecallParam();
         recallParam.setAppType(param.getAppType());

+ 8 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.filter;
 
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import lombok.Data;
 
 import java.util.List;
@@ -16,6 +17,13 @@ public class FilterParam {
     private String mid;
     private String uid;
 
+    /** 漏斗:调用方所属召回策略 pushFrom (可空,仅漏斗记录用) */
+    private String pushFrom;
+    /** 漏斗:videoId -> 召回 score (可空) */
+    private Map<Long, Double> scoresMap;
+    /** 漏斗上下文 (可空) */
+    private FunnelContext funnelContext;
+
     // 风险过滤
     private String regionCode;
     private String cityCode;

+ 81 - 9
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterService.java

@@ -5,15 +5,20 @@ import com.google.common.collect.Lists;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory;
 import com.tzld.piaoquan.recommend.server.service.filter.strategy.*;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RecallVideoEntry;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -31,11 +36,36 @@ public class FilterService {
     private Map<String, Integer> filterExpConfig;
 
     public FilterResult filter(FilterParam param) {
-        List<Long> videoIds = viewFilter(param);
+        // 阶段 1: filter 前 entries(按 funnel 开关创建并挂到 context)
+        List<RecallVideoEntry> entries = recordRecallStageToContext(param);
+        // 阶段 2: 跑 filter 策略,entries 非空时同步回写 filteredIn/indexNewAfterFilter/filterReasons
+        List<Long> videoIds = viewFilter(param, entries);
         return new FilterResult(videoIds);
     }
 
-    private List<Long> viewFilter(FilterParam param) {
+    private List<RecallVideoEntry> recordRecallStageToContext(FilterParam param) {
+        if (param == null
+                || param.getFunnelContext() == null
+                || StringUtils.isBlank(param.getPushFrom())
+                || CollectionUtils.isEmpty(param.getVideoIds())) {
+            return null;
+        }
+        List<Long> ids = param.getVideoIds();
+        Map<Long, Double> scoresMap = param.getScoresMap();
+        List<RecallVideoEntry> entries = new ArrayList<>(ids.size());
+        for (int i = 0; i < ids.size(); i++) {
+            long id = ids.get(i);
+            double score = scoresMap != null ? scoresMap.getOrDefault(id, 0.0) : 0.0;
+            entries.add(new RecallVideoEntry(id, score, i));
+        }
+        param.getFunnelContext().getStages123RecallByStrategy().merge(param.getPushFrom(), entries, (oldList, newList) -> {
+            oldList.addAll(newList);
+            return oldList;
+        });
+        return entries;
+    }
+
+    private List<Long> viewFilter(FilterParam param, List<RecallVideoEntry> entries) {
 
         List<FilterStrategy> strategies = getStrategies(param);
         CountDownLatch cdl = new CountDownLatch(strategies.size());
@@ -56,24 +86,66 @@ public class FilterService {
             return Collections.emptyList();
         }
 
-        List<List<Long>> videoIds = new ArrayList<>();
-        for (Future<List<Long>> f : futures) {
+        int n = strategies.size();
+        List<Set<Long>> keeps = new ArrayList<>(n);
+        List<String> reasonKeys = new ArrayList<>(n);
+        for (int i = 0; i < n; i++) {
             try {
-                videoIds.add(f.get());
+                keeps.add(new HashSet<>(futures.get(i).get()));
             } catch (Exception e) {
                 log.error("future get error ", e);
+                keeps.add(Collections.emptySet());
             }
+            reasonKeys.add(reasonKey(strategies.get(i)));
         }
-        if (CollectionUtils.isEmpty(videoIds)) {
+        if (keeps.isEmpty()) {
             return Collections.emptyList();
         }
-        List<Long> result = Lists.newArrayList(param.getVideoIds());
-        for (int i = 0; i < videoIds.size(); ++i) {
-            result.retainAll(videoIds.get(i));
+
+        // 漏斗未开 → 旧 retainAll 行为
+        if (entries == null) {
+            List<Long> result = Lists.newArrayList(param.getVideoIds());
+            for (Set<Long> keep : keeps) {
+                result.retainAll(keep);
+            }
+            return result;
+        }
+
+        // 漏斗开了 → 同步回写 entry 阶段 2 字段
+        List<Long> result = new ArrayList<>(entries.size());
+        int newPos = 0;
+        for (RecallVideoEntry e : entries) {
+            long id = e.getVideoId();
+            List<String> reasons = null;
+            for (int i = 0; i < keeps.size(); i++) {
+                if (!keeps.get(i).contains(id)) {
+                    if (reasons == null) reasons = new ArrayList<>(2);
+                    reasons.add(reasonKeys.get(i));
+                }
+            }
+            if (reasons == null) {
+                e.setFilteredIn(true);
+                e.setIndexNewAfterFilter(newPos++);
+                result.add(id);
+            } else {
+                e.setFilteredIn(false);
+                e.setFilterReasons(reasons);
+            }
         }
         return result;
     }
 
+    private static String reasonKey(FilterStrategy s) {
+        if (s instanceof PreViewedStrategy) return "previewed";
+        if (s instanceof ViewedStrategy) return "viewed";
+        if (s instanceof RecommendStatusStrategy) return "recommend_status";
+        if (s instanceof AppletVideoStatusStrategy) return "applet_video_status";
+        if (s instanceof RiskVideoStrategy) return "risk";
+        if (s instanceof VideoSourceTypeStrategy) return "ugc";
+        if (s instanceof GeneralSpiderStrategy) return "spider";
+        return s.getClass().getSimpleName();
+    }
+
     private List<FilterStrategy> getStrategies(FilterParam param) {
         List<FilterStrategy> strategies = new ArrayList<>();
         strategies.add(ServiceBeanFactory.getBean(PreViewedStrategy.class));

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/ColdStartAction.java

@@ -0,0 +1,13 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+/**
+ * 漏斗 阶段 7 冷启替换动作。
+ * NONE     = 未被冷启替换/插入
+ * REPLACED = 该 video 被冷启视频替换掉(从结果列表里消失)
+ * INSERTED = 该 video 是冷启加入的(原本不在召回链路)
+ */
+public enum ColdStartAction {
+    NONE,
+    REPLACED,
+    INSERTED
+}

+ 256 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java

@@ -0,0 +1,256 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 把 FunnelContext 序列化成一条 SLS 日志(8 个 step JSON 字段 + filter reasons)。
+ */
+public class FunnelAggregator {
+
+    public static Map<String, String> toLogItem(FunnelContext ctx) {
+        Map<String, String> row = new LinkedHashMap<>();
+        if (ctx == null) return row;
+
+        // base
+        row.put("traceId", StringUtils.defaultString(ctx.getTraceId()));
+        row.put("recommendTraceId", StringUtils.defaultString(ctx.getRecommendTraceId()));
+        row.put("sessionId", StringUtils.defaultString(ctx.getSessionId()));
+        row.put("subSessionId", StringUtils.defaultString(ctx.getSubSessionId()));
+        row.put("rootSessionId", StringUtils.defaultString(ctx.getRootSessionId()));
+        row.put("mid", StringUtils.defaultString(ctx.getMid()));
+        row.put("appType", String.valueOf(ctx.getAppType()));
+        row.put("newExpGroup", StringUtils.defaultString(ctx.getNewExpGroup()));
+        row.put("abExpCode", JSON.toJSONString(ctx.getAbExpCodes()));
+
+        // step 1-3 + filter reasons
+        row.put("step_1_recall", JSON.toJSONString(buildStep1(ctx)));
+        row.put("step_2_filtered", JSON.toJSONString(buildStep2(ctx)));
+        row.put("step_2_filter_reasons", JSON.toJSONString(buildStep2Reasons(ctx)));
+        row.put("step_3_truncated", JSON.toJSONString(buildStep3(ctx)));
+
+        // step 4-8
+        row.put("step_4_merged", JSON.toJSONString(buildStep4(ctx)));
+        row.put("step_5_ranked", JSON.toJSONString(buildStep5(ctx)));
+        row.put("step_6_rank_truncated", JSON.toJSONString(buildStep6(ctx)));
+        row.put("step_7_cold_start", JSON.toJSONString(buildStep7(ctx)));
+        row.put("step_8_output", JSON.toJSONString(buildStep8(ctx)));
+
+        return row;
+    }
+
+    // ===== step 1-3: {pushFrom: [...]} =====
+    private static Map<String, JSONArray> buildStep1(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                arr.add(o);
+            }
+            out.put(pf, arr);
+        });
+        return out;
+    }
+
+    private static Map<String, JSONArray> buildStep2(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                if (!e.isFilteredIn()) continue;
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                o.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                arr.add(o);
+            }
+            out.put(pf, arr);
+        });
+        return out;
+    }
+
+    private static Map<String, JSONArray> buildStep2Reasons(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                if (e.isFilteredIn()) continue;
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                o.put("reasons", e.getFilterReasons());
+                arr.add(o);
+            }
+            if (!arr.isEmpty()) out.put(pf, arr);
+        });
+        return out;
+    }
+
+    private static Map<String, JSONArray> buildStep3(FunnelContext ctx) {
+        Map<String, JSONArray> out = new LinkedHashMap<>();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            JSONArray arr = new JSONArray();
+            for (RecallVideoEntry e : entries) {
+                if (e.getSelect() == null) continue;
+                JSONObject o = new JSONObject();
+                o.put("vid", e.getVideoId());
+                o.put("index", displayIndex(e.getIndex()));
+                o.put("score", round6(e.getScore()));
+                o.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                o.put("select", e.getSelect().name().toLowerCase());
+                o.put("truncate", e.getTruncate());
+                arr.add(o);
+            }
+            out.put(pf, arr);
+        });
+        return out;
+    }
+
+    // ===== step 4-8: 按 vid 组织,每条带完整 recalls 历史 =====
+    private static JSONArray buildStep4(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (Long vid : ctx.getStep4MergedVideoIds()) {
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            arr.add(o);
+        }
+        return arr;
+    }
+
+    private static JSONArray buildStep5(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (Long vid : ctx.getStep4MergedVideoIds()) {
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r == null) continue;
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            o.put("rank", buildRankBlock(r));
+            arr.add(o);
+        }
+        return arr;
+    }
+
+    private static JSONArray buildStep6(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (int i = 0; i < ctx.getStep6RankTruncatedVideoIds().size(); i++) {
+            long vid = ctx.getStep6RankTruncatedVideoIds().get(i);
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r != null) o.put("rank", buildRankBlock(r));
+            o.put("rank_index", i + 1);
+            arr.add(o);
+        }
+        return arr;
+    }
+
+    private static JSONArray buildStep7(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        // step 7 = step 6 + if_cold_start
+        for (int i = 0; i < ctx.getStep6RankTruncatedVideoIds().size(); i++) {
+            long vid = ctx.getStep6RankTruncatedVideoIds().get(i);
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r != null) o.put("rank", buildRankBlock(r));
+            o.put("rank_index", i + 1);
+            ColdStartAction action = ctx.getStep7ColdStartActions().getOrDefault(vid, ColdStartAction.NONE);
+            o.put("if_cold_start", action != ColdStartAction.NONE);
+            o.put("cold_start_action", action.name());
+            arr.add(o);
+        }
+        // 冷启 INSERTED 的视频可能不在 step6RankTruncatedVideoIds 里,补
+        ctx.getStep7ColdStartActions().forEach((vid, action) -> {
+            if (action == ColdStartAction.INSERTED && !ctx.getStep6RankTruncatedVideoIds().contains(vid)) {
+                JSONObject o = new JSONObject();
+                o.put("vid", vid);
+                o.put("if_cold_start", true);
+                o.put("cold_start_action", action.name());
+                arr.add(o);
+            }
+        });
+        return arr;
+    }
+
+    private static JSONArray buildStep8(FunnelContext ctx) {
+        JSONArray arr = new JSONArray();
+        for (int i = 0; i < ctx.getStep8OutputVideoIds().size(); i++) {
+            long vid = ctx.getStep8OutputVideoIds().get(i);
+            JSONObject o = new JSONObject();
+            o.put("vid", vid);
+            o.put("attributedPushFrom", ctx.getStep4MergedAttribution().get(vid));
+            o.put("recalls", buildRecallsForVid(ctx, vid));
+            RankVideoEntry r = ctx.getStep5RankedData().get(vid);
+            if (r != null) o.put("rank", buildRankBlock(r));
+            ColdStartAction action = ctx.getStep7ColdStartActions().getOrDefault(vid, ColdStartAction.NONE);
+            o.put("if_cold_start", action != ColdStartAction.NONE);
+            o.put("rank_index", i + 1);
+            arr.add(o);
+        }
+        return arr;
+    }
+
+    // ===== helpers =====
+    private static JSONArray buildRecallsForVid(FunnelContext ctx, long vid) {
+        JSONArray recalls = new JSONArray();
+        ctx.getStages123RecallByStrategy().forEach((pf, entries) -> {
+            for (RecallVideoEntry e : entries) {
+                if (e.getVideoId() != vid) continue;
+                if (e.getSelect() != SelectKind.SELF) continue;   // 只列该路实际贡献到合并的视频
+                JSONObject r = new JSONObject();
+                r.put("strategy", pf);
+                r.put("index", displayIndex(e.getIndex()));
+                r.put("score", round6(e.getScore()));
+                r.put("index_new", displayIndex(e.getIndexNewAfterFilter()));
+                r.put("truncate", e.getTruncate());
+                recalls.add(r);
+                break;
+            }
+        });
+        return recalls;
+    }
+
+    private static JSONObject buildRankBlock(RankVideoEntry r) {
+        JSONObject o = new JSONObject();
+        o.put("score", round6(r.getRankScore()));
+        if (MapUtils.isNotEmpty(r.getSubScores())) {
+            r.getSubScores().forEach((k, v) -> o.put(k, round6(v)));
+        }
+        return o;
+    }
+
+    /** double → 保留 6 位小数 */
+    private static BigDecimal round6(double v) {
+        return BigDecimal.valueOf(v).setScale(6, RoundingMode.HALF_UP);
+    }
+
+    /** 内部 0-based 位置 → 对外展示 1-based。负值(如 -1 表"未通过")原样返回 */
+    private static int displayIndex(int zeroBased) {
+        return zeroBased < 0 ? zeroBased : zeroBased + 1;
+    }
+}

+ 54 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelContext.java

@@ -0,0 +1,54 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * 单次推荐请求的漏斗中间态容器,挂在 RecallParam + RankParam 上跨服务传递。
+ * 末尾由 FunnelAggregator 序列化成一条 SLS 日志(8 个 step JSON 字段)。
+ *
+ * 并发安全说明:
+ * - stages123RecallByStrategy 被 30+ 召回 strategy 子线程并发写 → ConcurrentMap
+ * - 同 pushFrom 的 List 由 FilterService 单次写入,不会并发追加
+ * - 阶段 4-8 由主线程顺序写
+ */
+@Data
+public class FunnelContext {
+    // ===== base =====
+    private String traceId;
+    private String recommendTraceId;
+    private String sessionId;
+    private String subSessionId;
+    private String rootSessionId;
+    private String mid;
+    private int    appType;
+    private String newExpGroup;
+    private Set<String> abExpCodes;
+
+    /** Step 1-3: 召回 / 过滤 / 截断(per pushFrom,字段在 RecallVideoEntry 上分组) */
+    private ConcurrentMap<String, List<RecallVideoEntry>> stages123RecallByStrategy = new ConcurrentHashMap<>();
+
+    /** Step 4: 合并去重后顺序 */
+    private List<Long>         step4MergedVideoIds = new ArrayList<>();
+    /** Step 4: vid -> 合并时归因到哪条 pushFrom */
+    private Map<Long, String>  step4MergedAttribution = new HashMap<>();
+
+    /** Step 5: rank 模型打分结果 */
+    private Map<Long, RankVideoEntry> step5RankedData = new HashMap<>();
+
+    /** Step 6: 排序截断后顺序(rank_index = list 中位置 + 1) */
+    private List<Long>         step6RankTruncatedVideoIds = new ArrayList<>();
+
+    /** Step 7: 冷启替换标记 */
+    private Map<Long, ColdStartAction> step7ColdStartActions = new HashMap<>();
+
+    /** Step 8: 最终下发顺序(output_index = list 中位置 + 1) */
+    private List<Long>         step8OutputVideoIds = new ArrayList<>();
+}

+ 19 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogConfig.java

@@ -0,0 +1,19 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * 漏斗日志配置(Apollo key: funnel.log.config,JSON 整体配置)。
+ */
+@Data
+public class FunnelLogConfig {
+    /** 总开关 */
+    private boolean enabled = false;
+    /** 采样率 0-100。0=全关,100=全开。按 mid hash 决定单请求是否记录 */
+    private int sampleRate = 0;
+    /** 生效 appType 白名单。空集 = 不限制 */
+    private Set<Integer> appTypes = Collections.emptySet();
+}

+ 56 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogService.java

@@ -0,0 +1,56 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import com.aliyun.openservices.aliyun.log.producer.LogProducer;
+import com.aliyun.openservices.aliyun.log.producer.Producer;
+import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
+import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
+import com.aliyun.openservices.log.common.LogItem;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+
+/**
+ * 推荐漏斗日志:一次请求一条记录,包含 8 个 step JSON 字段。
+ */
+@Service
+@Slf4j
+public class FunnelLogService {
+
+    @Value("${aliyun.log.project}")
+    private String project;
+    @Value("${aliyun.log.endpoint}")
+    private String endpoint;
+    @Value("${aliyun.log.accessKeyId}")
+    private String accessKeyId;
+    @Value("${aliyun.log.accessKeySecret}")
+    private String accessKeySecret;
+
+    private String logStore = "funnel-log";
+
+    private Producer producer;
+
+    @PostConstruct
+    public void init() {
+        ProducerConfig producerConfig = new ProducerConfig();
+        producer = new LogProducer(producerConfig);
+        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
+    }
+
+    public void log(Map<String, String> row) {
+        if (MapUtils.isEmpty(row)) return;
+        try {
+            LogItem logItem = new LogItem();
+            row.forEach(logItem::PushBack);
+            producer.send(project, logStore, logItem);
+        } catch (InterruptedException e) {
+            log.warn("funnel log interrupted");
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            log.error("Failed to send funnel log", e);
+        }
+    }
+}

+ 18 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RankVideoEntry.java

@@ -0,0 +1,18 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/**
+ * 漏斗 阶段 5 单条记录(rank 模型打分后)。
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class RankVideoEntry {
+    private double rankScore;
+    private Map<String, Double> subScores;  // 预留:rank 各分项 / 中间特征
+}

+ 40 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/RecallVideoEntry.java

@@ -0,0 +1,40 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 漏斗 阶段 1-3 单条记录(per pushFrom × videoId)。
+ * - 阶段 1 召回: index / score
+ * - 阶段 2 过滤: filteredIn / indexNewAfterFilter / filterReasons
+ * - 阶段 3 截断: select / truncate
+ */
+@Data
+@NoArgsConstructor
+public class RecallVideoEntry {
+    private long videoId;
+    private double score;
+    private int index;                  // 召回原始位置 (0-based)
+
+    private boolean filteredIn;
+    private int indexNewAfterFilter = -1;
+    private List<String> filterReasons;  // filteredIn=false 时被哪些 filter 干掉
+
+    /**
+     * 阶段 3 截断标记:
+     * - SELF  = 该 vid 在该路 top-recallNum 名次内 + 被该路实际贡献给合并列表
+     * - OTHER = 在该路 top-recallNum 名次内但被前面 pushFrom 抢走
+     * - null  = 未参与截断或在 top-N 之外
+     */
+    private SelectKind select;
+    /** 该路 recallNum 配额 (truncate) */
+    private int truncate;
+
+    public RecallVideoEntry(long videoId, double score, int index) {
+        this.videoId = videoId;
+        this.score = score;
+        this.index = index;
+    }
+}

+ 10 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/SelectKind.java

@@ -0,0 +1,10 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+/**
+ * 阶段 3 截断:该 vid 在该 pushFrom 的 top-recallNum 名次内被该路实际选中(SELF)
+ * 还是被前面其他 pushFrom 抢走(OTHER)。
+ */
+public enum SelectKind {
+    SELF,
+    OTHER
+}

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.recommend.server.service.rank;
 
 import com.tzld.piaoquan.recommend.server.model.MachineInfo;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
@@ -16,6 +17,8 @@ import java.util.Set;
 @Data
 public class RankParam {
     private RecallResult recallResult;
+    /** 漏斗上下文 (可空) */
+    private FunnelContext funnelContext;
     private int size;
     private int topK;
     private String rankKeyPrefix;

+ 5 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankResult.java

@@ -15,4 +15,9 @@ import java.util.List;
 @AllArgsConstructor
 public class RankResult {
     private List<Video> videos;
+    private List<Long> candidateVideoIds;
+
+    public RankResult(List<Video> videos) {
+        this.videos = videos;
+    }
 }

+ 47 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -6,6 +6,8 @@ import com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum;
 import com.tzld.piaoquan.recommend.server.model.RankVideoInfo;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RankVideoEntry;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
 import lombok.extern.slf4j.Slf4j;
@@ -41,7 +43,11 @@ public abstract class RankService {
         if (2 == param.getRecommendType()) {
             tagDuplicateVideos(param);
             List<Video> rovRecallRank = mergeAndRankRovRecall(param);
-            return new RankResult(rovRecallRank);
+            writeFunnelMergedStage(param, rovRecallRank);
+            RankResult result = new RankResult(rovRecallRank);
+            result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
+            writeFunnelRankTruncatedStage(param, result.getVideos());
+            return result;
         }
 
         if (param.isSpecialRecommend()) {
@@ -90,6 +96,7 @@ public abstract class RankService {
 
         // 2 正常走分发 排序+冷启动
         List<Video> rovRecallRank = mergeAndRankRovRecall(param);
+        writeFunnelMergedStage(param, rovRecallRank);
         List<Video> flowPoolRank = mergeAndRankFlowPoolRecall(param);
 
         List<Video> douHotFlowPoolRank = extractAndSort(param, DouHotFlowPoolRecallStrategy.PUSH_FROM);
@@ -97,7 +104,45 @@ public abstract class RankService {
         removeDuplicate(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
 
         // 融合排序
-        return mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
+        RankResult result = mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
+        if (result != null) {
+            result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
+            writeFunnelRankTruncatedStage(param, result.getVideos());
+        }
+        return result;
+    }
+
+    /** 阶段 4 + 阶段 5: 合并 + 排序打分 — rovRecallRank 顺序即合并顺序,Video.sortScore 是 rank 模型最终分 */
+    private static void writeFunnelMergedStage(RankParam param, List<Video> rovRecallRank) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(rovRecallRank)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (Video v : rovRecallRank) {
+            ctx.getStep4MergedVideoIds().add(v.getVideoId());
+            if (v.getPushFrom() != null) {
+                ctx.getStep4MergedAttribution().put(v.getVideoId(), v.getPushFrom());
+            }
+            ctx.getStep5RankedData().put(v.getVideoId(), new RankVideoEntry(v.getSortScore(), null));
+        }
+    }
+
+    /** 阶段 6: 排序截断 — rankResult.videos 顺序 = rank_index */
+    private static void writeFunnelRankTruncatedStage(RankParam param, List<Video> rankedVideos) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(rankedVideos)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (Video v : rankedVideos) {
+            ctx.getStep6RankTruncatedVideoIds().add(v.getVideoId());
+        }
+    }
+
+    private static List<Long> toCandidateIds(List<Video> videos) {
+        if (CollectionUtils.isEmpty(videos)) {
+            return Collections.emptyList();
+        }
+        List<Long> ids = new ArrayList<>(videos.size());
+        for (Video v : videos) {
+            ids.add(v.getVideoId());
+        }
+        return ids;
     }
 
     private void tagDuplicateVideos(RankParam param) {

+ 18 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelBasic.java

@@ -6,6 +6,8 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.reflect.TypeToken;
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.funnel.ColdStartAction;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
 import com.tzld.piaoquan.recommend.server.service.rank.RankService;
@@ -60,6 +62,12 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
 
     String CLASS_NAME = this.getClass().getSimpleName();
 
+    /** 阶段 7: 冷启替换 — 来自 flowVideos / douHotFlowPoolVideos 的视频标记为 INSERTED */
+    private static void markColdStartInserted(FunnelContext ctx, Video v) {
+        if (ctx == null || v == null) return;
+        ctx.getStep7ColdStartActions().put(v.getVideoId(), ColdStartAction.INSERTED);
+    }
+
     public void duplicate(Set<Long> setVideo, List<Video> videos) {
         Iterator<Video> iterator = videos.iterator();
         while (iterator.hasNext()) {
@@ -135,6 +143,7 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
         RankProcessorBoost.boostByFestive(param, rovVideos, rankReduceByFestiveConfig);
 
         // 7 流量池按比例强插
+        FunnelContext funnelCtx = param.getFunnelContext();
         List<Video> result = new ArrayList<>();
         for (int i = 0; i < param.getTopK() && i < rovVideos.size(); i++) {
             result.add(rovVideos.get(i));
@@ -146,13 +155,17 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
             double rand = RandomUtils.nextDouble(0, 1);
             if (rand < flowPoolP) {
                 if (flowPoolIndex < flowVideos.size()) {
-                    result.add(flowVideos.get(flowPoolIndex++));
+                    Video v = flowVideos.get(flowPoolIndex++);
+                    result.add(v);
+                    markColdStartInserted(funnelCtx, v);
                 } else {
                     break;
                 }
             } else if (this.isInsertDouHotFlowPoolVideo()) {
                 if (flowPoolIndex < douHotFlowPoolVideos.size()) {
-                    result.add(douHotFlowPoolVideos.get(flowPoolIndex++));
+                    Video v = douHotFlowPoolVideos.get(flowPoolIndex++);
+                    result.add(v);
+                    markColdStartInserted(funnelCtx, v);
                 } else {
                     break;
                 }
@@ -166,7 +179,9 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
         }
         if (rovPoolIndex >= rovVideos.size()) {
             for (int i = flowPoolIndex; i < flowVideos.size() && result.size() < param.getSize(); i++) {
-                result.add(flowVideos.get(i));
+                Video v = flowVideos.get(i);
+                result.add(v);
+                markColdStartInserted(funnelCtx, v);
             }
         }
         if (flowPoolIndex >= flowVideos.size()) {

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/FilterParamFactory.java

@@ -11,6 +11,15 @@ import java.util.Map;
  */
 public class FilterParamFactory {
     public static FilterParam create(RecallParam param, List<Long> videoIds) {
+        return create(param, videoIds, null, null);
+    }
+
+    /**
+     * 漏斗版:额外透传 pushFrom + scoresMap,让 FilterService 记录 raw 阶段数据。
+     * 任一为 null 时退化为原行为(不记录漏斗 raw)。
+     */
+    public static FilterParam create(RecallParam param, List<Long> videoIds,
+                                     String pushFrom, Map<Long, Double> scoresMap) {
         FilterParam filterParam = new FilterParam();
         filterParam.setVideoIds(videoIds);
         filterParam.setAppType(param.getAppType());
@@ -25,6 +34,10 @@ public class FilterParamFactory {
         filterParam.setCityCode(param.getCityCode());
         filterParam.setHotSceneType(param.getHotSceneType());
         filterParam.setClientIp(param.getClientIp());
+
+        filterParam.setPushFrom(pushFrom);
+        filterParam.setScoresMap(scoresMap);
+        filterParam.setFunnelContext(param.getFunnelContext());
         return filterParam;
     }
 }

+ 4 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import lombok.Data;
@@ -13,6 +14,9 @@ import java.util.Set;
  */
 @Data
 public class RecallParam {
+    /** 漏斗上下文 (可空):填了才记漏斗 */
+    private FunnelContext funnelContext;
+
     private String regionCode;
     private String cityCode;
     private String mid;

+ 25 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -5,6 +5,8 @@ import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.ExperimentService;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RecallVideoEntry;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -84,9 +86,32 @@ public class RecallService implements ApplicationContextAware {
             }
         }
 
+        syncScoresToFunnel(param.getFunnelContext(), results);
         return new RecallResult(results);
     }
 
+    /**
+     * 漏斗:把 filter 后 Video.rovScore 同步回 funnel entry。
+     * 部分 strategy 在 filter 前没传 scoresMap (entry.score 默认 0),
+     * filter 后 strategy 才把 rovScore set 到 Video 上。这里统一回写。
+     */
+    private static void syncScoresToFunnel(FunnelContext ctx, List<RecallResult.RecallData> results) {
+        if (ctx == null || results == null) return;
+        for (RecallResult.RecallData data : results) {
+            if (data == null || data.getVideos() == null) continue;
+            List<RecallVideoEntry> entries = ctx.getStages123RecallByStrategy().get(data.getPushFrom());
+            if (entries == null) continue;
+            java.util.Map<Long, Double> videoScores = new java.util.HashMap<>(data.getVideos().size());
+            for (Video v : data.getVideos()) {
+                videoScores.put(v.getVideoId(), v.getRovScore());
+            }
+            for (RecallVideoEntry e : entries) {
+                Double s = videoScores.get(e.getVideoId());
+                if (s != null) e.setScore(s);   // 覆盖:filter 通过的 entry 用 Video.rovScore(最终 score)
+            }
+        }
+    }
+
     private List<RecallStrategy> getRecallStrategy(RecallParam param) {
         List<RecallStrategy> strategies = new ArrayList<>();
 

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java

@@ -46,7 +46,7 @@ public abstract class AbstractFlowPoolWithLevelRecallStrategy implements RecallS
             videoFlowPoolMap.put(NumberUtils.toLong(values[0], 0), values[1]);
         }
         FilterResult filterResult = filterService.filter(FilterParamFactory.create(param,
-                new ArrayList<>(videoFlowPoolMap.keySet())));
+                new ArrayList<>(videoFlowPoolMap.keySet()), pushFrom(), null));
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             filterResult.getVideoIds().stream().forEach(vid -> {
                 Video recallData = new Video();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRedisRecallStrategy.java

@@ -91,7 +91,7 @@ public abstract class AbstractRedisRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java

@@ -129,7 +129,7 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
                 lastVideoId = t.getValue();
                 videoMap.put(NumberUtils.toLong(t.getValue(), 0), t.getScore());
             }
-            FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+            FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
 
             FilterResult filterResult = filterService.filter(filterParam);
 

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractVideoRecallStrategy.java

@@ -56,7 +56,7 @@ public abstract class AbstractVideoRecallStrategy implements RecallStrategy {
                 v -> NumberUtils.toDouble(v.get(1)));
         List<Video> results = new ArrayList<>();
 
-        FilterResult filterResult = filterService.filter(FilterParamFactory.create(param, Lists.newArrayList(videoScoreMap.keySet())));
+        FilterResult filterResult = filterService.filter(FilterParamFactory.create(param, Lists.newArrayList(videoScoreMap.keySet()), pushFrom(), videoScoreMap));
 
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             filterResult.getVideoIds().stream().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AppFallbackRecallStrategy.java

@@ -54,7 +54,7 @@ public class AppFallbackRecallStrategy implements RecallStrategy {
         List<Long> videoIdList = Arrays.asList(videoIds.split(",")).stream()
                 .map(Long::parseLong)
                 .collect(Collectors.toList());
-        FilterParam filterParam = FilterParamFactory.create(param, videoIdList);
+        FilterParam filterParam = FilterParamFactory.create(param, videoIdList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/BlessRecallStrategy.java

@@ -58,7 +58,7 @@ public class BlessRecallStrategy implements RecallStrategy {
         for (Pair<Long, Double> v : result) {
             videoMap.put(v.getLeft(), v.getRight());
         }
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerHeadRovnRecallStrategy.java

@@ -102,7 +102,7 @@ public class ChannelLayerHeadRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerRovnRecallStrategy.java

@@ -93,7 +93,7 @@ public class ChannelLayerRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnAllRovRecallStrategy.java

@@ -85,7 +85,7 @@ public class CityRovnAllRovRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnRecallStrategy.java

@@ -85,7 +85,7 @@ public class CityRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/DouHotFlowPoolRecallStrategy.java

@@ -111,7 +111,7 @@ public class DouHotFlowPoolRecallStrategy extends AbstractFlowPoolWithLevelRecal
             videoIdAndFlowPoolMap.put(Long.valueOf(split[0]), split[1]);
         }
 
-        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(videoIdAndFlowPoolMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(videoIdAndFlowPoolMap.keySet()), pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
 
         // 对视频随机打断

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyTomson.java

@@ -145,7 +145,7 @@ public class FlowPoolWithLevelRecallStrategyTomson extends AbstractFlowPoolWithL
                 ));
 
         // 3 召回内部过滤
-        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(resultmap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(resultmap.keySet()), pushFrom(), resultmap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate1STRRecallStrategy.java

@@ -84,7 +84,7 @@ public class HeadCate1STRRecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2AndChannelRovRecallStrategy.java

@@ -82,7 +82,7 @@ public class HeadCate2AndChannelRovRecallStrategy implements RecallStrategy {
                 .map(Long::parseLong)
                 .collect(Collectors.toList());
 
-        FilterParam filterParam = FilterParamFactory.create(param, allVid);
+        FilterParam filterParam = FilterParamFactory.create(param, allVid, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         Set<Long> filterVids = new HashSet<>(filterResult.getVideoIds());
         filterVids.remove(param.getVideoId());

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2RovRecallStrategy.java

@@ -80,7 +80,7 @@ public class HeadCate2RovRecallStrategy implements RecallStrategy {
                 .map(Long::parseLong)
                 .collect(Collectors.toList());
 
-        FilterParam filterParam = FilterParamFactory.create(param, allVid);
+        FilterParam filterParam = FilterParamFactory.create(param, allVid, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         Set<Long> filterVids = new HashSet<>(filterResult.getVideoIds());
         filterVids.remove(param.getVideoId());

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2STRRecallStrategy.java

@@ -84,7 +84,7 @@ public class HeadCate2STRRecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate1RecallStrategy.java

@@ -85,7 +85,7 @@ public class HeadProvinceCate1RecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate2RecallStrategy.java

@@ -85,7 +85,7 @@ public class HeadProvinceCate2RecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HotReturnUvRecallStrategy.java

@@ -86,7 +86,7 @@ public class HotReturnUvRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRosRecallStrategy.java

@@ -85,7 +85,7 @@ public class PrioriProvinceRosRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRovnRecallStrategy.java

@@ -85,7 +85,7 @@ public class PrioriProvinceRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceStrRecallStrategy.java

@@ -85,7 +85,7 @@ public class PrioriProvinceStrRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceRovnRecallStrategy.java

@@ -85,7 +85,7 @@ public class ProvinceRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceSTRRecallStrategy.java

@@ -78,7 +78,7 @@ public class ProvinceSTRRecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyROS.java

@@ -50,7 +50,7 @@ public class RegionRealtimeRecallStrategyROS implements RecallStrategy {
         }
         long t1 = System.currentTimeMillis();
         // 3 召回内部过滤
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1.java

@@ -51,7 +51,7 @@ public class RegionRealtimeRecallStrategyV1 implements RecallStrategy {
         }
         long t1 = System.currentTimeMillis();
         // 3 召回内部过滤
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         List<Long> videosResultId = new ArrayList<>();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1AllRov.java

@@ -44,7 +44,7 @@ public class RegionRealtimeRecallStrategyV1AllRov implements RecallStrategy {
             videoMap.put(v.getLeft(), v.getRight());
         }
 
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV7LongTermV1.java

@@ -42,7 +42,7 @@ public class RegionRealtimeRecallStrategyV7LongTermV1 implements RecallStrategy
         for (Pair<Long, Double> v : result) {
             videoMap.put(v.getLeft(), v.getRight());
         }
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRosnRecallStrategy.java

@@ -49,7 +49,7 @@ public class SceneCFRosnRecallStrategy implements RecallStrategy {
             }
             Map<Long, Double> vid2Score = getIdScoreMap(headVid, redisValue);
             List<Long> vids = new ArrayList<>(vid2Score.keySet());
-            FilterParam filterParam = FilterParamFactory.create(param, vids);
+            FilterParam filterParam = FilterParamFactory.create(param, vids, pushFrom(), vid2Score);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRovnRecallStrategy.java

@@ -49,7 +49,7 @@ public class SceneCFRovnRecallStrategy implements RecallStrategy {
             }
             Map<Long, Double> vid2Score = getIdScoreMap(headVid, redisValue);
             List<Long> vids = new ArrayList<>(vid2Score.keySet());
-            FilterParam filterParam = FilterParamFactory.create(param, vids);
+            FilterParam filterParam = FilterParamFactory.create(param, vids, pushFrom(), vid2Score);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 filterResult.getVideoIds().forEach(vid -> {

+ 3 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SocialI2IBasicRecallStrategy.java

@@ -61,17 +61,15 @@ public abstract class SocialI2IBasicRecallStrategy implements RecallStrategy {
 
             List<Long> videoIds = recallPair.stream().map(Pair::getKey).collect(Collectors.toList());
 
+            Map<Long, Double> videoScoreMap = recallPair.stream()
+                    .collect(Collectors.toMap(Pair::getKey, Pair::getValue, (o1, o2) -> o1));
 
             // 视频过滤
-            FilterParam filterParam = FilterParamFactory.create(param, videoIds);
+            FilterParam filterParam = FilterParamFactory.create(param, videoIds, pushFrom(), videoScoreMap);
             FilterResult filterResult = filterService.filter(filterParam);
             if (Objects.isNull(filterResult) || CollectionUtils.isEmpty(filterResult.getVideoIds())) {
                 return videos;
             }
-
-            // 返回结果
-            Map<Long, Double> videoScoreMap = recallPair.stream()
-                    .collect(Collectors.toMap(Pair::getKey, Pair::getValue, (o1, o2) -> o1));
             for (Long videoId : filterResult.getVideoIds()) {
                 Video video = new Video();
                 video.setVideoId(videoId);

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate1RecallStrategy.java

@@ -50,7 +50,7 @@ public class UserCate1RecallStrategy implements RecallStrategy {
                 if (!keys.isEmpty()) {
                     List<String> values = redisTemplate.opsForValue().multiGet(keys);
                     List<Long> ids = recall(param.getVideoId(), values);
-                    FilterParam filterParam = FilterParamFactory.create(param, ids);
+                    FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
                     FilterResult filterResult = filterService.filter(filterParam);
                     if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                         List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate2RecallStrategy.java

@@ -50,7 +50,7 @@ public class UserCate2RecallStrategy implements RecallStrategy {
                 if (!keys.isEmpty()) {
                     List<String> values = redisTemplate.opsForValue().multiGet(keys);
                     List<Long> ids = recall(param.getVideoId(), values);
-                    FilterParam filterParam = FilterParamFactory.create(param, ids);
+                    FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
                     FilterResult filterResult = filterService.filter(filterParam);
                     if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                         List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserDeconstructionKeywordsRecallStrategy.java

@@ -58,7 +58,7 @@ public class UserDeconstructionKeywordsRecallStrategy implements RecallStrategy
             }
             List<String> values = redisTemplate.opsForValue().multiGet(keys);
             List<Long> vids = this.recall(param.getVideoId(), values);
-            FilterParam filterParam = FilterParamFactory.create(param, vids);
+            FilterParam filterParam = FilterParamFactory.create(param, vids, pushFrom(), null);
 
             FilterResult filterResult = filterService.filter(filterParam);
             if (Objects.isNull(filterResult) || CollectionUtils.isEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearReturnCate2RecallStrategy.java

@@ -167,7 +167,7 @@ public class YearReturnCate2RecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate1RecallStrategy.java

@@ -81,7 +81,7 @@ public class YearShareCate1RecallStrategy implements RecallStrategy {
             List<String> values = redisTemplate.opsForValue().multiGet(keys);
             List<Long> ids = recall(param.getVideoId(), values);
 
-            FilterParam filterParam = FilterParamFactory.create(param, ids);
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate2RecallStrategy.java

@@ -80,7 +80,7 @@ public class YearShareCate2RecallStrategy implements RecallStrategy {
             List<String> values = redisTemplate.opsForValue().multiGet(keys);
             List<Long> ids = recall(param.getVideoId(), values);
 
-            FilterParam filterParam = FilterParamFactory.create(param, ids);
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 List<Long> filterIds = filterResult.getVideoIds();

+ 71 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

@@ -1,6 +1,9 @@
 package com.tzld.piaoquan.recommend.server.util;
 
 import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelContext;
+import com.tzld.piaoquan.recommend.server.service.funnel.RecallVideoEntry;
+import com.tzld.piaoquan.recommend.server.service.funnel.SelectKind;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
@@ -60,15 +63,78 @@ public class RecallUtils {
 
         rovRecallRank.addAll(v0);
         setVideo.addAll(v0.stream().map(Video::getVideoId).collect(Collectors.toSet()));
+        markSelfMixed(param, v0, sizeReturn);
     }
 
     public static void extractRecall(int recallNum, RankParam param, String pushFrom, Set<Long> setVideo, List<Video> rovRecallRank) {
         if (recallNum > 0) {
-            List<Video> list = extractAndSort(param, pushFrom);
-            list = list.stream().filter(r -> !setVideo.contains(r.getVideoId())).collect(Collectors.toList());
-            list = list.subList(0, Math.min(recallNum, list.size()));
-            rovRecallRank.addAll(list);
-            setVideo.addAll(list.stream().map(Video::getVideoId).collect(Collectors.toSet()));
+            List<Video> rawFiltered = extractAndSort(param, pushFrom);
+            List<Video> dedupedList = rawFiltered.stream()
+                    .filter(r -> !setVideo.contains(r.getVideoId()))
+                    .collect(Collectors.toList());
+            List<Video> selfList = dedupedList.subList(0, Math.min(recallNum, dedupedList.size()));
+            rovRecallRank.addAll(selfList);
+            setVideo.addAll(selfList.stream().map(Video::getVideoId).collect(Collectors.toSet()));
+            markStep3Select(param, pushFrom, recallNum, rawFiltered, selfList);
+        }
+    }
+
+    /**
+     * 阶段 3 截断标记 (单 pushFrom)。业务行为不变:
+     * - SELF  = selfList 全集(业务实际贡献给合并的视频)
+     * - OTHER = rawFiltered 前 recallNum 名次内但不在 selfList 中(即被前面 pushFrom 抢走的)
+     */
+    private static void markStep3Select(RankParam param, String pushFrom, int recallNum,
+                                        List<Video> rawFiltered, List<Video> selfList) {
+        if (param == null || param.getFunnelContext() == null
+                || StringUtils.isBlank(pushFrom)) {
+            return;
+        }
+        List<RecallVideoEntry> entries = param.getFunnelContext().getStages123RecallByStrategy().get(pushFrom);
+        if (CollectionUtils.isEmpty(entries)) return;
+        Map<Long, RecallVideoEntry> byVid = new HashMap<>(entries.size());
+        for (RecallVideoEntry e : entries) byVid.put(e.getVideoId(), e);
+
+        Set<Long> selfIds = new HashSet<>();
+        for (Video v : selfList) selfIds.add(v.getVideoId());
+
+        // SELF
+        for (Video v : selfList) {
+            RecallVideoEntry e = byVid.get(v.getVideoId());
+            if (e != null) {
+                e.setSelect(SelectKind.SELF);
+                e.setTruncate(recallNum);
+            }
+        }
+        // OTHER: rawFiltered 前 recallNum 个里不在 selfList 中
+        int topN = Math.min(recallNum, rawFiltered.size());
+        for (int i = 0; i < topN; i++) {
+            Video v = rawFiltered.get(i);
+            if (selfIds.contains(v.getVideoId())) continue;
+            RecallVideoEntry e = byVid.get(v.getVideoId());
+            if (e != null && e.getSelect() == null) {
+                e.setSelect(SelectKind.OTHER);
+                e.setTruncate(recallNum);
+            }
+        }
+    }
+
+    /** 阶段 3 截断: 多路混合 (extractOldSpecialRecall) — 按 Video.pushFrom 分组归因 SELF。OTHER 这里不计算(多路合一截断的归因复杂,先 skip) */
+    private static void markSelfMixed(RankParam param, List<Video> list, int truncate) {
+        if (param == null || param.getFunnelContext() == null || CollectionUtils.isEmpty(list)) return;
+        FunnelContext ctx = param.getFunnelContext();
+        for (Video v : list) {
+            String pf = v.getPushFrom();
+            if (StringUtils.isBlank(pf)) continue;
+            List<RecallVideoEntry> entries = ctx.getStages123RecallByStrategy().get(pf);
+            if (CollectionUtils.isEmpty(entries)) continue;
+            for (RecallVideoEntry e : entries) {
+                if (e.getVideoId() == v.getVideoId()) {
+                    e.setSelect(SelectKind.SELF);
+                    e.setTruncate(truncate);
+                    break;
+                }
+            }
         }
     }