فهرست منبع

feat:漏斗日志骨架 (召回/过滤/排序/preview 4阶段, 视频x策略粒度)

- 新增 FunnelRawItem / FunnelLogService / FunnelAggregator
- RecallParam 加 funnelSink; FilterParam 加 pushFrom/scoresMap/funnelSink
- FilterService 在 sink 非空时按 pushFrom 写入 raw 列表 (id,score,strategyRank)
- RankResult 加 candidateVideoIds, RankService 透出 rovRecallRank
- RecommendService.videoRecommend 末尾调 FunnelAggregator+FunnelLogService 一次性 emit
- 37 个 RecallStrategy 迁移到 FilterParamFactory 新签名 (pushFrom + scoresMap)

下一步: 扩展到 8 阶段 (加召回截断/合并/排序截断/冷启 stage),
final schema 改为单条记录 + 每阶段一个 JSON 字段。
yangxiaohui 1 هفته پیش
والد
کامیت
d648d2e69c
48فایلهای تغییر یافته به همراه367 افزوده شده و 45 حذف شده
  1. 26 2
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java
  2. 9 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java
  3. 22 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterService.java
  4. 141 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java
  5. 63 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogService.java
  6. 18 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelRawItem.java
  7. 5 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankResult.java
  8. 19 2
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java
  9. 13 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/FilterParamFactory.java
  10. 5 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java
  11. 7 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java
  12. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java
  13. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRedisRecallStrategy.java
  14. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java
  15. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractVideoRecallStrategy.java
  16. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AppFallbackRecallStrategy.java
  17. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/BlessRecallStrategy.java
  18. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerHeadRovnRecallStrategy.java
  19. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerRovnRecallStrategy.java
  20. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnAllRovRecallStrategy.java
  21. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnRecallStrategy.java
  22. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/DouHotFlowPoolRecallStrategy.java
  23. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyTomson.java
  24. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate1STRRecallStrategy.java
  25. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2AndChannelRovRecallStrategy.java
  26. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2RovRecallStrategy.java
  27. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2STRRecallStrategy.java
  28. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate1RecallStrategy.java
  29. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate2RecallStrategy.java
  30. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HotReturnUvRecallStrategy.java
  31. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRosRecallStrategy.java
  32. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRovnRecallStrategy.java
  33. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceStrRecallStrategy.java
  34. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceRovnRecallStrategy.java
  35. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceSTRRecallStrategy.java
  36. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyROS.java
  37. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1.java
  38. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1AllRov.java
  39. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV7LongTermV1.java
  40. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRosnRecallStrategy.java
  41. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRovnRecallStrategy.java
  42. 3 5
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SocialI2IBasicRecallStrategy.java
  43. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate1RecallStrategy.java
  44. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate2RecallStrategy.java
  45. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserDeconstructionKeywordsRecallStrategy.java
  46. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearReturnCate2RecallStrategy.java
  47. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate1RecallStrategy.java
  48. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate2RecallStrategy.java

+ 26 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -24,6 +24,9 @@ import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolService;
 import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
 import com.tzld.piaoquan.recommend.server.service.rank.RankRouter;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelAggregator;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelLogService;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
