Browse Source

feature_sharePlayCount

丁云鹏 1 year ago
parent
commit
5e1a0d0cba
16 changed files with 491 additions and 27 deletions
  1. 80 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/FlowPoolConfigService.java
  2. 10 17
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java
  3. 70 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FlowPoolFilterService.java
  4. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java
  5. 3 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java
  6. 122 3
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java
  7. 77 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java
  8. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java
  9. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup224HRegionRecallStrategy.java
  10. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup248HRegionRecallStrategy.java
  11. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup324HRegionRecallStrategy.java
  12. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup348HRegionRecallStrategy.java
  13. 97 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategy.java
  14. 22 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/QuickFlowPoolWithLevelRecallStrategy.java
  15. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Region24HRegionRecallStrategy.java
  16. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionHRegionRecallStrategy.java

+ 80 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/FlowPoolConfigService.java

@@ -0,0 +1,80 @@
+package com.tzld.piaoquan.recommend.server.service;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author dyp
+ */
+@Service
+@Slf4j
+public class FlowPoolConfigService {
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    // {"control_group": [], "experimental_flow_set_level": [10, 11, 12, 13, 14, 15, 16, 17], "experimental_flow_set_level_score": [9, 18]}
+    private LoadingCache<String, Map<String, List<Integer>>> flowPoolConfigCache = CacheBuilder.newBuilder()
+            .maximumSize(10)
+            .refreshAfterWrite(600, TimeUnit.SECONDS)
+            .expireAfterWrite(600, TimeUnit.SECONDS)
+            .expireAfterAccess(600, TimeUnit.SECONDS)
+            .build(new CacheLoader<String, Map<String, List<Integer>>>() {
+                @Override
+                public Map<String, List<Integer>> load(String key) throws Exception {
+                    String value = redisTemplate.opsForValue().get(key);
+                    if (StringUtils.isEmpty(value)) {
+                        value = "{\"control_group\": [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],\n" +
+                                "                           \"experimental_flow_set_level\": [],\n" +
+                                "                           \"experimental_flow_set_level_score\": []}";
+                    }
+                    return JSONObject.parseObject(value, Map.class);
+                }
+            });
+
+    private LoadingCache<String, Map<String, Double>> levelWeightConfigCache = CacheBuilder.newBuilder()
+            .maximumSize(10)
+            .refreshAfterWrite(600, TimeUnit.SECONDS)
+            .expireAfterWrite(600, TimeUnit.SECONDS)
+            .expireAfterAccess(600, TimeUnit.SECONDS)
+            .build(new CacheLoader<String, Map<String, Double>>() {
+                @Override
+                public Map<String, Double> load(String key) throws Exception {
+                    String value = redisTemplate.opsForValue().get(key);
+                    if (StringUtils.isEmpty(value)) {
+                        value = "level_weight = {\"1\": 1, \"2\": 1, \"3\": 1, \"4\": 1, \"5\": 1, \"6\": 1}";
+                    }
+                    return JSONObject.parseObject(value, Map.class);
+                }
+            });
+
+    public Map<String, List<Integer>> getFlowPoolConfig() {
+        try {
+            return flowPoolConfigCache.get("flow:pool:abtest:config");
+        } catch (ExecutionException e) {
+            log.error("getFlowPoolConfig error", e);
+            return Collections.emptyMap();
+        }
+    }
+
+    public Map<String, Double> getLevelWeight() {
+        try {
+            return levelWeightConfigCache.get("flow:pool:level:recommend:weight");
+        } catch (ExecutionException e) {
+            log.error("getFlowPoolConfig error", e);
+            return Collections.emptyMap();
+        }
+    }
+}

+ 10 - 17
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -10,6 +10,8 @@ import com.tzld.piaoquan.recommend.server.gen.recommend.HomepageRecommendRequest
 import com.tzld.piaoquan.recommend.server.gen.recommend.HomepageRecommendResponse;
 import com.tzld.piaoquan.recommend.server.gen.recommend.Video;
 import com.tzld.piaoquan.recommend.server.model.param.HomepageRecommendParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallService;
 import com.tzld.piaoquan.recommend.server.util.DateUtils;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
