浏览代码

feat:添加热点宝流量池供给

zhaohaipeng 2 周之前
父节点
当前提交
425bfbfe7a

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/feign/FlowPoolFeign.java

@@ -14,4 +14,7 @@ public interface FlowPoolFeign {
 
     @PostMapping("/flowpool/video/getFlowPoolVideo")
     FlowPoolResponse<List<FlowPoolVideoInfo>> getFlowPoolVideo(@RequestBody JSONObject param);
+
+    @PostMapping("/flowpool/video/remainViewCount")
+    FlowPoolResponse<List<FlowPoolVideoInfo>> remainViewCount(@RequestBody List<JSONObject> param);
 }

+ 14 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -493,6 +493,7 @@ public class RecommendService {
 
         updateFlowPoolCache(request, param, videos);
 
+        updateDouHotVideoCache(request, param, videos);
     }
 
     private void updateFlowPoolCache(RecommendRequest request, RecommendParam param,
@@ -514,6 +515,19 @@ public class RecommendService {
         flowPoolService.updateDistributeCountWithLevel(flowPoolVideos);
     }
 
+    private void updateDouHotVideoCache(RecommendRequest request, RecommendParam param, List<Video> videos) {
+        if (CollectionUtils.isEmpty(videos)) {
+            return;
+        }
+        List<Video> douHotFlowPoolVideo = videos.stream()
+                .filter(v -> v.getPushFrom().equals(DouHotFlowPoolRecallStrategy.PUSH_FROM))
+                .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(douHotFlowPoolVideo)) {
+            return;
+        }
+        flowPoolService.asyncHandleDouHotCache(douHotFlowPoolVideo, param.getProvince());
+    }
+
     private void updateLastVideoCache(List<Video> videos) {
 
         if (CollectionUtils.isEmpty(videos)) {

+ 82 - 11
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/flowpool/FlowPoolService.java

@@ -2,6 +2,7 @@ package com.tzld.piaoquan.recommend.server.service.flowpool;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.feign.FlowPoolFeign;
@@ -23,10 +24,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -153,10 +151,40 @@ public class FlowPoolService {
         });
     }
 
+    public void asyncHandleDouHotCache(List<Video> videos, String province) {
+        if (StringUtils.isBlank(province) || CollectionUtils.isEmpty(videos)) {
+            return;
+        }
+        pool.execute(() -> {
+            List<Long> needRemoveVideoIds = new ArrayList<>();
+            for (Video video : videos) {
+                String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, video.getVideoId(), video.getFlowPool());
+                Long count = redisTemplate.opsForValue().decrement(distributeKey);
+                if (Objects.isNull(count) || count <= 0) {
+                    redisTemplate.delete(distributeKey);
+                    needRemoveVideoIds.add(video.getVideoId());
+                }
+            }
+            if (CollectionUtils.isEmpty(needRemoveVideoIds)) {
+                return;
+            }
+
+            // 从流量池缓存中移除达到分发次数限制的视频ID
+            String itemKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
+            redisTemplate.opsForSet().remove(itemKey, needRemoveVideoIds);
+        });
+    }
+
     public void syncDouHotFlowPoolVideo() {
+
+        // 获取流量池中 全部热点宝视频
         List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
         log.info("[DouHot video size]: {}", allDouHotVideo.size());
 
+        // 获取视频对应的可分发数量
+        Map<String, Integer> videoAndDistributeCountMap = this.findDouHotVideoDistributeCount(allDouHotVideo);
+        log.info("[DouHot view distribute count size]: {}", videoAndDistributeCountMap.size());
+
         List<Long> allVideoId = allDouHotVideo.stream()
                 .map(FlowPoolVideoInfo::getVideoId)
                 .collect(Collectors.toList());
@@ -169,6 +197,7 @@ public class FlowPoolService {
             }
         }
 
+        // 过滤掉省份不存在的数据
         allDouHotVideo = allDouHotVideo.stream()
                 .filter(i -> StringUtils.isNotBlank(i.getProvince()))
                 .collect(Collectors.toList());
