瀏覽代碼

homepage recommend

丁云鹏 1 年之前
父節點
當前提交
d9b4461c54
共有 14 個文件被更改,包括 493 次插入121 次删除
  1. 1 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/model/param/HomepageRecommendParam.java
  2. 13 12
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java
  3. 16 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java
  4. 18 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankResult.java
  5. 134 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java
  6. 4 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java
  7. 134 81
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java
  8. 3 1
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallStrategy.java
  9. 158 0
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/SpecialRegionRecallStrategy.java
  10. 2 4
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java
  11. 2 6
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelScoreRecallStrategy.java
  12. 2 4
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithScoreRecallStrategy.java
  13. 3 5
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java
  14. 3 5
      recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractVideoRecallStrategy.java

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/model/param/HomepageRecommendParam.java

@@ -13,7 +13,7 @@ import lombok.Setter;
 @Setter
 public class HomepageRecommendParam {
     private int top_K;
-    private double flow_pool_P;
+    private double flowPoolP;
     private String ab_code;
     private String rule_key;
     private String data_key;

+ 13 - 12
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -2,15 +2,13 @@ package com.tzld.piaoquan.recommend.server.service;
 
 import com.alibaba.fastjson.JSONObject;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.tzld.piaoquan.recommend.server.gen.common.Result;
 import com.tzld.piaoquan.recommend.server.gen.recommend.HomepageRecommendRequest;
 import com.tzld.piaoquan.recommend.server.gen.recommend.HomepageRecommendResponse;
 import com.tzld.piaoquan.recommend.server.gen.recommend.Video;
 import com.tzld.piaoquan.recommend.server.model.param.HomepageRecommendParam;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallService;
 import com.tzld.piaoquan.recommend.server.util.DateUtils;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
@@ -25,10 +23,8 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
 import javax.annotation.PostConstruct;
-import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author dyp
@@ -110,6 +106,11 @@ public class RecommendService {
 
     private List<Video> videoRecommendNew(HomepageRecommendRequest request, HomepageRecommendParam param) {
         // TODO
+        RecallParam recallParam = new RecallParam();
+        RecallResult recallResult = recallService.recall(recallParam);
+
+
+
         return null;
     }
 
@@ -123,7 +124,7 @@ public class RecommendService {
     private HomepageRecommendParam genHomepageRecommendParam(HomepageRecommendRequest request, int recommendType) {
         HomepageRecommendParam param = new HomepageRecommendParam();
         param.setTop_K(3);
-        param.setFlow_pool_P(0.3);
+        param.setFlowPoolP(0.3);
         param.setNo_op_flag(true);
         param.setExpire_time(3600);
         param.setOld_video_index(-1);
@@ -150,17 +151,17 @@ public class RecommendService {
 
             // 流量池视频分发概率
             if (abExpCodes.contains("211")) {
-                param.setFlow_pool_P(0.9);
+                param.setFlowPoolP(0.9);
             } else if (abExpCodes.contains("221")) {
-                param.setFlow_pool_P(0.7);
+                param.setFlowPoolP(0.7);
             } else if (abExpCodes.contains("299")) {
-                param.setFlow_pool_P(0.5);
+                param.setFlowPoolP(0.5);
             } else if (abExpCodes.contains("300")) {
-                param.setFlow_pool_P(0.4);
+                param.setFlowPoolP(0.4);
             } else if (abExpCodes.contains("301")) {
-                param.setFlow_pool_P(0.6);
+                param.setFlowPoolP(0.6);
             } else if (abExpCodes.contains("339")) {
-                param.setFlow_pool_P(0);
+                param.setFlowPoolP(0);
             }
 
             for (Map.Entry<String, Map<String, String>> entry : abExpCodeMap.entrySet()) {

+ 16 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java

@@ -0,0 +1,16 @@
+package com.tzld.piaoquan.recommend.server.service.rank;
+
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
+import lombok.Data;
+
+/**
+ * @author dyp
+ */
+@Data
+public class RankParam {
+    private RecallResult recallResult;
+    private int size;
+    private int topK;
+    private String rankKeyPrefix;
+    private double flowPoolP;
+}

+ 18 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankResult.java

@@ -0,0 +1,18 @@
+package com.tzld.piaoquan.recommend.server.service.rank;
+
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * @author dyp
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class RankResult {
+    private List<RecallResult.RecallData> data;
+}

+ 134 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -1,8 +1,141 @@
 package com.tzld.piaoquan.recommend.server.service.rank;
 
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
 /**
  * @author dyp
  */
 public class RankService {
-    
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    public RankResult rank(RankParam param) {
+
+        if (param.getRecallResult() == null
+                || (CollectionUtils.isEmpty(param.getRecallResult().getRovPoolRecall())
+                && CollectionUtils.isEmpty(param.getRecallResult().getFlowPoolRecall()))) {
+            return null;
+        }
+
+        // rank rov recall
+        List<String> videoIdKeys = param.getRecallResult().getRovPoolRecall().stream()
+                .map(t -> param.getRankKeyPrefix() + t.getVideoId())
+                .collect(Collectors.toList());
+        List<RecallResult.RecallData> rov_recall_rank = param.getRecallResult().getRovPoolRecall().stream()
+                .collect(Collectors.toList());
+        List<String> video_scores = redisTemplate.opsForValue().multiGet(videoIdKeys);
+        if (CollectionUtils.isNotEmpty(video_scores)
+                && video_scores.size() == rov_recall_rank.size()) {
+            for (int i = 0; i < video_scores.size(); i++) {
+                rov_recall_rank.get(i).setRovScore(NumberUtils.toDouble(video_scores.get(i), 0.0));
+            }
+            Collections.sort(rov_recall_rank, Comparator.comparingDouble(o -> -o.getRovScore()));
+        }
+
+        // rank flow pool recall
+        List<RecallResult.RecallData> flow_recall_rank = param.getRecallResult().getFlowPoolRecall().stream()
+                .collect(Collectors.toList());
+        Collections.sort(flow_recall_rank, Comparator.comparingDouble(o -> -o.getRovScore()));
+
+
+        // TODO 重构
+        //    去重原则:
+        //        如果视频在ROV召回池topK,则保留ROV召回池,否则保留流量池
+        Set<Long> rovTopKVideoIds = new HashSet<>();
+        for (int i = 0; i < param.getTopK() && i < rov_recall_rank.size(); i++) {
+            rovTopKVideoIds.add(rov_recall_rank.get(i).getVideoId());
+        }
+
+        Set<Long> flowPoolVideoIds = new HashSet<>();
+        Iterator<RecallResult.RecallData> flowRecallRankIte = flow_recall_rank.iterator();
+        while (flowRecallRankIte.hasNext()) {
+            RecallResult.RecallData data = flowRecallRankIte.next();
+            if (rovTopKVideoIds.contains(data.getVideoId())) {
+                flowRecallRankIte.remove();
+            } else {
+                flowPoolVideoIds.add(data.getVideoId());
+            }
+        }
+
+        Iterator<RecallResult.RecallData> rovRecallRankIte = rov_recall_rank.iterator();
+        while (rovRecallRankIte.hasNext()) {
+            RecallResult.RecallData data = rovRecallRankIte.next();
+            if (flowPoolVideoIds.contains(data.getVideoId())) {
+                rovRecallRankIte.remove();
+            }
+        }
+
+        // 1 取topK
+        if (CollectionUtils.isEmpty(rov_recall_rank)) {
+            if (param.getSize() < flow_recall_rank.size()) {
+                return new RankResult(flow_recall_rank.subList(0, param.getSize()));
+            } else {
+                return new RankResult(flow_recall_rank);
+            }
+        }
+
+        // rov 取topK
+        List<RecallResult.RecallData> datas = new ArrayList<>();
+        for (int i = 0; i < param.getTopK() && i < rov_recall_rank.size(); i++) {
+            datas.add(rov_recall_rank.get(i));
+        }
+
+        double flowPoolP = param.getFlowPoolP();
+        if (param.getRecallResult().isQuickPool()) {
+            String quick_flow_pool_P = redisTemplate.opsForValue().get("flow:pool:quick:distribute:rate:3");
+            flowPoolP = NumberUtils.toDouble(quick_flow_pool_P, 0);
+        }
+
+        int flowPoolIndex = 0;
+        int rovPoolIndex = param.getTopK() - 1;
+
+        for (int i = 0; i < param.getSize() - param.getTopK(); i++) {
+            double rand = RandomUtils.nextDouble(0, 1);
+            if (rand < flowPoolP) {
+                if (flowPoolIndex < flow_recall_rank.size()) {
+                    datas.add(flow_recall_rank.get(flowPoolIndex++));
+                } else {
+                    break;
+                }
+            } else {
+                if (rovPoolIndex < rov_recall_rank.size()) {
+                    datas.add(rov_recall_rank.get(rovPoolIndex++));
+                } else {
+                    break;
+                }
+            }
+        }
+        //     while i < size - top_K:
+        //        # 随机生成[0, 1)浮点数
+        //        rand = random.random()
+        //        # log_.info('rand: {}'.format(rand))
+        //        if rand < flow_pool_P:
+        //            if flow_recall_rank:
+        //                rank_result.append(flow_recall_rank[0])
+        //                flow_recall_rank.remove(flow_recall_rank[0])
+        //            else:
+        //                rank_result.extend(rov_recall_rank[:size - top_K - i])
+        //                return rank_result[:size], flow_num
+        //        else:
+        //            if rov_recall_rank:
+        //                rank_result.append(rov_recall_rank[0])
+        //                rov_recall_rank.remove(rov_recall_rank[0])
+        //            else:
+        //                rank_result.extend(flow_recall_rank[:size - top_K - i])
+        //                return rank_result[:size], flow_num
+        //        i += 1
+
+        if (param.getSize() < datas.size()) {
+            return new RankResult(datas.subList(0, param.getSize()));
+        }
+        return new RankResult(datas);
+    }
 }

+ 4 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallResult.java

@@ -9,7 +9,9 @@ import java.util.List;
  */
 @Data
 public class RecallResult {
-    private List<RecallData> recallData;
+    private List<RecallData> rovPoolRecall;
+    private List<RecallData> flowPoolRecall;
+    private boolean quickPool;
 
     @Data
     public static class RecallData {
@@ -20,5 +22,6 @@ public class RecallResult {
         private String flowPool;
         private String level;
         private String flowPoolAbtestGroup;
+        private boolean inFlowPool;
     }
 }

+ 134 - 81
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -1,13 +1,9 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
-import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.tzld.piaoquan.recommend.server.service.recall.strategy.Dup224HRegionRecallStrategy;
-import com.tzld.piaoquan.recommend.server.service.recall.strategy.Dup324HRegionRecallStrategy;
-import com.tzld.piaoquan.recommend.server.service.recall.strategy.Region24HRegionRecallStrategy;
-import com.tzld.piaoquan.recommend.server.service.recall.strategy.RegionHRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
@@ -25,12 +21,7 @@ import java.util.concurrent.*;
 public class RecallService implements ApplicationContextAware {
 
     private final Map<String, RecallStrategy> strategyMap = new HashMap<>();
-
     private ApplicationContext applicationContext;
-
-    @ApolloJsonValue("city_code")
-    private Set<String> cityCodes;
-
     private ExecutorService pool;
 
     @PostConstruct
@@ -39,48 +30,20 @@ public class RecallService implements ApplicationContextAware {
         pool = new ThreadPoolExecutor(8, 32,
                 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000),
                 new ThreadFactoryBuilder().setNameFormat("video-title-%d").build(), new ThreadPoolExecutor.AbortPolicy());
-
         Map<String, RecallStrategy> type = applicationContext.getBeansOfType(RecallStrategy.class);
         for (Map.Entry<String, RecallStrategy> entry : type.entrySet()) {
             RecallStrategy value = entry.getValue();
             strategyMap.put(value.getClass().getSimpleName(), value);
         }
-
     }
 
-    public void recall(RecallParam param){
-
-        // region recall
-        String provinceCode = StringUtils.isNotBlank(param.getProvinceCode())
-                ? param.getProvinceCode()
-                : "-1";
-        String cityCode = StringUtils.isNotBlank(param.getCityCode())
-                ? param.getCityCode()
-                : "-1";
-        String regionCode;
-        if (cityCodes.contains(cityCode)) {
-            regionCode = cityCode;
-        } else {
-            regionCode = provinceCode;
-        }
-
-        List<RecallStrategy> strategies = new ArrayList<>();
-        if (regionCode.equals("-1")) {
-            strategies.add(strategyMap.get(Dup224HRegionRecallStrategy.class.getSimpleName()));
-            strategies.add(strategyMap.get(Dup324HRegionRecallStrategy.class.getSimpleName()));
-        } else {
-            strategies.add(strategyMap.get(RegionHRegionRecallStrategy.class.getSimpleName()));
-            strategies.add(strategyMap.get(Region24HRegionRecallStrategy.class.getSimpleName()));
-            strategies.add(strategyMap.get(Dup224HRegionRecallStrategy.class.getSimpleName()));
-            strategies.add(strategyMap.get(Dup324HRegionRecallStrategy.class.getSimpleName()));
-        }
-
-
+    public RecallResult recall(RecallParam param) {
+        List<RecallStrategy> strategies = getRecallStrategy(param);
         CountDownLatch cdl = new CountDownLatch(strategies.size());
-        List<Future<RecallResult>> recallResultFutures = new ArrayList<>(strategies.size());
+        List<Future<List<RecallResult.RecallData>>> recallResultFutures = new ArrayList<>(strategies.size());
         for (RecallStrategy strategy : strategies) {
-            Future<RecallResult> future = pool.submit(() -> {
-                RecallResult result = strategy.recall(param);
+            Future<List<RecallResult.RecallData>> future = pool.submit(() -> {
+                List<RecallResult.RecallData> result = strategy.recall(param);
                 cdl.countDown();
                 return result;
             });
@@ -88,45 +51,135 @@ public class RecallService implements ApplicationContextAware {
         }
         try {
             cdl.await(3000, TimeUnit.MILLISECONDS);
-
         } catch (InterruptedException e) {
-            // 1s 以后返回
+            log.error("rov_pool_recall_with_region recall error", e);
+            return null;
         }
-        // region_recall_result_list = [i.get() for i in t]
-        //        # 将已获取到的视频按顺序去重合并
-        //        now_video_ids = []
-        //        recall_result = []
-        //        recall_num = size
-        //        for region_result in region_recall_result_list:
-        //            for video in region_result:
-        //                video_id = video.get('videoId')
-        //                if video_id not in now_video_ids:
-        //                    recall_result.append(video)
-        //                    now_video_ids.append(video_id)
-        //                    if len(recall_result) >= recall_num:
-        //                        break
-        //                    else:
-        //                        continue
-        //        return recall_result[:recall_num]
-
-
-        // # 对在流量池中存在的视频添加标记字段
-        //        result = []
-        //        for item in videos:
-        //            video_id = item['videoId']
-        //            t = [
-        //                gevent.spawn(self.get_video_flow_pool, video_id, True),
-        //                gevent.spawn(self.get_video_flow_pool, video_id, False)
-        //            ]
-        //            gevent.joinall(t)
-        //            flow_pool_list = [i.get() for i in t]
-        //            flow_pool_list = [item for item in flow_pool_list if item != '']
-        //            if len(flow_pool_list) > 0:
-        //                flow_pool = flow_pool_list[0]
-        //                item['flowPool'] = flow_pool
-        //                item['isInFlowPool'] = 1
-        //            result.append(item)
-        //        return result
+
+
+        // TODO 重构
+        // merge
+        return merge(recallResultFutures, param);
+    }
+
+    private RecallResult merge(List<Future<List<RecallResult.RecallData>>> recallResultFutures, RecallParam param) {
+        List<List<RecallResult.RecallData>> results = new ArrayList<>();
+        for (Future<List<RecallResult.RecallData>> f : recallResultFutures) {
+            try {
+                results.add(f.get());
+            } catch (Exception e) {
+                log.error("future get error ", e);
+                results.add(Collections.emptyList());
+            }
+        }
+
+        // TODO 重构 merge sim recall and return recall
+        Set<Long> videoIds = new HashSet<>();
+        List<RecallResult.RecallData> datas = new ArrayList<>();
+        if (param.getAbCode().equals("60054")
+                || param.getAbCode().equals("60068")
+                || param.getAbCode().equals("60081")
+                || param.getAbCode().equals("60084")) {
+            if (results.size() >= 2) {
+                List<RecallResult.RecallData> region_recall = results.get(0);
+                if (CollectionUtils.isNotEmpty(region_recall)) {
+                    for (RecallResult.RecallData data : region_recall) {
+                        if (!videoIds.contains(data.getVideoId())) {
+                            datas.add(data);
+                            videoIds.add(data.getVideoId());
+                        }
+                    }
+                }
+                List<RecallResult.RecallData> simRecall = null;
+                List<RecallResult.RecallData> returnRecall = null;
+                if (param.getAppType().equals("18") || param.getAppType().equals("19")) {
+                    simRecall = results.get(1);
+                } else {
+                    if (results.size() >= 4) {
+                        simRecall = results.get(3);
+                    }
+                    if (results.size() >= 5 && !param.getAbCode().equals("60054")) {
+                        returnRecall = results.get(4);
+                    }
+                }
+
+                if (simRecall != null && CollectionUtils.isNotEmpty(simRecall)) {
+                    for (RecallResult.RecallData data : simRecall) {
+                        if (!videoIds.contains(data.getVideoId())) {
+                            datas.add(data);
+                            videoIds.add(data.getVideoId());
+                        }
+                    }
+                }
+
+                if (returnRecall != null && CollectionUtils.isNotEmpty(returnRecall)) {
+                    for (RecallResult.RecallData data : returnRecall) {
+                        if (!videoIds.contains(data.getVideoId())) {
+                            datas.add(data);
+                            videoIds.add(data.getVideoId());
+                        }
+                    }
+                }
+                if (CollectionUtils.isNotEmpty(datas)) {
+                    results.set(0, datas);
+                }
+            }
+        }
+
+
+        RecallResult result = new RecallResult();
+        if (param.getAppType().equals("18") || param.getAppType().equals("19")) {
+            // TODO 可能没有这个code
+            if (param.getAbCode().equals("30001")
+                    || param.getAbCode().equals("30002")
+                    || param.getAbCode().equals("80001")) {
+                result.setRovPoolRecall(results.get(0));
+                result.setFlowPoolRecall(results.get(1));
+            } else {
+                result.setRovPoolRecall(results.get(0));
+            }
+        } else {
+            if (CollectionUtils.isNotEmpty(results.get(1))) {
+                result.setQuickPool(true);
+                result.setRovPoolRecall(results.get(0));
+                result.setFlowPoolRecall(results.get(1));
+
+            } else {
+                result.setRovPoolRecall(results.get(0));
+                result.setFlowPoolRecall(results.get(2));
+            }
+        }
+        return result;
+    }
+
+    private List<RecallStrategy> getRecallStrategy(RecallParam param) {
+        List<RecallStrategy> strategies = new ArrayList<>();
+        if (param.getAppType().equals("18") || param.getAppType().equals("19")) {
+            strategies.add(strategyMap.get(SpecialRegionRecallStrategy.class.getSimpleName()));
+        } else if (param.getFlowPoolAbtestGroup().equals("experimental_flow_set_level")) {
+            strategies.add(strategyMap.get(SpecialRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(QuickFlowPoolWithLevelRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategy.class.getSimpleName()));
+        } else if (param.getFlowPoolAbtestGroup().equals("experimental_flow_set_level_score")) {
+            strategies.add(strategyMap.get(SpecialRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(QuickFlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(FlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+        } else {
+            strategies.add(strategyMap.get(SpecialRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(QuickFlowPoolWithScoreRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(FlowPoolWithScoreRecallStrategy.class.getSimpleName()));
+        }
+
+        if (param.getAbCode().equals("60054")) {
+            strategies.add(strategyMap.get(SimHotVideoRecallStrategy.class.getSimpleName()));
+        } else if (param.getAbCode().equals("60068")
+                || param.getAbCode().equals("60081")
+                || param.getAbCode().equals("60084")) {
+            strategies.add(strategyMap.get(SimHotVideoRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(ReturnVideoRecallStrategy.class.getSimpleName()));
+        }
+
+        return strategies;
     }
 
     @Override

+ 3 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallStrategy.java

@@ -1,8 +1,10 @@
 package com.tzld.piaoquan.recommend.server.service.recall;
 
+import java.util.List;
+
 /**
  * @author dyp
  */
 public interface RecallStrategy {
-    RecallResult recall(RecallParam param);
+    List<RecallResult.RecallData> recall(RecallParam param);
 }

+ 158 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/SpecialRegionRecallStrategy.java

@@ -0,0 +1,158 @@
+package com.tzld.piaoquan.recommend.server.service.recall;
+
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.Dup224HRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.Dup324HRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.Region24HRegionRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.RegionHRegionRecallStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * @author dyp
+ */
+@Service
+@Slf4j
+public class SpecialRegionRecallStrategy implements ApplicationContextAware, RecallStrategy {
+
+    private final Map<String, RecallStrategy> strategyMap = new HashMap<>();
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    private ApplicationContext applicationContext;
+
+    @ApolloJsonValue("city_code")
+    private Set<String> cityCodes;
+
+    private ExecutorService pool;
+
+    @PostConstruct
+    public void init() {
+        // init thread pool
+        pool = new ThreadPoolExecutor(8, 32,
+                0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000),
+                new ThreadFactoryBuilder().setNameFormat("video-title-%d").build(), new ThreadPoolExecutor.AbortPolicy());
+
+        Map<String, RecallStrategy> type = applicationContext.getBeansOfType(RecallStrategy.class);
+        for (Map.Entry<String, RecallStrategy> entry : type.entrySet()) {
+            RecallStrategy value = entry.getValue();
+            strategyMap.put(value.getClass().getSimpleName(), value);
+        }
+
+    }
+
+    public List<RecallResult.RecallData> recall(RecallParam param) {
+
+        // 1. region recall
+        String provinceCode = StringUtils.isNotBlank(param.getProvinceCode())
+                ? param.getProvinceCode()
+                : "-1";
+        String cityCode = StringUtils.isNotBlank(param.getCityCode())
+                ? param.getCityCode()
+                : "-1";
+        String regionCode;
+        if (cityCodes.contains(cityCode)) {
+            regionCode = cityCode;
+        } else {
+            regionCode = provinceCode;
+        }
+
+        List<RecallStrategy> strategies = new ArrayList<>();
+        if (regionCode.equals("-1")) {
+            strategies.add(strategyMap.get(Dup224HRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Dup324HRegionRecallStrategy.class.getSimpleName()));
+        } else {
+            strategies.add(strategyMap.get(RegionHRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Region24HRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Dup224HRegionRecallStrategy.class.getSimpleName()));
+            strategies.add(strategyMap.get(Dup324HRegionRecallStrategy.class.getSimpleName()));
+        }
+
+        // execute
+        CountDownLatch cdl = new CountDownLatch(strategies.size());
+        List<Future<List<RecallResult.RecallData>>> recallResultFutures = new ArrayList<>(strategies.size());
+        for (RecallStrategy strategy : strategies) {
+            Future<List<RecallResult.RecallData>> future = pool.submit(() -> {
+                List<RecallResult.RecallData> result = strategy.recall(param);
+                cdl.countDown();
+                return result;
+            });
+            recallResultFutures.add(future);
+        }
+        try {
+            cdl.await(3000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("rov_pool_recall_with_region recall error", e);
+            return null;
+        }
+
+        // 2. 去重
+        Set<Long> videoIds = new HashSet<>();
+        List<RecallResult.RecallData> results = new ArrayList<>();
+        for (Future<List<RecallResult.RecallData>> f : recallResultFutures) {
+            if (results.size() >= param.getSize()) {
+                break;
+            }
+            try {
+                List<RecallResult.RecallData> datas = f.get();
+                if (CollectionUtils.isNotEmpty(f.get())) {
+                    for (RecallResult.RecallData data : datas) {
+                        if (!videoIds.contains(data.getVideoId())) {
+                            videoIds.add(data.getVideoId());
+                            results.add(data);
+                        }
+                        if (results.size() >= param.getSize()) {
+                            break;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // log
+            }
+        }
+
+
+        // 3. 对在流量池中存在的视频添加标记字段
+        for (RecallResult.RecallData data : results) {
+
+            Long videoId = data.getVideoId();
+            String quick_flow_pool_isin_flow_pool_key =
+                    String.format("flow:pool:quick:video:ids:%s:3", param.getAppType());
+            String quick_flow_pool_flow_pool_key =
+                    String.format("flow:pool:quick:video:%s:3:%s", param.getAppType(), videoId);
+            if (redisTemplate.opsForSet().isMember(quick_flow_pool_isin_flow_pool_key, videoId)) {
+                data.setFlowPool(redisTemplate.opsForSet().randomMember(quick_flow_pool_flow_pool_key));
+                data.setInFlowPool(true);
+            } else {
+                String isIn_flow_pool_key =
+                        String.format("flow:pool:video:ids:%s:3", param.getAppType());
+                String flow_pool_key =
+                        String.format("flow:pool:video:%s:%s", param.getAppType(), videoId);
+                if (redisTemplate.opsForSet().isMember(isIn_flow_pool_key, videoId)) {
+                    data.setFlowPool(redisTemplate.opsForSet().randomMember(flow_pool_key));
+                    data.setInFlowPool(true);
+                }
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+}

+ 2 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java

@@ -30,7 +30,7 @@ public abstract class AbstractFlowPoolWithLevelRecallStrategy implements RecallS
     protected FlowPoolWithLevelFilterService flowPoolFilterService;
 
     @Override
-    public RecallResult recall(RecallParam param) {
+    public List<RecallResult.RecallData> recall(RecallParam param) {
         Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
         String flowPoolKey = flowPoolKeyAndLevel.getLeft();
         String level = flowPoolKeyAndLevel.getRight();
@@ -65,9 +65,7 @@ public abstract class AbstractFlowPoolWithLevelRecallStrategy implements RecallS
             });
         }
 
-        RecallResult result = new RecallResult();
-        result.setRecallData(results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize()));
-        return result;
+        return results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize());
     }
 
     abstract Pair<String, String> flowPoolKeyAndLevel(RecallParam param);

+ 2 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelScoreRecallStrategy.java

@@ -3,13 +3,11 @@ package com.tzld.piaoquan.recommend.server.service.recall.strategy;
 import com.google.common.collect.Lists;
 import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
 import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
-import com.tzld.piaoquan.recommend.server.service.filter.FlowPoolWithLevelFilterService;
 import com.tzld.piaoquan.recommend.server.service.filter.FlowPoolWithLevelScoreFilterService;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -29,7 +27,7 @@ public abstract class AbstractFlowPoolWithLevelScoreRecallStrategy implements Re
     protected FlowPoolWithLevelScoreFilterService flowPoolFilterService;
 
     @Override
-    public RecallResult recall(RecallParam param) {
+    public List<RecallResult.RecallData> recall(RecallParam param) {
         Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
         String flowPoolKey = flowPoolKeyAndLevel.getLeft();
         String level = flowPoolKeyAndLevel.getRight();
@@ -71,9 +69,7 @@ public abstract class AbstractFlowPoolWithLevelScoreRecallStrategy implements Re
             });
         }
 
-        RecallResult result = new RecallResult();
-        result.setRecallData(results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize()));
-        return result;
+        return results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize());
     }
 
     abstract Pair<String, String> flowPoolKeyAndLevel(RecallParam param);

+ 2 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithScoreRecallStrategy.java

@@ -26,7 +26,7 @@ public abstract class AbstractFlowPoolWithScoreRecallStrategy implements RecallS
     protected FlowPoolWithScoreFilterService flowPoolFilterService;
 
     @Override
-    public RecallResult recall(RecallParam param) {
+    public List<RecallResult.RecallData> recall(RecallParam param) {
         String flowPoolKey = flowPoolKey(param);
 
         int idx = 0;
@@ -65,9 +65,7 @@ public abstract class AbstractFlowPoolWithScoreRecallStrategy implements RecallS
             });
         }
 
-        RecallResult result = new RecallResult();
-        result.setRecallData(results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize()));
-        return result;
+        return results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize());
     }
 
     abstract String flowPoolKey(RecallParam param);

+ 3 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java

@@ -30,7 +30,7 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
     protected FilterService filterService;
 
     @Override
-    public RecallResult recall(RecallParam param) {
+    public List<RecallResult.RecallData> recall(RecallParam param) {
 
         String recordKey = recordKey(param);
         String lastVideoKey = lastVideoKey(param);
@@ -112,7 +112,7 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
         }
 
 
-        Collections.sort(results, (o1, o2) -> (int) (o2.getRovScore() - o1.getRovScore()));
+        Collections.sort(results, Comparator.comparingDouble(o -> -o.getRovScore()));
 
         if (StringUtils.isNotBlank(lastVideoId)
                 && CollectionUtils.isNotEmpty(results)
@@ -121,9 +121,7 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
             redisTemplate.opsForValue().set(lastVideoKey, lastVideoId, 2, TimeUnit.HOURS);
         }
 
-        RecallResult result = new RecallResult();
-        result.setRecallData(results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize()));
-        return result;
+        return results.subList(0, results.size() < param.getSize() ? results.size() : param.getSize());
     }
 
     private int getIdx(String lastVideoKey, String poolKey) {

+ 3 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractVideoRecallStrategy.java

@@ -12,7 +12,6 @@ import lombok.Data;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 
@@ -32,7 +31,7 @@ public abstract class AbstractVideoRecallStrategy implements RecallStrategy {
     protected VideoFilterService simHotItemFilterService;
 
     @Override
-    public RecallResult recall(RecallParam param) {
+    public List<RecallResult.RecallData> recall(RecallParam param) {
 
 
         String recall_key = recallKey(param);
@@ -67,9 +66,7 @@ public abstract class AbstractVideoRecallStrategy implements RecallStrategy {
             });
         }
 
-        RecallResult result = new RecallResult();
-        result.setRecallData(results);
-        return result;
+        return results;
     }
 
     @Data
@@ -79,5 +76,6 @@ public abstract class AbstractVideoRecallStrategy implements RecallStrategy {
     }
 
     abstract String recallKey(RecallParam param);
+
     abstract String pushFrom();
 }