|  | @@ -1,18 +1,31 @@
 | 
	
		
			
				|  |  |  package com.tzld.piaoquan.recommend.server.service.recall.strategy;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 | 
	
		
			
				|  |  | +import com.google.common.collect.Lists;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.model.Video;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConfigService;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.service.score.ScorerUtils;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.service.score4recall.ScorerPipeline4Recall;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 | 
	
		
			
				|  |  |  import lombok.Data;
 | 
	
		
			
				|  |  | +import lombok.extern.slf4j.Slf4j;
 | 
	
		
			
				|  |  |  import org.apache.commons.collections4.CollectionUtils;
 | 
	
		
			
				|  |  |  import org.apache.commons.lang3.RandomUtils;
 | 
	
		
			
				|  |  | +import org.apache.commons.lang3.math.NumberUtils;
 | 
	
		
			
				|  |  |  import org.apache.commons.lang3.tuple.Pair;
 | 
	
		
			
				|  |  |  import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  | +import org.springframework.data.redis.core.ZSetOperations;
 | 
	
		
			
				|  |  |  import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.math.BigDecimal;
 | 
	
		
			
				|  |  |  import java.math.RoundingMode;
 | 
	
		
			
				|  |  |  import java.util.*;
 | 
	
		
			
				|  |  | +import java.util.stream.Collectors;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_SCORE_FORMAT;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -21,7 +34,8 @@ import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConsta
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  @Service
 | 
	
		
			
				|  |  |  public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLevelScoreRecallStrategy {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +    @ApolloJsonValue("${ifOneLevelRandom:true}")
 | 
	
		
			
				|  |  | +    private boolean ifOneLevelRandom;
 | 
	
		
			
				|  |  |      @Autowired
 | 
	
		
			
				|  |  |      private FlowPoolConfigService flowPoolConfigService;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -29,11 +43,12 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
 | 
	
		
			
				|  |  |      Pair<String, String> flowPoolKeyAndLevel(RecallParam param) {
 | 
	
		
			
				|  |  |          //# 1. 获取流量池各层级分发概率权重
 | 
	
		
			
				|  |  |          Map<String, Double> levelWeightMap = flowPoolConfigService.getLevelWeight();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          // 2. 判断各层级是否有视频需分发
 | 
	
		
			
				|  |  |          List<LevelWeight> availableLevels = new ArrayList<>();
 | 
	
		
			
				|  |  |          for (Map.Entry<String, Double> entry : levelWeightMap.entrySet()) {
 | 
	
		
			
				|  |  |              String levelKey = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, param.getAppType(), entry.getKey());
 | 
	
		
			
				|  |  | -            if (redisTemplate.hasKey(levelKey)) {
 | 
	
		
			
				|  |  | +            if (Boolean.TRUE.equals(redisTemplate.hasKey(levelKey))) {
 | 
	
		
			
				|  |  |                  LevelWeight lw = new LevelWeight();
 | 
	
		
			
				|  |  |                  lw.setLevel(entry.getKey());
 | 
	
		
			
				|  |  |                  lw.setLevelKey(levelKey);
 | 
	
	
		
			
				|  | @@ -41,6 +56,8 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
 | 
	
		
			
				|  |  |                  availableLevels.add(lw);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        //log.info("availableLevels {}", JSONUtils.toJson(availableLevels));
 | 
	
		
			
				|  |  |          if (CollectionUtils.isEmpty(availableLevels)) {
 | 
	
		
			
				|  |  |              return Pair.of("", "");
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -52,7 +69,7 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
 | 
	
		
			
				|  |  |          BigDecimal weightSumBD = new BigDecimal(weightSum);
 | 
	
		
			
				|  |  |          double level_p_low = 0;
 | 
	
		
			
				|  |  |          double weight_temp = 0;
 | 
	
		
			
				|  |  | -        double level_p_up;
 | 
	
		
			
				|  |  | +        double level_p_up = 0;
 | 
	
		
			
				|  |  |          Map<String, LevelP> level_p_mapping = new HashMap<>();
 | 
	
		
			
				|  |  |          for (LevelWeight lw : availableLevels) {
 | 
	
		
			
				|  |  |              BigDecimal bd = new BigDecimal(weight_temp + lw.getWeight());
 | 
	
	
		
			
				|  | @@ -68,7 +85,6 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // 4. 随机生成[0,1)之间数,返回相应概率区间的key
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |          double random_p = RandomUtils.nextDouble(0, 1);
 | 
	
		
			
				|  |  |          for (Map.Entry<String, LevelP> entry : level_p_mapping.entrySet()) {
 | 
	
		
			
				|  |  |              if (random_p >= entry.getValue().getMin()
 | 
	
	
		
			
				|  | @@ -97,4 +113,68 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
 | 
	
		
			
				|  |  |      public String pushFrom() {
 | 
	
		
			
				|  |  |          return FlowPoolConstants.PUSH_FORM;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public List<Video> recall(RecallParam param) {
 | 
	
		
			
				|  |  | +        Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
 | 
	
		
			
				|  |  | +        String flowPoolKey = flowPoolKeyAndLevel.getLeft();
 | 
	
		
			
				|  |  | +        String level = flowPoolKeyAndLevel.getRight();
 | 
	
		
			
				|  |  | +        Set<ZSetOperations.TypedTuple<String>> data = redisTemplate.opsForZSet().reverseRangeWithScores(flowPoolKey, 0, 1000);
 | 
	
		
			
				|  |  | +        if (CollectionUtils.isEmpty(data)) {
 | 
	
		
			
				|  |  | +            return null;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        Map<String, String> videoFlowPoolMap = new LinkedHashMap<>();
 | 
	
		
			
				|  |  | +        Map<Long, String> videoFlowPoolMap_ = new LinkedHashMap<>();
 | 
	
		
			
				|  |  | +        for (ZSetOperations.TypedTuple<String> value : data) {
 | 
	
		
			
				|  |  | +            String[] values = Objects.requireNonNull(value.getValue()).split("-");
 | 
	
		
			
				|  |  | +            videoFlowPoolMap.put(values[0], values[1]);
 | 
	
		
			
				|  |  | +            videoFlowPoolMap_.put(NumberUtils.toLong(values[0], 0),  values[1]);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        Map<Long, Double> resultmap = null;
 | 
	
		
			
				|  |  | +        if ("1".equals(level) && ifOneLevelRandom) {
 | 
	
		
			
				|  |  | +            // 流量池一层改为全随机
 | 
	
		
			
				|  |  | +            int limitSize = 60;
 | 
	
		
			
				|  |  | +            List<Long> keyList = new ArrayList<>(videoFlowPoolMap_.keySet());
 | 
	
		
			
				|  |  | +            Collections.shuffle(keyList);
 | 
	
		
			
				|  |  | +            resultmap = keyList.stream().limit(limitSize).collect(Collectors.toMap(
 | 
	
		
			
				|  |  | +                    key -> key,
 | 
	
		
			
				|  |  | +                    key -> Math.random()
 | 
	
		
			
				|  |  | +            ));
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +            ScorerPipeline4Recall pipeline = ScorerUtils.getScorerPipeline4Recall("feeds_recall_config_tomson.conf");
 | 
	
		
			
				|  |  | +            List<List<Pair<Long, Double>>> results = pipeline.recall(videoFlowPoolMap);
 | 
	
		
			
				|  |  | +            List<Pair<Long, Double>> result = results.get(0);
 | 
	
		
			
				|  |  | +            resultmap = result.stream()
 | 
	
		
			
				|  |  | +                    .collect(Collectors.toMap(
 | 
	
		
			
				|  |  | +                            Pair::getLeft, // 键是Pair的left值
 | 
	
		
			
				|  |  | +                            Pair::getRight, // 值是Pair的right值
 | 
	
		
			
				|  |  | +                            (existingValue, newValue) -> existingValue, // 如果键冲突,选择保留现有的值(或者你可以根据需要定义其他合并策略)
 | 
	
		
			
				|  |  | +                            LinkedHashMap::new // 使用LinkedHashMap来保持插入顺序(如果需要的话)
 | 
	
		
			
				|  |  | +                    ));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 3 召回内部过滤
 | 
	
		
			
				|  |  | +        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(resultmap.keySet()));
 | 
	
		
			
				|  |  | +        filterParam.setForceTruncation(10000);
 | 
	
		
			
				|  |  | +        filterParam.setConcurrent(true);
 | 
	
		
			
				|  |  | +        filterParam.setNotUsePreView(false);
 | 
	
		
			
				|  |  | +        FilterResult filterResult = filterService.filter(filterParam);
 | 
	
		
			
				|  |  | +        List<Video> videosResult = new ArrayList<>();
 | 
	
		
			
				|  |  | +        if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
 | 
	
		
			
				|  |  | +            Map<Long, Double> finalResultmap = resultmap;
 | 
	
		
			
				|  |  | +            filterResult.getVideoIds().forEach(vid -> {
 | 
	
		
			
				|  |  | +                Video recallData = new Video();
 | 
	
		
			
				|  |  | +                recallData.setVideoId(vid);
 | 
	
		
			
				|  |  | +                recallData.setAbCode(param.getAbCode());
 | 
	
		
			
				|  |  | +                recallData.setRovScore(finalResultmap.getOrDefault(vid, 0.0));
 | 
	
		
			
				|  |  | +                recallData.setPushFrom(pushFrom());
 | 
	
		
			
				|  |  | +                recallData.setFlowPool(videoFlowPoolMap_.get(vid));
 | 
	
		
			
				|  |  | +                recallData.setFlowPoolAbtestGroup(param.getFlowPoolAbtestGroup());
 | 
	
		
			
				|  |  | +                recallData.setLevel(level);
 | 
	
		
			
				|  |  | +                videosResult.add(recallData);
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        videosResult.sort(Comparator.comparingDouble(o -> -o.getRovScore()));
 | 
	
		
			
				|  |  | +        return videosResult;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |