Переглянути джерело

Merge branch 'feature/zhangbo_coldstart' of algorithm/recommend-server into master

zhangbo 1 рік тому
батько
коміт
94222acd51
15 змінених файлів з 394 додано та 15 видалено
  1. 1 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/model/RecommendParam.java
  2. 6 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java
  3. 1 2
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankRouter.java
  4. 1 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java
  5. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV2.java
  6. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV3.java
  7. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV4.java
  8. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV5.java
  9. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV536.java
  10. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV547.java
  11. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategyFlowThompsonModel.java
  12. 1 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java
  13. 54 12
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java
  14. 145 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyFilterDigit.java
  15. 164 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyTomsonFilterDigit.java

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/model/RecommendParam.java

@@ -28,6 +28,7 @@ public class RecommendParam {
     private String hRuleKey;
 
     private int flowPoolId;
+    private int lastDigit;
     private String flowPoolAbtestGroup;
     private String rankKeyPrefix;
     private int appType;

+ 6 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -330,7 +330,7 @@ public class RecommendService {
         }
 
         // 流量池分发实验组划分
-        int flowPoolIdChoice = flowPoolIds.get(RandomUtils.nextInt(0, flowPoolIds.size() - 1));
+        int flowPoolIdChoice = flowPoolIds.get(RandomUtils.nextInt(0, flowPoolIds.size()));
         param.setFlowPoolId(flowPoolIdChoice);
         param.setFlowPoolAbtestGroup("control_group");
         Map<String, List<Integer>> flowPoolConfig = flowPoolConfigService.getFlowPoolConfig();
@@ -340,6 +340,10 @@ public class RecommendService {
             }
         }
 
+        // @desc 新的流量池分发实验组划分,每个尾号不同策略分组。 @time 20240318 @author 张博
+        int lastDigit = RandomUtils.nextInt(0, 10);
+        param.setLastDigit(lastDigit);
+
         // 风险过滤
         List<String> keysRisk = new ArrayList<>();
         keysRisk.add("RISK_SHIELD_FILTER_RULE_V1_JSON");
