浏览代码

feat:热点宝召回修改

zhaohaipeng 1 周之前
父节点
当前提交
bf5d9e41bb

+ 3 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/feign/model/FlowPoolVideoInfo.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.feign.model;
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.Data;
 import lombok.ToString;
 
@@ -20,4 +21,6 @@ public class FlowPoolVideoInfo {
     private Integer flowPoolLevelId;
 
     private Integer level;
+
+    private JSONObject attribute;
 }

+ 178 - 83
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/flowpool/FlowPoolService.java

@@ -184,74 +184,127 @@ public class FlowPoolService {
         List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
         log.info("[DouHot video size]: {}", allDouHotVideo.size());
 
-        // 获取视频对应的可分发数量, videoId-flowpool -> viewCnt
-        Map<String, Integer> videoAndDistributeCountMap = this.findDouHotVideoDistributeCount(allDouHotVideo);
-        log.info("[DouHot view distribute count size]: {}", videoAndDistributeCountMap.size());
+        // 获取省份与视频列表的映射,省份 -> [video]
+        Map<String, List<FlowPoolVideoInfo>> provinceAndVideoMap = this.findAllVideoAndProvinceMapByAttribute(allDouHotVideo);
+        provinceAndVideoMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
 
-        List<Long> allVideoId = allDouHotVideo.stream()
-                .map(FlowPoolVideoInfo::getVideoId)
-                .collect(Collectors.toList());
-
-        // 获取视频对应的省份信息, video -> {省份: tgi}
-        Map<Long, Map<String, Double>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
-        log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
-
-        Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
-
-        // 获取省份与对应视频的映射, 省份 -> [视频信息列表]
-        Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
-                .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
-
-        for (Map.Entry<Long, Map<String, Double>> entry : allVideoAndProvinceMap.entrySet()) {
-            Long videoId = entry.getKey();
-            Set<String> provinceList = entry.getValue().keySet();
-            for (String province : provinceList) {
-                if (videoIdList.containsKey(videoId)) {
-                    List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
-                    flowPoolVideoInfos.addAll(videoIdList.get(videoId));
-                }
-            }
-        }
-        provinceAndVideoListMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
+        // 获取某个省份下所有视频的分数, 省份 -> video -> score
+        Map<String, Map<Long, Double>> videoInProvinceScore = this.findVideoInProvinceScore(allDouHotVideo);
 
-        // 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
-        for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
-            String province = entry.getKey();
-            List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
+        // 获取视频的可分发数,videoId-flowPool -> viewCnt
+        Map<String, Integer> videoDistributeCount = this.findDouHotVideoDistributeCount(allDouHotVideo);
+        log.info("[DouHot view distribute count size]: {}", videoDistributeCount.size());
 
-            String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
-            log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
-            redisTemplate.delete(redisKey);
+        this.douHotVideoWriteRedis(provinceAndVideoMap, videoInProvinceScore, videoDistributeCount);
+    }
 
-            // 将视频添加到Redis缓存中,并设置可分发数量
-            for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
-                String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
 
