Browse Source

feat:热点宝召回修改为zset存储

zhaohaipeng 1 week ago
parent
commit
eea331aba4

+ 18 - 12
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/flowpool/FlowPoolService.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.flowpool;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -14,6 +15,7 @@ import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMapping;
 import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMappingRepository;
 import com.tzld.piaoquan.recommend.server.repository.DouHotVideoPortraitDataRepository;
 import com.tzld.piaoquan.recommend.server.repository.DouHotVideoProvince;
+import com.tzld.piaoquan.recommend.server.util.RecallUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -190,8 +192,8 @@ public class FlowPoolService {
                 .map(FlowPoolVideoInfo::getVideoId)
                 .collect(Collectors.toList());
 
-        // 获取视频对应的省份信息
-        Map<Long, List<String>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
+        // 获取视频对应的省份信息, video -> {省份: tgi}
+        Map<Long, Map<String, Double>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
         log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
 
         Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
@@ -199,9 +201,9 @@ public class FlowPoolService {
         // 获取省份与对应视频的映射
         Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
                 .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
-        for (Map.Entry<Long, List<String>> entry : allVideoAndProvinceMap.entrySet()) {
+        for (Map.Entry<Long, Map<String, Double>> entry : allVideoAndProvinceMap.entrySet()) {
             Long videoId = entry.getKey();
-            List<String> provinceList = entry.getValue();
+            Set<String> provinceList = entry.getValue().keySet();
             for (String province : provinceList) {
                 List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
                 flowPoolVideoInfos.addAll(videoIdList.get(videoId));
@@ -227,7 +229,13 @@ public class FlowPoolService {
                     continue;
                 }
 
-                redisTemplate.opsForSet().add(redisKey, item);
+                // 获取视频在某个省份的tgi,作为视频的分数
+                double score = 0;
+                if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
+                    score = allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()).getOrDefault(province, 0d);
+                }
+
+                redisTemplate.opsForZSet().add(redisKey, item, score);
 
                 String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
                 redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
@@ -240,7 +248,7 @@ public class FlowPoolService {
 
     }
 
-    private Map<Long, List<String>> findAllVideoAndProvinceMap(List<Long> videoIds) {
+    private Map<Long, Map<String, Double>> findAllVideoAndProvinceMap(List<Long> videoIds) {
         // 获取票圈视频ID与热点宝vid的映射
         List<List<Long>> videoIdPartition = Lists.partition(videoIds, 500);
         Map<Long, String> videoIdAndVidMap = videoIdPartition.stream().map(douHotVideoMappingRepository::findAllByVideoIdIn)
@@ -255,7 +263,7 @@ public class FlowPoolService {
                 .collect(Collectors.toList());
 
         // 按vid和name分组,并取每组TGI倒排前10条
-        Map<String, List<String>> vidAndProvinceListMap = douHotVideoProvince.stream()
+        Map<String, Map<String, Double>> vidAndProvinceListMap = douHotVideoProvince.stream()
                 .collect(Collectors.groupingBy(
                         DouHotVideoProvince::getVid,  // 按 vid 分组
                         Collectors.collectingAndThen(
@@ -263,13 +271,11 @@ public class FlowPoolService {
                                 list -> list.stream()
                                         .sorted(Comparator.comparingDouble(DouHotVideoProvince::getRate).reversed()) // 按 占比 降序
                                         .limit(10) // 取前10个
-                                        .map(DouHotVideoProvince::getName) // 只取 name 字段
-                                        .collect(Collectors.toList())
+                                        .collect(Collectors.toMap(i -> RecallUtils.douHotProvinceConvert(i.getName()), DouHotVideoProvince::getTgi))
                         )
                 ));
-        vidAndProvinceListMap.forEach((key, value) -> log.info("[DouHot vid and province mapping]: vid: {}, province: {}", key, value));
-
-        Map<Long, List<String>> resultMap = new HashMap<>(videoIdAndVidMap.size());
+        vidAndProvinceListMap.forEach((key, value) -> log.info("[DouHot vid and province mapping]: vid: {}, province: {}", key, JSON.toJSONString(value)));
+        Map<Long, Map<String, Double>> resultMap = new HashMap<>(videoIdAndVidMap.size());
         for (Map.Entry<Long, String> entry : videoIdAndVidMap.entrySet()) {
             Long videoId = entry.getKey();
             String vid = entry.getValue();

+ 12 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/DouHotFlowPoolRecallStrategy.java

@@ -8,6 +8,7 @@ import com.tzld.piaoquan.recommend.server.service.filter.FilterService;
 import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import com.tzld.piaoquan.recommend.server.util.RecallUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -15,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ZSetOperations;
 import org.springframework.stereotype.Component;
 
 import java.util.*;
@@ -41,16 +43,23 @@ public class DouHotFlowPoolRecallStrategy implements RecallStrategy {
             return Collections.emptyList();
         }
 
-        String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, param.getProvince(), "1");
+        String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, RecallUtils.douHotProvinceConvert(param.getProvince()), "1");
         log.info("[DouHot recall redisKey] {}", redisKey);
-        Set<String> redisValues = redisTemplate.opsForSet().members(redisKey);
+        Set<ZSetOperations.TypedTuple<String>> redisValues = redisTemplate.opsForZSet().rangeWithScores(redisKey, 0, 10000000);
+
+
         if (CollectionUtils.isEmpty(redisValues)) {
             log.error("[DouHot recall is empty]");
             return Collections.emptyList();
         }
 
         Map<Long, String> videoIdAndFlowPoolMap = new HashMap<>(redisValues.size());
-        for (String value : redisValues) {
+
+        for (ZSetOperations.TypedTuple<String> redisValue : redisValues) {
+            String value = redisValue.getValue();
+            if (StringUtils.isBlank(value)) {
+                continue;
+            }
             String[] split = value.split("-");
             if (split.length != 2) {
                 continue;

+ 17 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/RecallUtils.java

@@ -5,6 +5,7 @@ import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.*;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -70,4 +71,20 @@ public class RecallUtils {
             setVideo.addAll(list.stream().map(Video::getVideoId).collect(Collectors.toSet()));
         }
     }
+
+    public static String douHotProvinceConvert(String province) {
+        if (StringUtils.isBlank(province)) {
+            return "";
+        }
+        if (StringUtils.startsWith(province, "西藏")) {
+            return "西藏";
+        }
+        if (StringUtils.startsWith(province, "新疆")) {
+            return "新疆";
+        }
+        if (StringUtils.endsWith(province, "市")) {
+            return province.replaceAll("市$", "");
+        }
+        return province.replaceAll("省$", "");
+    }
 }