@@ -495,6 +499,7 @@ public class RecommendService {
 
         recallParam.setVideoId(param.getVideoId());
         recallParam.setFlowPoolAbtestGroup(param.getFlowPoolAbtestGroup());
+        recallParam.setLastDigit(param.getLastDigit());
 
         String provinceCode = StringUtils.isNotBlank(param.getProvinceCode())
                 ? param.getProvinceCode()

+ 1 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankRouter.java

@@ -49,6 +49,7 @@ public class RankRouter {
         }
         switch (abCode) {
             case "60097":
+            case "60121": // 536
                 return rankStrategy4Density.rank(param);
             case "60106":
                 return rankStrategy4Rankv2Model.rank(param);
@@ -69,8 +70,6 @@ public class RankRouter {
                 return rankStrategyFlowThompsonModel.rank(param);
             case "60120": // 576
                 return rankStrategy4RegionMerge.rank(param);
-            case "60121": // 536
-                return rankStrategy4RegionMergeModelV536.rank(param);
             case "60122": // 537
                 return rankStrategy4RegionMergeModelV2.rank(param);
             case "60123": // 541

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -138,6 +138,7 @@ public class RankService {
                 || param.getAbCode().equals("60095")
                 || param.getAbCode().equals("60096")
                 || param.getAbCode().equals("60097")
+                || param.getAbCode().equals("60121")
                 || param.getAbCode().equals("60098")
                 || param.getAbCode().equals("60103")
                 || param.getAbCode().equals("60104")

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV2.java

@@ -77,6 +77,9 @@ public class RankStrategy4RegionMergeModelV2 extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV3.java

@@ -81,6 +81,9 @@ public class RankStrategy4RegionMergeModelV3 extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV4.java

@@ -71,6 +71,9 @@ public class RankStrategy4RegionMergeModelV4 extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV5.java

@@ -72,6 +72,9 @@ public class RankStrategy4RegionMergeModelV5 extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV536.java

@@ -70,6 +70,9 @@ public class RankStrategy4RegionMergeModelV536 extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV547.java

@@ -71,6 +71,9 @@ public class RankStrategy4RegionMergeModelV547 extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategyFlowThompsonModel.java

@@ -59,6 +59,9 @@ public class RankStrategyFlowThompsonModel extends RankService {
         Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
                 .filter(d -> d.getPushFrom().equals(pushFrom))
                 .findFirst();
+        if (!data.isPresent()){
+            return Collections.emptyList();
+        }
         List<Video> videoList = data.get().getVideos();
         if (videoList == null) {
             return Collections.emptyList();

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java

@@ -25,6 +25,7 @@ public class RecallParam {
     private String abCode;
     private int size;
     private String flowPoolAbtestGroup;
+    private int lastDigit;
     private Long videoId;
     private String uid;
     private boolean specialRecommend;

+ 54 - 12
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum;
 import com.tzld.piaoquan.recommend.server.model.Video;
@@ -29,6 +30,8 @@ public class RecallService implements ApplicationContextAware {
     private final Map<String, RecallStrategy> strategyMap = new HashMap<>();
     private ApplicationContext applicationContext;
     private final ExecutorService pool = ThreadPoolFactory.recallPool();
+    @ApolloJsonValue("${last.digit.abcode:{}}")
+    protected Map<Integer, String> lastDigitAbcode;
 
     @PostConstruct
     public void init() {
@@ -117,20 +120,59 @@ public class RecallService implements ApplicationContextAware {
                     strategies.addAll(getRegionRecallStrategy(param));
             }
             //2:通过“流量池标记”控制“流量池召回子策略” 其中有9组会走EXPERIMENTAL_FLOW_SET_LEVEL 有1组会走EXPERIMENTAL_FLOW_SET_LEVEL_SCORE
-            if (param.getFlowPoolAbtestGroup().equals(FlowPoolConstants.EXPERIMENTAL_FLOW_SET_LEVEL)) {
-                strategies.add(strategyMap.get(QuickFlowPoolWithLevelRecallStrategy.class.getSimpleName()));
-                if ("60126".equals(abCode)){
-                    strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyTomson.class.getSimpleName()));
-                }else {
-                    strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategy.class.getSimpleName()));
+            if ("60111".equals(abCode) || "60112".equals(abCode)){
+                int lastDigit = param.getLastDigit();
+                String lastDigitAB = lastDigitAbcode != null? lastDigitAbcode.getOrDefault(lastDigit, "default"): "default";
+                switch (lastDigitAB){
+                    case "random":
+                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyFilterDigit.class.getSimpleName()));
+                        break;
+                    case "tomson":
+                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyTomsonFilterDigit.class.getSimpleName()));
+                        break;
+                    case "score":
+                        strategies.add(strategyMap.get(FlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+                        break;
+                    default:
+                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyFilterDigit.class.getSimpleName()));
+                        break;
+                }
+            }else{
+                if (param.getFlowPoolAbtestGroup().equals(FlowPoolConstants.EXPERIMENTAL_FLOW_SET_LEVEL)) {
+                    strategies.add(strategyMap.get(QuickFlowPoolWithLevelRecallStrategy.class.getSimpleName()));
+                    if ("60126".equals(abCode)){
+                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyTomson.class.getSimpleName()));
+                    }else {
+                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategy.class.getSimpleName()));
+                    }
+                } else if (param.getFlowPoolAbtestGroup().equals(FlowPoolConstants.EXPERIMENTAL_FLOW_SET_LEVEL_SCORE)) {
+                    strategies.add(strategyMap.get(QuickFlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+                    strategies.add(strategyMap.get(FlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+                } else {
+                    strategies.add(strategyMap.get(QuickFlowPoolWithScoreRecallStrategy.class.getSimpleName()));
+                    strategies.add(strategyMap.get(FlowPoolWithScoreRecallStrategy.class.getSimpleName()));
                 }
-            } else if (param.getFlowPoolAbtestGroup().equals(FlowPoolConstants.EXPERIMENTAL_FLOW_SET_LEVEL_SCORE)) {
-                strategies.add(strategyMap.get(QuickFlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
-                strategies.add(strategyMap.get(FlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
-            } else {
-                strategies.add(strategyMap.get(QuickFlowPoolWithScoreRecallStrategy.class.getSimpleName()));
-                strategies.add(strategyMap.get(FlowPoolWithScoreRecallStrategy.class.getSimpleName()));
             }
+//            if ("60126".equals(abCode)){
+//                strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyTomsonFilterDigit.class.getSimpleName()));
+//            }else{
+//                int lastDigit = param.getLastDigit();
+//                String lastDigitAB = lastDigitAbcode != null? lastDigitAbcode.getOrDefault(lastDigit, "default"): "default";
+//                switch (lastDigitAB){
+//                    case "random":
+//                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyFilterDigit.class.getSimpleName()));
+//                        break;
+//                    case "tomson":
+//                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyTomsonFilterDigit.class.getSimpleName()));
+//                        break;
+//                    case "score":
+//                        strategies.add(strategyMap.get(FlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+//                        break;
+//                    default:
+//                        strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategyFilterDigit.class.getSimpleName()));
+//                        break;
+//                }
+//            }
         }
 
         //3:通过“abcode”控制“召回子策略”

+ 145 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyFilterDigit.java

@@ -0,0 +1,145 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
+import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConfigService;
+import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_FORMAT;
+
+/**
+ * @author dyp
+ */
+@Service
+@Slf4j
+public class FlowPoolWithLevelRecallStrategyFilterDigit extends AbstractFlowPoolWithLevelRecallStrategy {
+
+    @Autowired
+    private FlowPoolConfigService flowPoolConfigService;
+
+    @Override
+    Pair<String, String> flowPoolKeyAndLevel(RecallParam param) {
+        //# 1. 获取流量池各层级分发概率权重
+        Map<String, Double> levelWeightMap = flowPoolConfigService.getLevelWeight();
+
+        // 2. 判断各层级是否有视频需分发
+        List<LevelWeight> availableLevels = new ArrayList<>();
+        for (Map.Entry<String, Double> entry : levelWeightMap.entrySet()) {
+            String levelKey = String.format(KEY_WITH_LEVEL_FORMAT, param.getAppType(), entry.getKey());
+            if (Boolean.TRUE.equals(redisTemplate.hasKey(levelKey))) {
+                LevelWeight lw = new LevelWeight();
+                lw.setLevel(entry.getKey());
+                lw.setLevelKey(levelKey);
+                lw.setWeight(entry.getValue());
+                availableLevels.add(lw);
+            }
+        }
+        if (CollectionUtils.isEmpty(availableLevels)) {
+            return Pair.of("", "");
+        }
+        // 3. 根据可分发层级权重设置分发概率
+        availableLevels.sort(Comparator.comparingDouble(LevelWeight::getWeight));
+
+        double weightSum = availableLevels.stream().mapToDouble(LevelWeight::getWeight).sum();
+        BigDecimal weightSumBD = new BigDecimal(weightSum);
+        double level_p_low = 0;
+        double weight_temp = 0;
+        double level_p_up = 0;
+        Map<String, LevelP> level_p_mapping = new HashMap<>();
+        for (LevelWeight lw : availableLevels) {
+            BigDecimal bd = new BigDecimal(weight_temp + lw.getWeight());
+            level_p_up = bd.divide(weightSumBD, 2, RoundingMode.HALF_UP).doubleValue();
+            LevelP levelP = new LevelP();
+            levelP.setMin(level_p_low);
+            levelP.setMax(level_p_up);
+            levelP.setLevelKey(lw.getLevelKey());
+            level_p_mapping.put(lw.level, levelP);
+            level_p_low = level_p_up;
+
+            weight_temp += lw.getWeight();
+        }
+
+        // 4. 随机生成[0,1)之间数,返回相应概率区间的key
+        double random_p = RandomUtils.nextDouble(0, 1);
+        for (Map.Entry<String, LevelP> entry : level_p_mapping.entrySet()) {
+            if (random_p >= entry.getValue().getMin()
+                    && random_p <= entry.getValue().getMax()) {
+                return Pair.of(entry.getValue().getLevelKey(), entry.getKey());
+            }
+        }
+        return Pair.of("", "");
+    }
+
+    @Data
+    static class LevelWeight {
+        private String level;
+        private String levelKey;
+        private Double weight;
+    }
+
+    @Data
+    static class LevelP {
+        private String levelKey;
+        private double min;
+        private double max;
+    }
+
+    @Override
+    public String pushFrom() {
+        return FlowPoolConstants.PUSH_FORM;
+    }
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+        Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
+        String flowPoolKey = flowPoolKeyAndLevel.getLeft();
+        String level = flowPoolKeyAndLevel.getRight();
+        int getSize = param.getSize() * 5;
+        List<Video> results = new ArrayList<>();
+        List<String> list = Objects.requireNonNull(redisTemplate.opsForSet().members(flowPoolKey)).stream().filter(o ->
+                NumberUtils.toLong(o.split("-")[0], 0) % 10 == param.getLastDigit()).distinct().collect(Collectors.toList());
+        Collections.shuffle(list);
+        List<String> data = list.subList(0, Math.min(getSize, list.size()));
+        if (CollectionUtils.isEmpty(data)) {
+            return null;
+        }
+        Map<Long, String> videoFlowPoolMap = new LinkedHashMap<>();
+        for (String value : data) {
+            String[] values = value.split("-");
+            videoFlowPoolMap.put(NumberUtils.toLong(values[0], 0), values[1]);
+        }
+
+        FilterResult filterResult = filterService.filter(FilterParamFactory.create(param, videoFlowPoolMap));
+
+        if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
+            filterResult.getVideoIds().forEach(vid -> {
+                Video recallData = new Video();
+                recallData.setVideoId(vid);
+                recallData.setAbCode(param.getAbCode());
+                recallData.setRovScore(RandomUtils.nextDouble(0, 100));
+                recallData.setPushFrom(pushFrom());
+                recallData.setFlowPool(videoFlowPoolMap.get(vid));
+                recallData.setFlowPoolAbtestGroup(param.getFlowPoolAbtestGroup());
+                recallData.setLevel(level);
+                results.add(recallData);
+            });
+        }
+
+        return results.subList(0, Math.min(results.size(), param.getSize()));
+    }
+}

+ 164 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategyTomsonFilterDigit.java

@@ -0,0 +1,164 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
+import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConfigService;
+import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerUtils;
+import com.tzld.piaoquan.recommend.server.service.score4recall.ScorerPipeline4Recall;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_FORMAT;
+
+/**
+ * @author zhangbo
+ */
+@Service
+@Slf4j
+public class FlowPoolWithLevelRecallStrategyTomsonFilterDigit extends AbstractFlowPoolWithLevelRecallStrategy {
+
+    @Autowired
+    private FlowPoolConfigService flowPoolConfigService;
+
+    @Override
+    Pair<String, String> flowPoolKeyAndLevel(RecallParam param) {
+        //# 1. 获取流量池各层级分发概率权重
+        Map<String, Double> levelWeightMap = flowPoolConfigService.getLevelWeight();
+
+        // 2. 判断各层级是否有视频需分发
+        List<LevelWeight> availableLevels = new ArrayList<>();
+        for (Map.Entry<String, Double> entry : levelWeightMap.entrySet()) {
+            String levelKey = String.format(KEY_WITH_LEVEL_FORMAT, param.getAppType(), entry.getKey());
+            if (Boolean.TRUE.equals(redisTemplate.hasKey(levelKey))) {
+                LevelWeight lw = new LevelWeight();
+                lw.setLevel(entry.getKey());
+                lw.setLevelKey(levelKey);
+                lw.setWeight(entry.getValue());
+                availableLevels.add(lw);
+            }
+        }
+        if (CollectionUtils.isEmpty(availableLevels)) {
+            return Pair.of("", "");
+        }
+
+        // 3. 根据可分发层级权重设置分发概率
+        availableLevels.sort(Comparator.comparingDouble(LevelWeight::getWeight));
+
+        double weightSum = availableLevels.stream().mapToDouble(o -> o.getWeight()).sum();
+        BigDecimal weightSumBD = new BigDecimal(weightSum);
+        double level_p_low = 0;
+        double weight_temp = 0;
+        double level_p_up = 0;
+        Map<String, LevelP> level_p_mapping = new HashMap<>();
+        for (LevelWeight lw : availableLevels) {
+            BigDecimal bd = new BigDecimal(weight_temp + lw.getWeight());
+            level_p_up = bd.divide(weightSumBD, 2, RoundingMode.HALF_UP).doubleValue();
+            LevelP levelP = new LevelP();
+            levelP.setMin(level_p_low);
+            levelP.setMax(level_p_up);
+            levelP.setLevelKey(lw.getLevelKey());
+            level_p_mapping.put(lw.level, levelP);
+            level_p_low = level_p_up;
+
+            weight_temp += lw.getWeight();
+        }
+
+        // 4. 随机生成[0,1)之间数,返回相应概率区间的key
+        double random_p = RandomUtils.nextDouble(0, 1);
+        for (Map.Entry<String, LevelP> entry : level_p_mapping.entrySet()) {
+            if (random_p >= entry.getValue().getMin()
+                    && random_p <= entry.getValue().getMax()) {
+                return Pair.of(entry.getValue().getLevelKey(), entry.getKey());
+            }
+        }
+        return Pair.of("", "");
+    }
+
+    @Data
+    static class LevelWeight {
+        private String level;
+        private String levelKey;
+        private Double weight;
+    }
+
+    @Data
+    static class LevelP {
+        private String levelKey;
+        private double min;
+        private double max;
+    }
+
+    @Override
+    public String pushFrom() {
+        return FlowPoolConstants.PUSH_FORM;
+    }
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+        Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
+        String flowPoolKey = flowPoolKeyAndLevel.getLeft();
+        String level = flowPoolKeyAndLevel.getRight();
+        List<String> data = Objects.requireNonNull(redisTemplate.opsForSet().members(flowPoolKey)).stream().filter(o ->
+                NumberUtils.toLong(o.split("-")[0], 0) % 10 == param.getLastDigit()).distinct().collect(Collectors.toList());
+
+        if (CollectionUtils.isEmpty(data)) {
+            return null;
+        }
+        Map<String, String> videoFlowPoolMap = new LinkedHashMap<>();
+        Map<Long, String> videoFlowPoolMap_ = new LinkedHashMap<>();
+        for (String value : data) {
+            String[] values = value.split("-");
+            videoFlowPoolMap.put(values[0], values[1]);
+            videoFlowPoolMap_.put(NumberUtils.toLong(values[0], 0), values[1]);
+        }
+        ScorerPipeline4Recall pipeline = ScorerUtils.getScorerPipeline4Recall("feeds_recall_config_tomson.conf");
+        List<List<Pair<Long, Double>>> results = pipeline.recall(videoFlowPoolMap);
+        List<Pair<Long, Double>> result = results.get(0);
+        Map<Long, Double> resultmap = result.stream()
+                .collect(Collectors.toMap(
+                        Pair::getLeft, // 键是Pair的left值
+                        Pair::getRight, // 值是Pair的right值
+                        (existingValue, newValue) -> existingValue, // 如果键冲突,选择保留现有的值(或者你可以根据需要定义其他合并策略)
+                        LinkedHashMap::new // 使用LinkedHashMap来保持插入顺序(如果需要的话)
+                ));
+        // 3 召回内部过滤
+        FilterParam filterParam = FilterParamFactory.create(param, result.stream()
+                .map(Pair::getLeft)
+                .collect(Collectors.toList()));
+        filterParam.setForceTruncation(10000);
+        filterParam.setConcurrent(true);
+        filterParam.setNotUsePreView(false);
+        FilterResult filterResult = filterService.filter(filterParam);
+        List<Video> videosResult = new ArrayList<>();
+        if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
+            filterResult.getVideoIds().forEach(vid -> {
+                Video recallData = new Video();
+                recallData.setVideoId(vid);
+                recallData.setAbCode(param.getAbCode());
+                recallData.setRovScore(resultmap.getOrDefault(vid, 0.0));
+                recallData.setPushFrom(pushFrom());
+                recallData.setFlowPool(videoFlowPoolMap_.get(vid));
+                recallData.setFlowPoolAbtestGroup(param.getFlowPoolAbtestGroup());
+                recallData.setLevel(level);
+                videosResult.add(recallData);
+            });
+        }
+        videosResult.sort(Comparator.comparingDouble(o -> -o.getRovScore()));
+        return videosResult;
+    }
+}