@@ -49,22 +51,11 @@ public class RecommendService {
 
     private int[] flowPoolIdList = {7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
-    // {"control_group": [], "experimental_flow_set_level": [10, 11, 12, 13, 14, 15, 16, 17], "experimental_flow_set_level_score": [9, 18]}
-    private LoadingCache<String, Map<String, List<Integer>>> flowPoolConfigCache = CacheBuilder.newBuilder()
-            .maximumSize(10)
-            .refreshAfterWrite(600, TimeUnit.SECONDS)
-            .expireAfterWrite(600, TimeUnit.SECONDS)
-            .expireAfterAccess(600, TimeUnit.SECONDS)
-            .build(new CacheLoader<String, Map<String, List<Integer>>>() {
-                @Override
-                public Map<String, List<Integer>> load(String key) throws Exception {
-                    String value = redisTemplate.opsForValue().get(key);
-                    if (StringUtils.isEmpty(value)) {
-                        return Collections.emptyMap();
-                    }
-                    return JSONObject.parseObject(value, Map.class);
-                }
-            });
+    @Autowired
+    private FlowPoolConfigService flowPoolConfigService;
+
+    @Autowired
+    private RecallService recallService;
 
     @PostConstruct
     public void init() {
@@ -124,6 +115,8 @@ public class RecommendService {
 
     private List<Video> videoRecommendOld(HomepageRecommendRequest request, HomepageRecommendParam param) {
         // TODO
+        RecallParam recallParam = new RecallParam();
+        recallService.recall(recallParam);
         return null;
     }
 
@@ -215,7 +208,7 @@ public class RecommendService {
             int flowPoolIdChoice = flowPoolIdList[RandomUtils.nextInt(0, flowPoolIdList.length - 1)];
             param.setFlow_pool_abtest_group("control_group");
             try {
-                Map<String, List<Integer>> flowPoolConfig = flowPoolConfigCache.get("flow:pool:abtest:config");
+                Map<String, List<Integer>> flowPoolConfig = flowPoolConfigService.getFlowPoolConfig();
                 for (Map.Entry<String, List<Integer>> entry : flowPoolConfig.entrySet()) {
                     if (entry.getValue().contains(flowPoolIdChoice)) {
                         param.setFlow_pool_abtest_group(entry.getKey());

+ 70 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FlowPoolFilterService.java

@@ -0,0 +1,70 @@
+package com.tzld.piaoquan.recommend.server.service.filter;
+
+import com.tzld.piaoquan.recommend.server.service.FlowPoolConfigService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * @author dyp
+ */
+@Service
+public class FlowPoolFilterService {
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+    @Autowired
+    private FlowPoolConfigService flowPoolConfigService;
+
+    @Autowired
+    private FilterService filterService;
+
+    public FilterResult filter(FilterParam param) {
+        FilterResult result = filterService.filter(param);
+        result.getVideoIds();
+        Map<String, Double> levelWeightMap = flowPoolConfigService.getLevelWeight();
+
+        //if self.level_weight is None:
+        //            level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+        //        else:
+        //            level_weight = self.level_weight
+        //        level_list = [level for level in level_weight]
+        //
+        //        check_result = []
+        //        for video_id in video_ids:
+        //            video_id = int(video_id)
+        //            for flow_pool in flow_pool_mapping.get(video_id, []):
+        //                # 判断是否有本地分发记录
+        //                cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
+        //                # 无记录
+        //                if cur_count is None:
+        //                    # videos.append({'videoId': video_id, 'flowPool': flow_pool})
+        //                    continue
+        //                # 本地分发数 cur_count > 0
+        //                elif cur_count > 0:
+        //                    check_result.append((video_id, flow_pool))
+        //                # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key
+        //                else:
+        //                    add_remove_log = False
+        //                    remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
+        //                    self.redis_helper.del_keys(remain_count_key)
+        //                    value = '{}-{}'.format(video_id, flow_pool)
+        //                    for item in config_.APP_TYPE:
+        //                        for level in level_list:
+        //                            flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{config_.APP_TYPE.get(item)}:{level}"
+        //                            remove_res = self.redis_helper.remove_value_from_set(key_name=flow_pool_key, values=(value, ))
+        //                            if remove_res > 0:
+        //                                add_remove_log = True
+        //                        quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{config_.APP_TYPE.get(item)}" \
+        //                                              f":{config_.QUICK_FLOW_POOL_ID}"
+        //                        remove_res = self.redis_helper.remove_value_from_set(key_name=quick_flow_pool_key,
+        //                                                                             values=(value, ))
+        //                        if remove_res > 0:
+        //                            add_remove_log = True
+        //                    if add_remove_log is True:
+        //                        log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
+        //        return check_result
+        return null;
+    }
+}

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java

@@ -8,10 +8,13 @@ import lombok.Data;
 @Data
 public class RecallParam {
     private String provinceCode;
+    private String cityCode;
     private String mid;
     private String appType;
     private String dataKey;
     private String ruleKey;
     private String abCode;
     private int size;
+    private String flowPoolAbtestGroup;
+
 }

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java

@@ -17,5 +17,8 @@ public class RecallResult {
         private double rovScore;
         private String pushFrom;
         private String abCode;
+        private String flowPool;
+        private String level;
+        private String flowPoolAbtestGroup;
     }
 }

+ 122 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -1,17 +1,136 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.Dup224HRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.Dup324HRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.Region24HRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.RegionHRegionRecallStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Service;
 
-import java.util.Map;
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * @author dyp
  */
 @Service
-public class RecallService {
+@Slf4j
+public class RecallService implements ApplicationContextAware {
+
+    private final Map<String, RecallStrategy> strategyMap = new HashMap<>();
+
+    private ApplicationContext applicationContext;
 
     @ApolloJsonValue("city_code")
-    private Map<String, String> cityCode;
+    private Set<String> cityCodes;
+
+    private ExecutorService pool;
+
+    @PostConstruct
+    public void init() {
+        // init thread pool
+        pool = new ThreadPoolExecutor(8, 32,
+                0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000),
+                new ThreadFactoryBuilder().setNameFormat("video-title-%d").build(), new ThreadPoolExecutor.AbortPolicy());
+
+        Map<String, RecallStrategy> type = applicationContext.getBeansOfType(RecallStrategy.class);
+        for (Map.Entry<String, RecallStrategy> entry : type.entrySet()) {
+            RecallStrategy value = entry.getValue();
+            strategyMap.put(value.getClass().getSimpleName(), value);
+        }
+
+    }
+
+    public void recall(RecallParam param){
+
+        // region recall
+        String provinceCode = StringUtils.isNotBlank(param.getProvinceCode())
+                ? param.getProvinceCode()
+                : "-1";
+        String cityCode = StringUtils.isNotBlank(param.getCityCode())
+                ? param.getCityCode()
+                : "-1";
+        String regionCode;
+        if (cityCodes.contains(cityCode)) {
+            regionCode = cityCode;
+        } else {
+            regionCode = provinceCode;
+        }
+
+        List<RecallStrategy> strategies = new ArrayList<>();
+        if (regionCode.equals("-1")) {
+            strategies.add(strategyMap.get(Dup224HRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Dup324HRegionRecallStrategy.class.getSimpleName()));
+        } else {
+            strategies.add(strategyMap.get(RegionHRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Region24HRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Dup224HRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Dup324HRegionRecallStrategy.class.getSimpleName()));
+        }
+
+
+        CountDownLatch cdl = new CountDownLatch(strategies.size());
+        List<Future<RecallResult>> recallResultFutures = new ArrayList<>(strategies.size());
+        for (RecallStrategy strategy : strategies) {
+            Future<RecallResult> future = pool.submit(() -> {
+                RecallResult result = strategy.recall(param);
+                cdl.countDown();
+                return result;
+            });
+            recallResultFutures.add(future);
+        }
+        try {
+            cdl.await(3000, TimeUnit.MILLISECONDS);
+
+        } catch (InterruptedException e) {
+            // 1s 以后返回
+        }
+        // region_recall_result_list = [i.get() for i in t]
+        //        # 将已获取到的视频按顺序去重合并
+        //        now_video_ids = []
+        //        recall_result = []
+        //        recall_num = size
+        //        for region_result in region_recall_result_list:
+        //            for video in region_result:
+        //                video_id = video.get('videoId')
+        //                if video_id not in now_video_ids:
+        //                    recall_result.append(video)
+        //                    now_video_ids.append(video_id)
+        //                    if len(recall_result) >= recall_num:
+        //                        break
+        //                    else:
+        //                        continue
+        //        return recall_result[:recall_num]
+
+
+        // # 对在流量池中存在的视频添加标记字段
+        //        result = []
+        //        for item in videos:
+        //            video_id = item['videoId']
+        //            t = [
+        //                gevent.spawn(self.get_video_flow_pool, video_id, True),
+        //                gevent.spawn(self.get_video_flow_pool, video_id, False)
+        //            ]
+        //            gevent.joinall(t)
+        //            flow_pool_list = [i.get() for i in t]
+        //            flow_pool_list = [item for item in flow_pool_list if item != '']
+        //            if len(flow_pool_list) > 0:
+        //                flow_pool = flow_pool_list[0]
+        //                item['flowPool'] = flow_pool
+        //                item['isInFlowPool'] = 1
+        //            result.append(item)
+        //        return result
+    }
 
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
 }

+ 77 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java

@@ -0,0 +1,77 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.google.common.collect.Lists;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
+import com.tzld.piaoquan.recommend.server.service.filter.FlowPoolFilterService;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author dyp
+ */
+public abstract class AbstractFlowPoolWithLevelRecallStrategy implements RecallStrategy {
+    @Autowired
+    protected RedisTemplate<String, String> redisTemplate;
+
+    @Autowired
+    protected FlowPoolFilterService flowPoolFilterService;
+
+    @Override
+    public RecallResult recall(RecallParam param) {
+        Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
+        String flowPoolKey = flowPoolKeyAndLevel.getLeft();
+        String level = flowPoolKeyAndLevel.getRight();
+
+        int getSize = param.getSize() * 5;
+        List<RecallResult.RecallData> results = new ArrayList<>();
+        List<String> data = redisTemplate.opsForSet().randomMembers(flowPoolKey, getSize);
+        if (CollectionUtils.isEmpty(data)) {
+            return null;
+        }
+        Map<Long, String> videoMap = new HashMap<>();
+        for (String value : data) {
+            String[] values = value.split("-");
+            videoMap.put(NumberUtils.toLong(values[0], 0), values[1]);
+        }
+
+        FilterParam filterParam = new FilterParam();
+        filterParam.setVideoIds(Lists.newArrayList(videoMap.keySet()));
+        FilterResult filterResult = flowPoolFilterService.filter(filterParam);
+
+        if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
+            filterResult.getVideoIds().stream().forEach(vid -> {
+                RecallResult.RecallData recallData = new RecallResult.RecallData();
+                recallData.setVideoId(vid);
+                recallData.setAbCode(param.getAbCode());
+                recallData.setRovScore(RandomUtils.nextInt(0, 100));
+                recallData.setPushFrom(pushFrom());
+                recallData.setFlowPool(videoMap.get(vid));
+                recallData.setFlowPoolAbtestGroup(param.getFlowPoolAbtestGroup());
+                recallData.setLevel("level");
+                results.add(recallData);
+            });
+        }
+
+        RecallResult result = new RecallResult();
+        result.setRecallData(results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize()));
+        return result;
+    }
+
+    abstract Pair<String, String> flowPoolKeyAndLevel(RecallParam param);
+
+    abstract String pushFrom();
+
+}

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java

@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * @author dyp
  */
-public abstract class AbstractRecallStrategy implements RecallStrategy {
+public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
     @Autowired
     protected RedisTemplate<String, String> redisTemplate;
 

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup224HRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup224HRegionRecallStrategy.java

@@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
  * @author dyp
  */
 @Service
-public class Dup224HRecallStrategy extends AbstractRecallStrategy {
+public class Dup224HRegionRecallStrategy extends AbstractRegionRecallStrategy {
     //record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP2_24H
     //            # 视频列表
     //            pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup248HRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup248HRegionRecallStrategy.java

@@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
  * @author dyp
  */
 @Service
-public class Dup248HRecallStrategy extends AbstractRecallStrategy {
+public class Dup248HRegionRecallStrategy extends AbstractRegionRecallStrategy {
     //  record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP2_48H
     //            # 视频列表
     //            pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup324HRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup324HRegionRecallStrategy.java

@@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
  * @author dyp
  */
 @Service
-public class Dup324HRecallStrategy extends AbstractRecallStrategy {
+public class Dup324HRegionRecallStrategy extends AbstractRegionRecallStrategy {
     //record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_24H
     //            pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H
     //            last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup348HRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Dup348HRegionRecallStrategy.java

@@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
  * @author dyp
  */
 @Service
-public class Dup348HRecallStrategy extends AbstractRecallStrategy {
+public class Dup348HRegionRecallStrategy extends AbstractRegionRecallStrategy {
     //  record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_48H
     //            pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H
     //            last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP3_48H_PREFIX

+ 97 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolWithLevelRecallStrategy.java

@@ -0,0 +1,97 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.service.FlowPoolConfigService;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.*;
+
+/**
+ * @author dyp
+ */
+@Service
+public class FlowPoolWithLevelRecallStrategy extends AbstractFlowPoolWithLevelRecallStrategy {
+
+    @Autowired
+    private FlowPoolConfigService flowPoolConfigService;
+
+    @Override
+    Pair<String, String> flowPoolKeyAndLevel(RecallParam param) {
+        //# 1. 获取流量池各层级分发概率权重
+        Map<String, Double> levelWeightMap = flowPoolConfigService.getLevelWeight();
+
+        // 2. 判断各层级是否有视频需分发
+        List<LevelWeight> availableLevels = new ArrayList<>();
+        levelWeightMap.forEach((l, w) -> {
+            String levelKey = String.format("flow:pool:level:item:%s:%s", param.getAppType(), l);
+            if (redisTemplate.hasKey(levelKey)) {
+                LevelWeight lw = new LevelWeight();
+                lw.setLevel(l);
+                lw.setLevelKey(levelKey);
+                lw.setWeight(w);
+                availableLevels.add(lw);
+            }
+        });
+        if (CollectionUtils.isEmpty(availableLevels)) {
+            return Pair.of("", "");
+        }
+
+        // 3. 根据可分发层级权重设置分发概率
+        Collections.sort(availableLevels, Comparator.comparingDouble(LevelWeight::getWeight));
+        double weightSum = availableLevels.stream().mapToDouble(o -> o.getWeight()).sum();
+        BigDecimal weightSumBD = new BigDecimal(weightSum);
+        double level_p_low = 0;
+        double weight_temp = 0;
+        double level_p_up = 0;
+        Map<String, LevelP> level_p_mapping = new HashMap<>();
+        for (LevelWeight lw : availableLevels) {
+            BigDecimal bd = new BigDecimal(weight_temp + lw.getWeight());
+            level_p_up = bd.divide(weightSumBD, 2, RoundingMode.HALF_UP).doubleValue();
+            LevelP levelP = new LevelP();
+            levelP.setMin(level_p_low);
+            levelP.setMax(level_p_up);
+            levelP.setLevelKey(lw.getLevelKey());
+            level_p_mapping.put(lw.level, levelP);
+            level_p_low = level_p_up;
+
+            weight_temp += lw.getWeight();
+        }
+
+        // 4. 随机生成[0,1)之间数,返回相应概率区间的key
+
+        int random_p = RandomUtils.nextInt(0, 1);
+        for (Map.Entry<String, LevelP> entry : level_p_mapping.entrySet()) {
+            if (random_p >= entry.getValue().getMin()
+                    && random_p >= entry.getValue().getMax()) {
+                return Pair.of(entry.getValue().getLevelKey(), entry.getKey());
+            }
+        }
+        return Pair.of("", "");
+    }
+
+    @Data
+    class LevelWeight {
+        private String level;
+        private String levelKey;
+        private Double weight;
+    }
+
+    @Data
+    class LevelP {
+        private String levelKey;
+        private double min;
+        private double max;
+    }
+
+    @Override
+    String pushFrom() {
+        return "flow_pool";
+    }
+}

+ 22 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/QuickFlowPoolWithLevelRecallStrategy.java

@@ -0,0 +1,22 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author dyp
+ */
+@Service
+public class QuickFlowPoolWithLevelRecallStrategy extends AbstractFlowPoolWithLevelRecallStrategy {
+
+    @Override
+    Pair<String, String> flowPoolKeyAndLevel(RecallParam param) {
+        return Pair.of(String.format("flow:pool:quick:item:%s:3", param.getAppType()), "");
+    }
+
+    @Override
+    String pushFrom() {
+        return "flow_pool";
+    }
+}

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Region24HRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/Region24HRegionRecallStrategy.java

@@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
  * @author dyp
  */
 @Service
-public class Region24HRecallStrategy extends AbstractRecallStrategy {
+public class Region24HRegionRecallStrategy extends AbstractRegionRecallStrategy {
     //record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP1_24H
 //            # 视频列表
 //            pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionHRecallStrategy.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/RegionHRegionRecallStrategy.java

@@ -8,7 +8,7 @@ import org.springframework.stereotype.Service;
  * @author dyp
  */
 @Service
-public class RegionHRecallStrategy extends AbstractRecallStrategy {
+public class RegionHRegionRecallStrategy extends AbstractRegionRecallStrategy {
 
     @Override
     protected String recordKey(RecallParam param) {