@@ -50,6 +53,8 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.ZoneId;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -81,6 +86,8 @@ public class RecommendService {
     @Autowired
     private TimerLogService timerLogService;
     @Autowired
+    private FunnelLogService funnelLogService;
+    @Autowired
     private FeatureService featureService;
 
     @Autowired
@@ -187,7 +194,7 @@ public class RecommendService {
             timerLogMapTL.get().put("genRecommendParamTime", genRecommendParamTime);
             // log.info("genRecommendParam={},genRecommendParam cost={}", JSONUtils.toJson(param),
             //         genRecommendParamTime);
-            List<Video> videos = videoRecommend(param);
+            List<Video> videos = videoRecommend(param, request);
             stopwatch.reset().start();
             updateCache(request, param, videos);
             long updateCacheTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -525,7 +532,7 @@ public class RecommendService {
         return null;
     }
 
-    private List<Video> videoRecommend(RecommendParam param) {
+    private List<Video> videoRecommend(RecommendParam param, RecommendRequest request) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         String vid = String.valueOf(param.getVideoId());
         UserShareReturnProfile userProfile = featureService.getUserProfile(param.getUid(), param.getMid());
@@ -574,6 +581,8 @@ public class RecommendService {
         recallParam.setUserSocialRecallInfo(userSocialRecallInfo);
         recallParam.setUserNetworkSeqFeature(userNetworkSeqFeature);
         recallParam.setUserNetworkSeqVideoInfoMap(userNetworkSeqVideoInfoMap);
+        ConcurrentMap<String, List<FunnelRawItem>> funnelSink = new ConcurrentHashMap<>();
+        recallParam.setFunnelSink(funnelSink);
         RecallResult recallResult = recallService.recall(recallParam);
 
         long recallTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
@@ -594,6 +603,7 @@ public class RecommendService {
 
 
         if (rankResult == null || CollectionUtils.isEmpty(rankResult.getVideos())) {
+            logFunnel(request, param, recallParam, recallResult, rankResult, Collections.emptyList());
             return Collections.emptyList();
         }
 
@@ -602,9 +612,23 @@ public class RecommendService {
         if (param.getSize() < rankResult.getVideos().size()) {
             videos = rankResult.getVideos().subList(0, param.getSize());
         }
+        logFunnel(request, param, recallParam, recallResult, rankResult, videos);
         return videos;
     }
 
+    private void logFunnel(RecommendRequest request, RecommendParam param, RecallParam recallParam,
+                           RecallResult recallResult, RankResult rankResult, List<Video> returnedVideos) {
+        try {
+            String traceId = String.valueOf(TraceUtils.currentTraceId());
+            List<Map<String, String>> rows = FunnelAggregator.build(
+                    traceId, request, param, recallParam.getFunnelSink(),
+                    recallResult, rankResult, returnedVideos);
+            funnelLogService.log(rows);
+        } catch (Exception e) {
+            log.error("logFunnel error", e);
+        }
+    }
+
     public RecallParam convertToRecallParam(RecommendParam param) {
         RecallParam recallParam = new RecallParam();
         recallParam.setAppType(param.getAppType());

+ 9 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java

@@ -1,10 +1,12 @@
 package com.tzld.piaoquan.recommend.server.service.filter;
 
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import lombok.Data;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * @author dyp
@@ -16,6 +18,13 @@ public class FilterParam {
     private String mid;
     private String uid;
 
+    /** 漏斗:调用方所属召回策略 pushFrom (可空,仅漏斗记录用) */
+    private String pushFrom;
+    /** 漏斗:videoId -> 召回 score (可空) */
+    private Map<Long, Double> scoresMap;
+    /** 漏斗:raw 输入 sink (可空,pushFrom -> List<FunnelRawItem>) */
+    private ConcurrentMap<String, List<FunnelRawItem>> funnelSink;
+
     // 风险过滤
     private String regionCode;
     private String cityCode;

+ 22 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterService.java

@@ -5,9 +5,11 @@ import com.google.common.collect.Lists;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory;
 import com.tzld.piaoquan.recommend.server.service.filter.strategy.*;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
@@ -31,10 +33,30 @@ public class FilterService {
     private Map<String, Integer> filterExpConfig;
 
     public FilterResult filter(FilterParam param) {
+        recordRawToFunnelSink(param);
         List<Long> videoIds = viewFilter(param);
         return new FilterResult(videoIds);
     }
 
+    private void recordRawToFunnelSink(FilterParam param) {
+        if (param == null || param.getFunnelSink() == null || StringUtils.isBlank(param.getPushFrom())
+                || CollectionUtils.isEmpty(param.getVideoIds())) {
+            return;
+        }
+        List<Long> ids = param.getVideoIds();
+        Map<Long, Double> scoresMap = param.getScoresMap();
+        List<FunnelRawItem> rawList = new ArrayList<>(ids.size());
+        for (int i = 0; i < ids.size(); i++) {
+            long id = ids.get(i);
+            double score = scoresMap != null ? scoresMap.getOrDefault(id, 0.0) : 0.0;
+            rawList.add(new FunnelRawItem(id, score, i));
+        }
+        param.getFunnelSink().merge(param.getPushFrom(), rawList, (oldList, newList) -> {
+            oldList.addAll(newList);
+            return oldList;
+        });
+    }
+
     private List<Long> viewFilter(FilterParam param) {
 
         List<FilterStrategy> strategies = getStrategies(param);

+ 141 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelAggregator.java

@@ -0,0 +1,141 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import com.google.common.base.Strings;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+import com.tzld.piaoquan.recommend.server.model.RecommendParam;
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.rank.RankResult;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 把 4 阶段(raw / filtered / ranked / preview)合并成每 (videoId, pushFrom) 一行。
+ * 下游 BI: group by videoId max(各 flag) 出整体漏斗;group by pushFrom 出分策略漏斗。
+ */
+public class FunnelAggregator {
+
+    private static class Row {
+        boolean inRecall;
+        boolean inFiltered;
+        double recallScore;
+        int strategyRank;
+    }
+
+    public static List<Map<String, String>> build(
+            String traceId,
+            RecommendRequest request,
+            RecommendParam param,
+            Map<String, List<FunnelRawItem>> funnelSink,
+            RecallResult recallResult,
+            RankResult rankResult,
+            List<Video> returnedVideos) {
+
+        // rank 阶段:被 mergeAndRankRovRecall 挑中进打分的视频 id 集合
+        Set<Long> rankedSet = new HashSet<>();
+        if (rankResult != null && rankResult.getCandidateVideoIds() != null) {
+            rankedSet.addAll(rankResult.getCandidateVideoIds());
+        }
+
+        // rank 最终位置 + 最终 score (rankResult.videos 是 rank 截断前的整序列)
+        Map<Long, Integer> rankPosMap = new HashMap<>();
+        Map<Long, Double> rankScoreMap = new HashMap<>();
+        if (rankResult != null && CollectionUtils.isNotEmpty(rankResult.getVideos())) {
+            List<Video> rv = rankResult.getVideos();
+            for (int i = 0; i < rv.size(); i++) {
+                Video v = rv.get(i);
+                rankPosMap.put(v.getVideoId(), i + 1);
+                rankScoreMap.put(v.getVideoId(), v.getSortScore());
+            }
+        }
+
+        // preview = 截断后下发
+        Map<Long, Integer> previewPosMap = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(returnedVideos)) {
+            for (int i = 0; i < returnedVideos.size(); i++) {
+                previewPosMap.put(returnedVideos.get(i).getVideoId(), i + 1);
+            }
+        }
+
+        // (pushFrom -> videoId -> Row)
+        Map<String, Map<Long, Row>> table = new LinkedHashMap<>();
+
+        // raw
+        if (funnelSink != null) {
+            funnelSink.forEach((pushFrom, items) -> {
+                Map<Long, Row> byVid = table.computeIfAbsent(pushFrom, k -> new LinkedHashMap<>());
+                for (FunnelRawItem item : items) {
+                    Row r = byVid.computeIfAbsent(item.getVideoId(), k -> new Row());
+                    r.inRecall = true;
+                    r.recallScore = item.getScore();
+                    r.strategyRank = item.getStrategyRank();
+                }
+            });
+        }
+
+        // filtered
+        if (recallResult != null && CollectionUtils.isNotEmpty(recallResult.getData())) {
+            for (RecallResult.RecallData d : recallResult.getData()) {
+                if (CollectionUtils.isEmpty(d.getVideos())) {
+                    continue;
+                }
+                String pushFrom = d.getPushFrom();
+                Map<Long, Row> byVid = table.computeIfAbsent(pushFrom, k -> new LinkedHashMap<>());
+                for (int i = 0; i < d.getVideos().size(); i++) {
+                    Video v = d.getVideos().get(i);
+                    Row r = byVid.computeIfAbsent(v.getVideoId(), k -> new Row());
+                    r.inFiltered = true;
+                    // 兜底:strategy 不走 FilterService 时 raw 没记,从 Video 补
+                    if (!r.inRecall) {
+                        r.recallScore = v.getRovScore();
+                        r.strategyRank = i;
+                    }
+                }
+            }
+        }
+
+        List<Map<String, String>> rows = new ArrayList<>();
+        Map<String, String> base = buildBase(traceId, request, param);
+        table.forEach((pushFrom, byVid) -> byVid.forEach((vid, r) -> {
+            Map<String, String> row = new HashMap<>(base);
+            row.put("videoId", String.valueOf(vid));
+            row.put("pushFrom", pushFrom);
+            row.put("recallScore", String.valueOf(r.recallScore));
+            row.put("strategyRank", String.valueOf(r.strategyRank));
+            row.put("inRecall", r.inRecall ? "1" : "0");
+            row.put("inFiltered", r.inFiltered ? "1" : "0");
+            row.put("inRanked", rankedSet.contains(vid) ? "1" : "0");
+            row.put("inPreview", previewPosMap.containsKey(vid) ? "1" : "0");
+            row.put("rankScore", String.valueOf(rankScoreMap.getOrDefault(vid, 0.0)));
+            row.put("rankPos", String.valueOf(rankPosMap.getOrDefault(vid, 0)));
+            row.put("previewPos", String.valueOf(previewPosMap.getOrDefault(vid, 0)));
+            rows.add(row);
+        }));
+        return rows;
+    }
+
+    private static Map<String, String> buildBase(String traceId, RecommendRequest request, RecommendParam param) {
+        Map<String, String> base = new HashMap<>();
+        base.put("traceId", Strings.nullToEmpty(traceId));
+        if (request != null) {
+            base.put("recommendTraceId", Strings.nullToEmpty(request.getRecommendTraceId()));
+            base.put("sessionId", Strings.nullToEmpty(request.getSessionId()));
+            base.put("rootSessionId", Strings.nullToEmpty(request.getRootSessionId()));
+            base.put("mid", Strings.nullToEmpty(request.getMid()));
+            base.put("appType", String.valueOf(request.getAppType()));
+            base.put("newexpgroup", Strings.nullToEmpty(request.getNewExpGroup()));
+        }
+        if (param != null) {
+            base.put("abExpCode", JSONUtils.toJson(param.getAbExpCodes()));
+        }
+        return base;
+    }
+}

+ 63 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelLogService.java

@@ -0,0 +1,63 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import com.aliyun.openservices.aliyun.log.producer.LogProducer;
+import com.aliyun.openservices.aliyun.log.producer.Producer;
+import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
+import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
+import com.aliyun.openservices.log.common.LogItem;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 推荐漏斗日志 (video × pushFrom 粒度,4 阶段 flag)。
+ */
+@Service
+@Slf4j
+public class FunnelLogService {
+
+    @Value("${aliyun.log.project}")
+    private String project;
+    @Value("${aliyun.log.endpoint}")
+    private String endpoint;
+    @Value("${aliyun.log.accessKeyId}")
+    private String accessKeyId;
+    @Value("${aliyun.log.accessKeySecret}")
+    private String accessKeySecret;
+
+    private String logStore = "funnel-log";
+
+    private Producer producer;
+
+    @PostConstruct
+    public void init() {
+        ProducerConfig producerConfig = new ProducerConfig();
+        producer = new LogProducer(producerConfig);
+        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
+    }
+
+    public void log(List<Map<String, String>> data) {
+        if (CollectionUtils.isEmpty(data)) {
+            return;
+        }
+        try {
+            List<LogItem> items = data.stream().map(d -> {
+                LogItem logItem = new LogItem();
+                d.entrySet().forEach(e -> logItem.PushBack(e.getKey(), e.getValue()));
+                return logItem;
+            }).collect(Collectors.toList());
+            producer.send(project, logStore, items);
+        } catch (InterruptedException e) {
+            log.warn("funnel log interrupted");
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            log.error("Failed to send funnel logs", e);
+        }
+    }
+}

+ 18 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/funnel/FunnelRawItem.java

@@ -0,0 +1,18 @@
+package com.tzld.piaoquan.recommend.server.service.funnel;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 漏斗 raw 阶段(filter 前)单条记录。
+ * strategyRank 为该 video 在所属召回策略内部的位置(0-based,按策略原始排序)。
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class FunnelRawItem {
+    private long videoId;
+    private double score;
+    private int strategyRank;
+}

+ 5 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankResult.java

@@ -15,4 +15,9 @@ import java.util.List;
 @AllArgsConstructor
 public class RankResult {
     private List<Video> videos;
+    private List<Long> candidateVideoIds;
+
+    public RankResult(List<Video> videos) {
+        this.videos = videos;
+    }
 }

+ 19 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -41,7 +41,9 @@ public abstract class RankService {
         if (2 == param.getRecommendType()) {
             tagDuplicateVideos(param);
             List<Video> rovRecallRank = mergeAndRankRovRecall(param);
-            return new RankResult(rovRecallRank);
+            RankResult result = new RankResult(rovRecallRank);
+            result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
+            return result;
         }
 
         if (param.isSpecialRecommend()) {
@@ -97,7 +99,22 @@ public abstract class RankService {
         removeDuplicate(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
 
         // 融合排序
-        return mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
+        RankResult result = mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
+        if (result != null) {
+            result.setCandidateVideoIds(toCandidateIds(rovRecallRank));
+        }
+        return result;
+    }
+
+    private static List<Long> toCandidateIds(List<Video> videos) {
+        if (CollectionUtils.isEmpty(videos)) {
+            return Collections.emptyList();
+        }
+        List<Long> ids = new ArrayList<>(videos.size());
+        for (Video v : videos) {
+            ids.add(v.getVideoId());
+        }
+        return ids;
     }
 
     private void tagDuplicateVideos(RankParam param) {

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/FilterParamFactory.java

@@ -11,6 +11,15 @@ import java.util.Map;
  */
 public class FilterParamFactory {
     public static FilterParam create(RecallParam param, List<Long> videoIds) {
+        return create(param, videoIds, null, null);
+    }
+
+    /**
+     * 漏斗版:额外透传 pushFrom + scoresMap,让 FilterService 记录 raw 阶段数据。
+     * 任一为 null 时退化为原行为(不记录漏斗 raw)。
+     */
+    public static FilterParam create(RecallParam param, List<Long> videoIds,
+                                     String pushFrom, Map<Long, Double> scoresMap) {
         FilterParam filterParam = new FilterParam();
         filterParam.setVideoIds(videoIds);
         filterParam.setAppType(param.getAppType());
@@ -25,6 +34,10 @@ public class FilterParamFactory {
         filterParam.setCityCode(param.getCityCode());
         filterParam.setHotSceneType(param.getHotSceneType());
         filterParam.setClientIp(param.getClientIp());
+
+        filterParam.setPushFrom(pushFrom);
+        filterParam.setScoresMap(scoresMap);
+        filterParam.setFunnelSink(param.getFunnelSink());
         return filterParam;
     }
 }

+ 5 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserSRBO;
 import com.tzld.piaoquan.recommend.server.service.rank.bo.UserShareReturnProfile;
 import lombok.Data;
@@ -7,12 +8,16 @@ import lombok.Data;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * @author dyp
  */
 @Data
 public class RecallParam {
+    /** 漏斗 raw sink (可空):每路召回策略在 filter 前把 (id, score, strategyRank) 写入 sink[pushFrom] */
+    private ConcurrentMap<String, List<FunnelRawItem>> funnelSink;
+
     private String regionCode;
     private String cityCode;
     private String mid;

+ 7 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
 import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.funnel.FunnelRawItem;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -26,6 +27,12 @@ public class RecallResult {
     public static class RecallData {
         private String pushFrom;
         private List<Video> videos;
+        private List<FunnelRawItem> rawList;
+
+        public RecallData(String pushFrom, List<Video> videos) {
+            this.pushFrom = pushFrom;
+            this.videos = videos;
+        }
     }
 
     public List<Video> mergeRecallVideos() {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java

@@ -46,7 +46,7 @@ public abstract class AbstractFlowPoolWithLevelRecallStrategy implements RecallS
             videoFlowPoolMap.put(NumberUtils.toLong(values[0], 0), values[1]);
         }
         FilterResult filterResult = filterService.filter(FilterParamFactory.create(param,
-                new ArrayList<>(videoFlowPoolMap.keySet())));
+                new ArrayList<>(videoFlowPoolMap.keySet()), pushFrom(), null));
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             filterResult.getVideoIds().stream().forEach(vid -> {
                 Video recallData = new Video();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRedisRecallStrategy.java

@@ -91,7 +91,7 @@ public abstract class AbstractRedisRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java

@@ -129,7 +129,7 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
                 lastVideoId = t.getValue();
                 videoMap.put(NumberUtils.toLong(t.getValue(), 0), t.getScore());
             }
-            FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+            FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
 
             FilterResult filterResult = filterService.filter(filterParam);
 

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractVideoRecallStrategy.java

@@ -56,7 +56,7 @@ public abstract class AbstractVideoRecallStrategy implements RecallStrategy {
                 v -> NumberUtils.toDouble(v.get(1)));
         List<Video> results = new ArrayList<>();
 
-        FilterResult filterResult = filterService.filter(FilterParamFactory.create(param, Lists.newArrayList(videoScoreMap.keySet())));
+        FilterResult filterResult = filterService.filter(FilterParamFactory.create(param, Lists.newArrayList(videoScoreMap.keySet()), pushFrom(), videoScoreMap));
 
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             filterResult.getVideoIds().stream().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AppFallbackRecallStrategy.java

@@ -54,7 +54,7 @@ public class AppFallbackRecallStrategy implements RecallStrategy {
         List<Long> videoIdList = Arrays.asList(videoIds.split(",")).stream()
                 .map(Long::parseLong)
                 .collect(Collectors.toList());
-        FilterParam filterParam = FilterParamFactory.create(param, videoIdList);
+        FilterParam filterParam = FilterParamFactory.create(param, videoIdList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/BlessRecallStrategy.java

@@ -58,7 +58,7 @@ public class BlessRecallStrategy implements RecallStrategy {
         for (Pair<Long, Double> v : result) {
             videoMap.put(v.getLeft(), v.getRight());
         }
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerHeadRovnRecallStrategy.java

@@ -102,7 +102,7 @@ public class ChannelLayerHeadRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ChannelLayerRovnRecallStrategy.java

@@ -93,7 +93,7 @@ public class ChannelLayerRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnAllRovRecallStrategy.java

@@ -85,7 +85,7 @@ public class CityRovnAllRovRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/CityRovnRecallStrategy.java

@@ -85,7 +85,7 @@ public class CityRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/DouHotFlowPoolRecallStrategy.java

@@ -111,7 +111,7 @@ public class DouHotFlowPoolRecallStrategy extends AbstractFlowPoolWithLevelRecal
             videoIdAndFlowPoolMap.put(Long.valueOf(split[0]), split[1]);
         }
 
-        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(videoIdAndFlowPoolMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(videoIdAndFlowPoolMap.keySet()), pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
 
         // 对视频随机打断

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyTomson.java

@@ -145,7 +145,7 @@ public class FlowPoolWithLevelRecallStrategyTomson extends AbstractFlowPoolWithL
                 ));
 
         // 3 召回内部过滤
-        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(resultmap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(resultmap.keySet()), pushFrom(), resultmap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate1STRRecallStrategy.java

@@ -84,7 +84,7 @@ public class HeadCate1STRRecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2AndChannelRovRecallStrategy.java

@@ -82,7 +82,7 @@ public class HeadCate2AndChannelRovRecallStrategy implements RecallStrategy {
                 .map(Long::parseLong)
                 .collect(Collectors.toList());
 
-        FilterParam filterParam = FilterParamFactory.create(param, allVid);
+        FilterParam filterParam = FilterParamFactory.create(param, allVid, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         Set<Long> filterVids = new HashSet<>(filterResult.getVideoIds());
         filterVids.remove(param.getVideoId());

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2RovRecallStrategy.java

@@ -80,7 +80,7 @@ public class HeadCate2RovRecallStrategy implements RecallStrategy {
                 .map(Long::parseLong)
                 .collect(Collectors.toList());
 
-        FilterParam filterParam = FilterParamFactory.create(param, allVid);
+        FilterParam filterParam = FilterParamFactory.create(param, allVid, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         Set<Long> filterVids = new HashSet<>(filterResult.getVideoIds());
         filterVids.remove(param.getVideoId());

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadCate2STRRecallStrategy.java

@@ -84,7 +84,7 @@ public class HeadCate2STRRecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate1RecallStrategy.java

@@ -85,7 +85,7 @@ public class HeadProvinceCate1RecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HeadProvinceCate2RecallStrategy.java

@@ -85,7 +85,7 @@ public class HeadProvinceCate2RecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/HotReturnUvRecallStrategy.java

@@ -86,7 +86,7 @@ public class HotReturnUvRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRosRecallStrategy.java

@@ -85,7 +85,7 @@ public class PrioriProvinceRosRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceRovnRecallStrategy.java

@@ -85,7 +85,7 @@ public class PrioriProvinceRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/PrioriProvinceStrRecallStrategy.java

@@ -85,7 +85,7 @@ public class PrioriProvinceStrRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceRovnRecallStrategy.java

@@ -85,7 +85,7 @@ public class ProvinceRovnRecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/ProvinceSTRRecallStrategy.java

@@ -78,7 +78,7 @@ public class ProvinceSTRRecallStrategy implements RecallStrategy {
     }
 
     private void fillVideoResult(RecallParam param, List<Long> vidList, List<Video> videosResult) {
-        FilterParam filterParam = FilterParamFactory.create(param, vidList);
+        FilterParam filterParam = FilterParamFactory.create(param, vidList, pushFrom(), null);
         FilterResult filterResult = filterService.filter(filterParam);
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
             List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyROS.java

@@ -50,7 +50,7 @@ public class RegionRealtimeRecallStrategyROS implements RecallStrategy {
         }
         long t1 = System.currentTimeMillis();
         // 3 召回内部过滤
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1.java

@@ -51,7 +51,7 @@ public class RegionRealtimeRecallStrategyV1 implements RecallStrategy {
         }
         long t1 = System.currentTimeMillis();
         // 3 召回内部过滤
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         List<Long> videosResultId = new ArrayList<>();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV1AllRov.java

@@ -44,7 +44,7 @@ public class RegionRealtimeRecallStrategyV1AllRov implements RecallStrategy {
             videoMap.put(v.getLeft(), v.getRight());
         }
 
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionRealtimeRecallStrategyV7LongTermV1.java

@@ -42,7 +42,7 @@ public class RegionRealtimeRecallStrategyV7LongTermV1 implements RecallStrategy
         for (Pair<Long, Double> v : result) {
             videoMap.put(v.getLeft(), v.getRight());
         }
-        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()));
+        FilterParam filterParam = FilterParamFactory.create(param, Lists.newArrayList(videoMap.keySet()), pushFrom(), videoMap);
         FilterResult filterResult = filterService.filter(filterParam);
         List<Video> videosResult = new ArrayList<>();
         if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRosnRecallStrategy.java

@@ -49,7 +49,7 @@ public class SceneCFRosnRecallStrategy implements RecallStrategy {
             }
             Map<Long, Double> vid2Score = getIdScoreMap(headVid, redisValue);
             List<Long> vids = new ArrayList<>(vid2Score.keySet());
-            FilterParam filterParam = FilterParamFactory.create(param, vids);
+            FilterParam filterParam = FilterParamFactory.create(param, vids, pushFrom(), vid2Score);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SceneCFRovnRecallStrategy.java

@@ -49,7 +49,7 @@ public class SceneCFRovnRecallStrategy implements RecallStrategy {
             }
             Map<Long, Double> vid2Score = getIdScoreMap(headVid, redisValue);
             List<Long> vids = new ArrayList<>(vid2Score.keySet());
-            FilterParam filterParam = FilterParamFactory.create(param, vids);
+            FilterParam filterParam = FilterParamFactory.create(param, vids, pushFrom(), vid2Score);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 filterResult.getVideoIds().forEach(vid -> {

+ 3 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SocialI2IBasicRecallStrategy.java

@@ -61,17 +61,15 @@ public abstract class SocialI2IBasicRecallStrategy implements RecallStrategy {
 
             List<Long> videoIds = recallPair.stream().map(Pair::getKey).collect(Collectors.toList());
 
+            Map<Long, Double> videoScoreMap = recallPair.stream()
+                    .collect(Collectors.toMap(Pair::getKey, Pair::getValue, (o1, o2) -> o1));
 
             // 视频过滤
-            FilterParam filterParam = FilterParamFactory.create(param, videoIds);
+            FilterParam filterParam = FilterParamFactory.create(param, videoIds, pushFrom(), videoScoreMap);
             FilterResult filterResult = filterService.filter(filterParam);
             if (Objects.isNull(filterResult) || CollectionUtils.isEmpty(filterResult.getVideoIds())) {
                 return videos;
             }
-
-            // 返回结果
-            Map<Long, Double> videoScoreMap = recallPair.stream()
-                    .collect(Collectors.toMap(Pair::getKey, Pair::getValue, (o1, o2) -> o1));
             for (Long videoId : filterResult.getVideoIds()) {
                 Video video = new Video();
                 video.setVideoId(videoId);

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate1RecallStrategy.java

@@ -50,7 +50,7 @@ public class UserCate1RecallStrategy implements RecallStrategy {
                 if (!keys.isEmpty()) {
                     List<String> values = redisTemplate.opsForValue().multiGet(keys);
                     List<Long> ids = recall(param.getVideoId(), values);
-                    FilterParam filterParam = FilterParamFactory.create(param, ids);
+                    FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
                     FilterResult filterResult = filterService.filter(filterParam);
                     if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                         List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserCate2RecallStrategy.java

@@ -50,7 +50,7 @@ public class UserCate2RecallStrategy implements RecallStrategy {
                 if (!keys.isEmpty()) {
                     List<String> values = redisTemplate.opsForValue().multiGet(keys);
                     List<Long> ids = recall(param.getVideoId(), values);
-                    FilterParam filterParam = FilterParamFactory.create(param, ids);
+                    FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
                     FilterResult filterResult = filterService.filter(filterParam);
                     if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                         List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/UserDeconstructionKeywordsRecallStrategy.java

@@ -58,7 +58,7 @@ public class UserDeconstructionKeywordsRecallStrategy implements RecallStrategy
             }
             List<String> values = redisTemplate.opsForValue().multiGet(keys);
             List<Long> vids = this.recall(param.getVideoId(), values);
-            FilterParam filterParam = FilterParamFactory.create(param, vids);
+            FilterParam filterParam = FilterParamFactory.create(param, vids, pushFrom(), null);
 
             FilterResult filterResult = filterService.filter(filterParam);
             if (Objects.isNull(filterResult) || CollectionUtils.isEmpty(filterResult.getVideoIds())) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearReturnCate2RecallStrategy.java

@@ -167,7 +167,7 @@ public class YearReturnCate2RecallStrategy implements RecallStrategy {
             List<Long> ids = pair.getLeft();
             Map<Long, Double> scoresMap = pair.getRight();
             if (null != ids && null != scoresMap && !ids.isEmpty()) {
-                FilterParam filterParam = FilterParamFactory.create(param, ids);
+                FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
                 FilterResult filterResult = filterService.filter(filterParam);
                 if (null != filterResult && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                     filterResult.getVideoIds().forEach(vid -> {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate1RecallStrategy.java

@@ -81,7 +81,7 @@ public class YearShareCate1RecallStrategy implements RecallStrategy {
             List<String> values = redisTemplate.opsForValue().multiGet(keys);
             List<Long> ids = recall(param.getVideoId(), values);
 
-            FilterParam filterParam = FilterParamFactory.create(param, ids);
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 List<Long> filterIds = filterResult.getVideoIds();

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearShareCate2RecallStrategy.java

@@ -80,7 +80,7 @@ public class YearShareCate2RecallStrategy implements RecallStrategy {
             List<String> values = redisTemplate.opsForValue().multiGet(keys);
             List<Long> ids = recall(param.getVideoId(), values);
 
-            FilterParam filterParam = FilterParamFactory.create(param, ids);
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), null);
             FilterResult filterResult = filterService.filter(filterParam);
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 List<Long> filterIds = filterResult.getVideoIds();