@@ -178,21 +207,35 @@ public class FlowPoolService {
         Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = allDouHotVideo.stream()
                 .collect(Collectors.groupingBy(FlowPoolVideoInfo::getProvince, Collectors.toList()));
 
+        // 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
         for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
             String province = entry.getKey();
-            List<String> items = entry.getValue().stream()
-                    .map(i -> String.format("%d-%s", i.getVideoId(), i.getFlowPool()))
-                    .collect(Collectors.toList());
-
-            log.info("[DouHot province video size]. province: {}, video size: {}", entry.getKey(), items.size());
+            List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
 
             String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
-            log.info("[DouHot item redis key]: {}", redisKey);
+            log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
             redisTemplate.delete(redisKey);
-            redisTemplate.opsForSet().add(redisKey, items.toArray(new String[0]));
+
+            // 将视频添加到Redis缓存中,并设置可分发数量
+            for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
+                String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
+
+                // 如果剩余的可分发数量不存在或者小于为 则不添加到Redis中
+                if (!videoAndDistributeCountMap.containsKey(item) || videoAndDistributeCountMap.get(item) <= 0) {
+                    continue;
+                }
+
+                redisTemplate.opsForSet().add(redisKey, item);
+
+                String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
+                redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
+                redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
+            }
+
             redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
         }
 
+
     }
 
     private Map<Long, String> findAllVideoAndProvinceMap(List<Long> videoIds) {
@@ -245,5 +288,33 @@ public class FlowPoolService {
         }
         return result;
     }
+
+    private Map<String, Integer> findDouHotVideoDistributeCount(List<FlowPoolVideoInfo> flowPoolVideoInfos) {
+        List<JSONObject> paramJsonList = Lists.newArrayList();
+        for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
+            JSONObject paramJson = new JSONObject();
+            paramJson.put("videoId", flowPoolVideoInfo.getVideoId());
+            paramJson.put("flowPool", flowPoolVideoInfo.getFlowPool());
+            paramJsonList.add(paramJson);
+        }
+
+        List<FlowPoolVideoInfo> remainFlowPoolVideoInfos = new ArrayList<>(flowPoolVideoInfos.size());
+        List<List<JSONObject>> partition = Lists.partition(paramJsonList, 10);
+        for (List<JSONObject> param : partition) {
+            FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.remainViewCount(param);
+            if (0 != response.getCode()) {
+                log.error("[remain view count error] responseJson: {}", response);
+                continue;
+            }
+            remainFlowPoolVideoInfos.addAll(response.getData());
+        }
+
+        Map<String, Integer> distributeCountMap = Maps.newHashMap();
+        for (FlowPoolVideoInfo videoInfo : remainFlowPoolVideoInfos) {
+            String key = String.format("%s-%s", videoInfo.getVideoId(), videoInfo.getFlowPool());
+            distributeCountMap.put(key, videoInfo.getDistributeCount());
+        }
+        return distributeCountMap;
+    }
 }
 