-                // 如果剩余的可分发数量不存在或者小于为 则不添加到Redis中
-                if (!videoAndDistributeCountMap.containsKey(item) || videoAndDistributeCountMap.get(item) <= 0) {
-                    continue;
-                }
+    // public void syncDouHotFlowPoolVideoV1() {
+    //     // 获取流量池中 全部热点宝视频
+    //     List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
+    //     log.info("[DouHot video size]: {}", allDouHotVideo.size());
+    //
+    //     List<Long> allVideoId = allDouHotVideo.stream()
+    //             .map(FlowPoolVideoInfo::getVideoId)
+    //             .collect(Collectors.toList());
+    //
+    //     // 获取视频对应的省份信息, video -> {省份: tgi}
+    //     Map<Long, Map<String, Double>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
+    //     log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
+    //
+    //     Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
+    //
+    //     // 获取省份与对应视频的映射, 省份 -> [视频信息列表]
+    //     Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
+    //             .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
+    //
+    //     for (Map.Entry<Long, Map<String, Double>> entry : allVideoAndProvinceMap.entrySet()) {
+    //         Long videoId = entry.getKey();
+    //         Set<String> provinceList = entry.getValue().keySet();
+    //         for (String province : provinceList) {
+    //             if (videoIdList.containsKey(videoId)) {
+    //                 List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
+    //                 flowPoolVideoInfos.addAll(videoIdList.get(videoId));
+    //             }
+    //         }
+    //     }
+    //     provinceAndVideoListMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
+    //
+    //     // 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
+    //     // 获取视频对应的可分发数量, videoId-flowpool -> viewCnt
+    //
+    //     Map<String, Integer> videoAndDistributeCountMap = this.findDouHotVideoDistributeCount(allDouHotVideo);
+    //     log.info("[DouHot view distribute count size]: {}", videoAndDistributeCountMap.size());
+    //
+    //     for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
+    //         String province = entry.getKey();
+    //         List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
+    //
+    //         String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
+    //         log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
+    //         redisTemplate.delete(redisKey);
+    //
+    //         // 将视频添加到Redis缓存中,并设置可分发数量
+    //         for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
+    //             String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
+    //
+    //             // 如果剩余的可分发数量不存在或者小于等于0, 则不添加到Redis中
+    //             if (!videoAndDistributeCountMap.containsKey(item) || videoAndDistributeCountMap.get(item) <= 0) {
+    //                 continue;
+    //             }
+    //
+    //             // 获取视频在某个省份的tgi,作为视频的分数
+    //             double score = 0;
+    //             if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
+    //                 score = allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()).getOrDefault(province, 0d);
+    //             }
+    //
+    //             redisTemplate.opsForZSet().add(redisKey, item, score);
+    //
+    //             String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
+    //             redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
+    //             redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
+    //         }
+    //
+    //         redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
+    //     }
+    //
+    // }
 
-                // 获取视频在某个省份的tgi,作为视频的分数
-                double score = 0;
-                if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
-                    score = allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()).getOrDefault(province, 0d);
-                }
 
-                redisTemplate.opsForZSet().add(redisKey, item, score);
+    private List<FlowPoolVideoInfo> findAllDouHotVideoFromFlowPool() {
+        List<FlowPoolVideoInfo> result = new ArrayList<>();
+        int pageNum = 1;
+        while (true) {
+            JSONObject paramJson = new JSONObject();
+            paramJson.put("flowPoolId", douHotFlowPoolId);
+            paramJson.put("appType", 0);
+            paramJson.put("pageSize", 1000);
+            paramJson.put("pageNum", pageNum++);
 
-                String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
-                redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
-                redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
+            log.info("[get DouHot flow pool video] paramJson:{}", paramJson);
+            FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.getFlowPoolVideo(paramJson);
+            if (0 != response.getCode()) {
+                log.error("[get DouHot flow pool video request error] responseJson: {}", response);
+                break;
             }
 
-            redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
+            if (CollectionUtils.isEmpty(response.getData())) {
+                log.error("[get DouHot flow pool video data is empty] responseJson: {}", response);
+                break;
+            }
+            result.addAll(response.getData());
         }
-
-
+        return result;
     }
 
-    private Map<Long, Map<String, Double>> findAllVideoAndProvinceMap(List<Long> videoIds) {
+    // 根据TGI获取流量池视频和省份的对应关系
+    private Map<Long, Map<String, Double>> findAllVideoAndProvinceMapByTGI(List<FlowPoolVideoInfo> allDouHotVideo) {
+        List<Long> videoIds = allDouHotVideo.stream()
+                .map(FlowPoolVideoInfo::getVideoId)
+                .distinct()
+                .collect(Collectors.toList());
+
         // 获取票圈视频ID与热点宝vid的映射
         List<List<Long>> videoIdPartition = Lists.partition(videoIds, 500);
         Map<Long, String> videoIdAndVidMap = videoIdPartition.stream().map(douHotVideoMappingRepository::findAllByVideoIdIn)
@@ -268,15 +321,15 @@ public class FlowPoolService {
         // 按vid和name分组,并取每组TGI倒排前10条
         Map<String, Map<String, Double>> vidAndProvinceListMap = douHotVideoProvince.stream()
                 .collect(Collectors.groupingBy(
-                        DouHotVideoProvince::getVid,  // 按 vid 分组
-                        Collectors.collectingAndThen(
-                                Collectors.toList(),
-                                list -> list.stream()
-                                        .sorted(Comparator.comparingDouble(DouHotVideoProvince::getRate).reversed()) // 按 占比 降序
-                                        .limit(3) // 取前10个
-                                        .sorted(Comparator.comparingDouble(DouHotVideoProvince::getTgi).reversed()) // 按 TGI 降序
-                                        .limit(1) // 取第一个
-                                        .collect(Collectors.toMap(i -> RecallUtils.douHotProvinceConvert(i.getName()), DouHotVideoProvince::getTgi, (o1, o2) -> o1)))
+                                DouHotVideoProvince::getVid,  // 按 vid 分组
+                                Collectors.collectingAndThen(
+                                        Collectors.toList(),
+                                        list -> list.stream()
+                                                .sorted(Comparator.comparingDouble(DouHotVideoProvince::getRate).reversed()) // 按 占比 降序
+                                                .limit(3) // 取前10个
+                                                .sorted(Comparator.comparingDouble(DouHotVideoProvince::getTgi).reversed()) // 按 TGI 降序
+                                                .limit(1) // 取第一个
+                                                .collect(Collectors.toMap(i -> RecallUtils.douHotProvinceConvert(i.getName()), DouHotVideoProvince::getTgi, (o1, o2) -> o1)))
                         )
                 );
         vidAndProvinceListMap.forEach((key, value) -> log.info("[DouHot vid and province mapping]: vid: {}, province: {}", key, JSON.toJSONString(value)));
@@ -292,30 +345,32 @@ public class FlowPoolService {
         return resultMap;
     }
 
-    private List<FlowPoolVideoInfo> findAllDouHotVideoFromFlowPool() {
-        List<FlowPoolVideoInfo> result = new ArrayList<>();
-        int pageNum = 1;
-        while (true) {
-            JSONObject paramJson = new JSONObject();
-            paramJson.put("flowPoolId", douHotFlowPoolId);
-            paramJson.put("appType", 0);
-            paramJson.put("pageSize", 1000);
-            paramJson.put("pageNum", pageNum++);
+    // 根据属性获取流量池视频和省份的对应关系
+    private Map<String, List<FlowPoolVideoInfo>> findAllVideoAndProvinceMapByAttribute(List<FlowPoolVideoInfo> allDouHotVideo) {
+        Map<String, List<FlowPoolVideoInfo>> resultMap = new HashMap<>(allDouHotVideo.size());
 
-            log.info("[get DouHot flow pool video] paramJson:{}", paramJson);
-            FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.getFlowPoolVideo(paramJson);
-            if (0 != response.getCode()) {
-                log.error("[get DouHot flow pool video request error] responseJson: {}", response);
-                break;
+        for (FlowPoolVideoInfo flowPoolVideoInfo : allDouHotVideo) {
+            if (MapUtils.isEmpty(flowPoolVideoInfo.getAttribute())) {
+                continue;
             }
+            String province = flowPoolVideoInfo.getAttribute().getString("province");
+            resultMap.computeIfAbsent(province, s -> new ArrayList<>()).add(flowPoolVideoInfo);
+        }
 
-            if (CollectionUtils.isEmpty(response.getData())) {
-                log.error("[get DouHot flow pool video data is empty] responseJson: {}", response);
-                break;
+        return resultMap;
+    }
+
+    // 获取某个省份下所有视频的score, 省份 -> video -> score
+    private Map<String, Map<Long, Double>> findVideoInProvinceScore(List<FlowPoolVideoInfo> allDouHotVideo) {
+        Map<String, Map<Long, Double>> resultMap = new HashMap<>(allDouHotVideo.size());
+        for (FlowPoolVideoInfo flowPoolVideoInfo : allDouHotVideo) {
+            if (MapUtils.isEmpty(flowPoolVideoInfo.getAttribute())) {
+                continue;
             }
-            result.addAll(response.getData());
+            String province = flowPoolVideoInfo.getAttribute().getString("province");
+            resultMap.computeIfAbsent(province, s -> new HashMap<>()).put(flowPoolVideoInfo.getVideoId(), 1d);
         }
-        return result;
+        return resultMap;
     }
 
     private Map<String, Integer> findDouHotVideoDistributeCount(List<FlowPoolVideoInfo> flowPoolVideoInfos) {
@@ -347,5 +402,45 @@ public class FlowPoolService {
         }
         return distributeCountMap;
     }
+
+    /**
+     * Writes the per-province DouHot candidate videos into Redis.
+     *
+     * <p>For every province, the key built from
+     * {@code RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT} is deleted and then refilled as a sorted
+     * set of {@code "<videoId>-<flowPool>"} members. For each member written, its remaining
+     * distributable count is stored under the key built from
+     * {@code RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT}. Both kinds of keys are given a
+     * 24-hour expiry.
+     *
+     * @param provinceAndVideoMap   province -> videos to publish for that province; nothing is
+     *                              written when it is null or empty
+     * @param provinceAllVideoScore province -> videoId -> score; may be null or empty, in which case
+     *                              every video is written with score 0
+     * @param videoDistributeCount  "videoId-flowPool" -> remaining distributable count; nothing is
+     *                              written when it is null or empty
+     */
+    private void douHotVideoWriteRedis(Map<String, List<FlowPoolVideoInfo>> provinceAndVideoMap, Map<String, Map<Long, Double>> provinceAllVideoScore, Map<String, Integer> videoDistributeCount) {
+        // Guard the map that is actually iterated below; the score map is optional.
+        if (MapUtils.isEmpty(provinceAndVideoMap) || MapUtils.isEmpty(videoDistributeCount)) {
+            return;
+        }
+        // A missing score map only means that every video falls back to the default score.
+        if (provinceAllVideoScore == null) {
+            provinceAllVideoScore = Maps.newHashMap();
+        }
+
+        final long ttlSeconds = 24 * 60 * 60;
+
+        for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoMap.entrySet()) {
+            String province = entry.getKey();
+            List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
+
+            String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
+            log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
+            // Drop the previous snapshot so videos that left the pool are no longer recalled.
+            redisTemplate.delete(redisKey);
+
+            Map<Long, Double> videoScoreInProvince = provinceAllVideoScore.getOrDefault(province, new HashMap<>());
+
+            // Add each video to the province sorted set and record its distributable count.
+            for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
+                String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
+
+                // Skip the video when its remaining distributable count is missing or not positive.
+                // A single lookup also avoids unboxing a null value stored in the map.
+                Integer distributeCount = videoDistributeCount.get(item);
+                if (distributeCount == null || distributeCount <= 0) {
+                    continue;
+                }
+
+                // Score of the video inside this province; 0 when no score is known.
+                double score = videoScoreInProvince.getOrDefault(flowPoolVideoInfo.getVideoId(), 0d);
+
+                redisTemplate.opsForZSet().add(redisKey, item, score);
+
+                String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
+                redisTemplate.opsForValue().set(distributeKey, String.valueOf(distributeCount));
+                redisTemplate.expire(distributeKey, ttlSeconds, TimeUnit.SECONDS);
+            }
+
+            redisTemplate.expire(redisKey, ttlSeconds, TimeUnit.SECONDS);
+        }
+    }
 }