+ 17 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -85,11 +85,13 @@ public abstract class RankService {
         List<Video> rovRecallRank = mergeAndRankRovRecall(param);
         List<Video> flowPoolRank = mergeAndRankFlowPoolRecall(param);
 
-        removeDuplicate(param, rovRecallRank, flowPoolRank);
+        List<Video> douHotFlowPoolRank = extractAndSort(param, DouHotFlowPoolRecallStrategy.PUSH_FROM);
+
+        removeDuplicate(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
 
 
         // 融合排序
-        return mergeAndSort(param, rovRecallRank, flowPoolRank);
+        return mergeAndSort(param, rovRecallRank, flowPoolRank, douHotFlowPoolRank);
     }
 
     private void tagDuplicateVideos(RankParam param) {
@@ -200,7 +202,7 @@ public abstract class RankService {
         }
     }
 
-    public void removeDuplicate(RankParam param, List<Video> rovRecallRank, List<Video> flowPoolRank) {
+    public void removeDuplicate(RankParam param, List<Video> rovRecallRank, List<Video> flowPoolRank, List<Video> douHotFlowPoolRank) {
         // TODO 重构 rov和流量池 融合排序
         //    去重原则:
         //        如果视频在ROV召回池topK,则保留ROV召回池,否则保留流量池
@@ -222,6 +224,17 @@ public abstract class RankService {
             }
         }
 
+        // dou hot flow pool 移除topK视频
+        Iterator<Video> douHotFlowPoolIte = douHotFlowPoolRank.iterator();
+        while (douHotFlowPoolIte.hasNext()) {
+            Video data = douHotFlowPoolIte.next();
+            if (rovTopKVideoIds.contains(data.getVideoId())) {
+                douHotFlowPoolIte.remove();
+            } else {
+                flowPoolVideoIds.add(data.getVideoId());
+            }
+        }
+
         // rov pool 移除flow中的视频
         Iterator<Video> rovRecallRankIte = rovRecallRank.iterator();
         while (rovRecallRankIte.hasNext()) {
@@ -232,7 +245,7 @@ public abstract class RankService {
         }
     }
 
-    public abstract RankResult mergeAndSort(RankParam param, List<Video> rovRecallRank, List<Video> flowPoolRank);
+    public abstract RankResult mergeAndSort(RankParam param, List<Video> rovRecallRank, List<Video> flowPoolRank,List<Video> douHotFlowPoolRank);
 
     private boolean matchSpecialApp(int appId) {
         Set<Integer> notSpecialApp = new HashSet<>(Arrays.asList(0, 4, 5));

+ 8 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/extractor/RankExtractorItemTags.java

@@ -10,7 +10,7 @@ public class RankExtractorItemTags {
         this.redisTemplate = redisTemplate;
     }
 
-    public void processor(List<Video> rovVideos, List<Video> flowVideos){
+    public void processor(List<Video> rovVideos, List<Video> flowVideos, List<Video> douHotFlowPoolVideos) {
         List<Long> videoIds = new ArrayList<>();
         for (Video v : rovVideos) {
             videoIds.add(v.getVideoId());
@@ -18,6 +18,10 @@ public class RankExtractorItemTags {
         for (Video v : flowVideos) {
             videoIds.add(v.getVideoId());
         }
+        for (Video v : douHotFlowPoolVideos) {
+            videoIds.add(v.getVideoId());
+        }
+
         Map<Long, List<String>> videoTagDict = getVideoTags(redisTemplate, videoIds);
         for (Video v : rovVideos) {
             v.setTags(videoTagDict.getOrDefault(v.getVideoId(), new ArrayList<>()));
@@ -25,6 +29,9 @@ public class RankExtractorItemTags {
         for (Video v : flowVideos) {
             v.setTags(videoTagDict.getOrDefault(v.getVideoId(), new ArrayList<>()));
         }
+        for (Video v : douHotFlowPoolVideos) {
+            v.setTags(videoTagDict.getOrDefault(v.getVideoId(), new ArrayList<>()));
+        }
     }
 
     public static Map<Long, List<String>> getVideoTags(RedisTemplate<String, String> redisHelper, List<Long> videoIds) {

+ 12 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/processor/RankProcessorTagFilter.java

@@ -6,7 +6,7 @@ import com.tzld.piaoquan.recommend.server.util.ProbabilityCalculator;
 import java.util.*;
 public class RankProcessorTagFilter {
 
-    public static void processor(List<Video> rov, List<Video> flow, Map<String, Map<String, String>> rules) {
+    public static void processor(List<Video> rov, List<Video> flow,List<Video> douHot, Map<String, Map<String, String>> rules) {
 
         Set<String> filterTags = new HashSet<>();
         Random random = new Random();
@@ -56,6 +56,17 @@ public class RankProcessorTagFilter {
                 }
             }
         }
+
+        iterator = douHot.iterator();
+        while (iterator.hasNext()) {
+            Video video = iterator.next();
+            List<String> tags = video.getTags();
+            for (String tag : tags) {
+                if (filterTags.contains(tag)) {
+                    iterator.remove();
+                }
+            }
+        }
     }
 
 }

+ 18 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelBasic.java

@@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.io.BufferedReader;
@@ -38,6 +39,10 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
     @ApolloJsonValue("${RankReduceByMergeCateConfig:{}}")
     private Map<String, Map<String, List<Map<String, String>>>> rankReduceConfig = new HashMap<>();
 
+
+    @Value("${new.flow.pool.select.rate:1}")
+    private double newFlowPoolSelectRate;
+
     String CLASS_NAME = this.getClass().getSimpleName();
 
     public void duplicate(Set<Long> setVideo, List<Video> videos) {
@@ -53,7 +58,7 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
     }
 
     @Override
-    public RankResult mergeAndSort(RankParam param, List<Video> rovVideos, List<Video> flowVideos) {
+    public RankResult mergeAndSort(RankParam param, List<Video> rovVideos, List<Video> flowVideos, List<Video> douHotFlowPoolVideos) {
 
         // 1 兜底策略,rov池子不足时,用冷启池填补。直接返回。
         if (CollectionUtils.isEmpty(rovVideos)) {
@@ -90,11 +95,11 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
         // 3 标签读取
         if (rulesMap != null && !rulesMap.isEmpty()) {
             RankExtractorItemTags extractorItemTags = new RankExtractorItemTags(this.redisTemplate);
-            extractorItemTags.processor(rovVideos, flowVideos);
+            extractorItemTags.processor(rovVideos, flowVideos, douHotFlowPoolVideos);
         }
         // 6 合并结果时间卡控
         if (rulesMap != null && !rulesMap.isEmpty()) {
-            RankProcessorTagFilter.processor(rovVideos, flowVideos, rulesMap);
+            RankProcessorTagFilter.processor(rovVideos, flowVideos, douHotFlowPoolVideos, rulesMap);
         }
 
 
@@ -127,6 +132,12 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
                 } else {
                     break;
                 }
+            } else if (this.isInsertDouHotFlowPoolVideo()) {
+                if (flowPoolIndex < douHotFlowPoolVideos.size()) {
+                    result.add(douHotFlowPoolVideos.get(flowPoolIndex++));
+                } else {
+                    break;
+                }
             } else {
                 if (rovPoolIndex < rovVideos.size()) {
                     result.add(rovVideos.get(rovPoolIndex++));
@@ -353,4 +364,8 @@ public abstract class RankStrategy4RegionMergeModelBasic extends RankService {
         return vor;
     }
 
+    private boolean isInsertDouHotFlowPoolVideo() {
+        double rand = RandomUtils.nextDouble(0, 1);
+        return rand <= newFlowPoolSelectRate;
+    }
 }

+ 85 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/DouHotFlowPoolRecallStrategy.java

@@ -0,0 +1,85 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterService;
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+@Slf4j
+@Component
+public class DouHotFlowPoolRecallStrategy implements RecallStrategy {
+
+    @Autowired
+    @Qualifier("redisTemplate")
+    private RedisTemplate<String, String> redisTemplate;
+
+    @Autowired
+    private FilterService filterService;
+
+    @Value("${dou.hot.recall.video.count:20}")
+    private Integer douHotRecallVideoCnt;
+
+    public static final String PUSH_FROM = "recall_strategy_hotspot";
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+        if (StringUtils.isBlank(param.getProvince())) {
+            return Collections.emptyList();
+        }
+
+        String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, param.getProvince(), "1");
+        log.info("[DouHot recall redisKey] {}", redisKey);
+        Set<String> redisValues = redisTemplate.opsForSet().members(redisKey);
+        if (CollectionUtils.isEmpty(redisValues)) {
+            log.error("[DouHot recall is empty]");
+            return Collections.emptyList();
+        }
+
+        Map<Long, String> videoIdAndFlowPoolMap = new HashMap<>(redisValues.size());
+        for (String value : redisValues) {
+            String[] split = value.split("-");
+            if (split.length != 2) {
+                continue;
+            }
+            videoIdAndFlowPoolMap.put(Long.valueOf(split[0]), split[1]);
+        }
+
+        FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(videoIdAndFlowPoolMap.keySet()));
+        filterService.filter(filterParam);
+        // 对视频随机打断
+        Collections.shuffle(filterParam.getVideoIds());
+
+        int size = Math.min(filterParam.getVideoIds().size(), douHotRecallVideoCnt);
+        List<Long> subList = filterParam.getVideoIds().subList(0, size);
+        List<Video> videos = new ArrayList<>(subList.size());
+        for (Long videoId : subList) {
+            Video video = new Video();
+            video.setVideoId(videoId);
+            video.setFlowPool(videoIdAndFlowPoolMap.get(videoId));
+            video.setPushFrom(pushFrom());
+            // 热点宝供给目前只有一层,所以写死
+            video.setLevel("1");
+            videos.add(video);
+        }
+
+        return videos;
+    }
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FROM;
+    }
+}