|
@@ -184,74 +184,129 @@ public class FlowPoolService {
|
|
|
List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
|
|
|
log.info("[DouHot video size]: {}", allDouHotVideo.size());
|
|
|
|
|
|
- // 获取视频对应的可分发数量, videoId-flowpool -> viewCnt
|
|
|
- Map<String, Integer> videoAndDistributeCountMap = this.findDouHotVideoDistributeCount(allDouHotVideo);
|
|
|
- log.info("[DouHot view distribute count size]: {}", videoAndDistributeCountMap.size());
|
|
|
-
|
|
|
- List<Long> allVideoId = allDouHotVideo.stream()
|
|
|
- .map(FlowPoolVideoInfo::getVideoId)
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- // 获取视频对应的省份信息, video -> {省份: tgi}
|
|
|
- Map<Long, Map<String, Double>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
|
|
|
- log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
|
|
|
-
|
|
|
- Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
|
|
|
-
|
|
|
- // 获取省份与对应视频的映射, 省份 -> [视频信息列表]
|
|
|
- Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
|
|
|
- .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
|
|
|
-
|
|
|
- for (Map.Entry<Long, Map<String, Double>> entry : allVideoAndProvinceMap.entrySet()) {
|
|
|
- Long videoId = entry.getKey();
|
|
|
- Set<String> provinceList = entry.getValue().keySet();
|
|
|
- for (String province : provinceList) {
|
|
|
- if (videoIdList.containsKey(videoId)) {
|
|
|
- List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
|
|
|
- flowPoolVideoInfos.addAll(videoIdList.get(videoId));
|
|
|
- }
|
|
|
- }
|
|
|
+ // 获取省份与视频列表的映射,省份 -> [video]
|
|
|
+ Map<String, Map<String, List<FlowPoolVideoInfo>>> levelAndProvinceAndVideoMap = this.findAllVideoAndProvinceMapByAttribute(allDouHotVideo);
|
|
|
+ for (Map.Entry<String, Map<String, List<FlowPoolVideoInfo>>> levelEntry : levelAndProvinceAndVideoMap.entrySet()) {
|
|
|
+ levelAndProvinceAndVideoMap.forEach((key, value) -> log.info("[DouHot province video size]. level: {}, province: {}, video size: {}", levelEntry.getKey(), key, value.size()));
|
|
|
}
|
|
|
- provinceAndVideoListMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
|
|
|
|
|
|
- // 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
|
|
|
- for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
|
|
|
- String province = entry.getKey();
|
|
|
- List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
|
|
|
+ // 获取某个省份下所有视频的分数, 省份 -> video -> score
|
|
|
+ Map<String, Map<Long, Double>> videoInProvinceScore = this.findVideoInProvinceScore(allDouHotVideo);
|
|
|
|
|
|
- String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
|
|
|
- log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
|
|
|
- redisTemplate.delete(redisKey);
|
|
|
+ // 获取视频的可分发数,videoId-flowPool -> viewCnt
|
|
|
+ Map<String, Integer> videoDistributeCount = this.findDouHotVideoDistributeCount(allDouHotVideo);
|
|
|
+ log.info("[DouHot view distribute count size]: {}", videoDistributeCount.size());
|
|
|
|
|
|
- // 将视频添加到Redis缓存中,并设置可分发数量
|
|
|
- for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
|
|
|
- String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
|
|
|
+ this.douHotVideoWriteRedis(levelAndProvinceAndVideoMap, videoInProvinceScore, videoDistributeCount);
|
|
|
+ }
|
|
|
|
|
|
- // 如果剩余的可分发数量不存在或者小于为 则不添加到Redis中
|
|
|
- if (!videoAndDistributeCountMap.containsKey(item) || videoAndDistributeCountMap.get(item) <= 0) {
|
|
|
- continue;
|
|
|
- }
|
|
|
|
|
|
- // 获取视频在某个省份的tgi,作为视频的分数
|
|
|
- double score = 0;
|
|
|
- if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
|
|
|
- score = allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()).getOrDefault(province, 0d);
|
|
|
- }
|
|
|
+ // public void syncDouHotFlowPoolVideoV1() {
|
|
|
+ // // 获取流量池中 全部热点宝视频
|
|
|
+ // List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
|
|
|
+ // log.info("[DouHot video size]: {}", allDouHotVideo.size());
|
|
|
+ //
|
|
|
+ // List<Long> allVideoId = allDouHotVideo.stream()
|
|
|
+ // .map(FlowPoolVideoInfo::getVideoId)
|
|
|
+ // .collect(Collectors.toList());
|
|
|
+ //
|
|
|
+ // // 获取视频对应的省份信息, video -> {省份: tgi}
|
|
|
+ // Map<Long, Map<String, Double>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
|
|
|
+ // log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
|
|
|
+ //
|
|
|
+ // Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
|
|
|
+ //
|
|
|
+ // // 获取省份与对应视频的映射, 省份 -> [视频信息列表]
|
|
|
+ // Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
|
|
|
+ // .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
|
|
|
+ //
|
|
|
+ // for (Map.Entry<Long, Map<String, Double>> entry : allVideoAndProvinceMap.entrySet()) {
|
|
|
+ // Long videoId = entry.getKey();
|
|
|
+ // Set<String> provinceList = entry.getValue().keySet();
|
|
|
+ // for (String province : provinceList) {
|
|
|
+ // if (videoIdList.containsKey(videoId)) {
|
|
|
+ // List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
|
|
|
+ // flowPoolVideoInfos.addAll(videoIdList.get(videoId));
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // provinceAndVideoListMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
|
|
|
+ //
|
|
|
+ // // 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
|
|
|
+ // // 获取视频对应的可分发数量, videoId-flowpool -> viewCnt
|
|
|
+ //
|
|
|
+ // Map<String, Integer> videoAndDistributeCountMap = this.findDouHotVideoDistributeCount(allDouHotVideo);
|
|
|
+ // log.info("[DouHot view distribute count size]: {}", videoAndDistributeCountMap.size());
|
|
|
+ //
|
|
|
+ // for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
|
|
|
+ // String province = entry.getKey();
|
|
|
+ // List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
|
|
|
+ //
|
|
|
+ // String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
|
|
|
+ // log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
|
|
|
+ // redisTemplate.delete(redisKey);
|
|
|
+ //
|
|
|
+ // // 将视频添加到Redis缓存中,并设置可分发数量
|
|
|
+ // for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
|
|
|
+ // String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
|
|
|
+ //
|
|
|
+ // // 如果剩余的可分发数量不存在或者小于为 则不添加到Redis中
|
|
|
+ // if (!videoAndDistributeCountMap.containsKey(item) || videoAndDistributeCountMap.get(item) <= 0) {
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // // 获取视频在某个省份的tgi,作为视频的分数
|
|
|
+ // double score = 0;
|
|
|
+ // if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
|
|
|
+ // score = allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()).getOrDefault(province, 0d);
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // redisTemplate.opsForZSet().add(redisKey, item, score);
|
|
|
+ //
|
|
|
+ // String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
|
|
|
+ // redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
|
|
|
+ // redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // }
|
|
|
|
|
|
- redisTemplate.opsForZSet().add(redisKey, item, score);
|
|
|
|
|
|
- String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
|
|
|
- redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
|
|
|
- redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
|
|
|
+ private List<FlowPoolVideoInfo> findAllDouHotVideoFromFlowPool() {
|
|
|
+ List<FlowPoolVideoInfo> result = new ArrayList<>();
|
|
|
+ int pageNum = 1;
|
|
|
+ while (true) {
|
|
|
+ JSONObject paramJson = new JSONObject();
|
|
|
+ paramJson.put("flowPoolId", douHotFlowPoolId);
|
|
|
+ paramJson.put("appType", 0);
|
|
|
+ paramJson.put("pageSize", 1000);
|
|
|
+ paramJson.put("pageNum", pageNum++);
|
|
|
+
|
|
|
+ log.info("[get DouHot flow pool video] paramJson:{}", paramJson);
|
|
|
+ FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.getFlowPoolVideo(paramJson);
|
|
|
+ if (0 != response.getCode()) {
|
|
|
+ log.error("[get DouHot flow pool video request error] responseJson: {}", response);
|
|
|
+ break;
|
|
|
}
|
|
|
|
|
|
- redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
|
|
|
+ if (CollectionUtils.isEmpty(response.getData())) {
|
|
|
+ log.error("[get DouHot flow pool video data is empty] responseJson: {}", response);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ result.addAll(response.getData());
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
- private Map<Long, Map<String, Double>> findAllVideoAndProvinceMap(List<Long> videoIds) {
|
|
|
+ // 根据TGI获取流量池视频和省份的对应关系
|
|
|
+ private Map<Long, Map<String, Double>> findAllVideoAndProvinceMapByTGI(List<FlowPoolVideoInfo> allDouHotVideo) {
|
|
|
+ List<Long> videoIds = allDouHotVideo.stream()
|
|
|
+ .map(FlowPoolVideoInfo::getVideoId)
|
|
|
+ .distinct()
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
// 获取票圈视频ID与热点宝vid的映射
|
|
|
List<List<Long>> videoIdPartition = Lists.partition(videoIds, 500);
|
|
|
Map<Long, String> videoIdAndVidMap = videoIdPartition.stream().map(douHotVideoMappingRepository::findAllByVideoIdIn)
|
|
@@ -268,15 +323,15 @@ public class FlowPoolService {
|
|
|
// 按vid和name分组,并取每组TGI倒排前10条
|
|
|
Map<String, Map<String, Double>> vidAndProvinceListMap = douHotVideoProvince.stream()
|
|
|
.collect(Collectors.groupingBy(
|
|
|
- DouHotVideoProvince::getVid, // 按 vid 分组
|
|
|
- Collectors.collectingAndThen(
|
|
|
- Collectors.toList(),
|
|
|
- list -> list.stream()
|
|
|
- .sorted(Comparator.comparingDouble(DouHotVideoProvince::getRate).reversed()) // 按 占比 降序
|
|
|
- .limit(3) // 取前10个
|
|
|
- .sorted(Comparator.comparingDouble(DouHotVideoProvince::getTgi).reversed()) // 按 TGI 降序
|
|
|
- .limit(1) // 取第一个
|
|
|
- .collect(Collectors.toMap(i -> RecallUtils.douHotProvinceConvert(i.getName()), DouHotVideoProvince::getTgi, (o1, o2) -> o1)))
|
|
|
+ DouHotVideoProvince::getVid, // 按 vid 分组
|
|
|
+ Collectors.collectingAndThen(
|
|
|
+ Collectors.toList(),
|
|
|
+ list -> list.stream()
|
|
|
+ .sorted(Comparator.comparingDouble(DouHotVideoProvince::getRate).reversed()) // 按 占比 降序
|
|
|
+ .limit(3) // 取前10个
|
|
|
+ .sorted(Comparator.comparingDouble(DouHotVideoProvince::getTgi).reversed()) // 按 TGI 降序
|
|
|
+ .limit(1) // 取第一个
|
|
|
+ .collect(Collectors.toMap(i -> RecallUtils.douHotProvinceConvert(i.getName()), DouHotVideoProvince::getTgi, (o1, o2) -> o1)))
|
|
|
)
|
|
|
);
|
|
|
vidAndProvinceListMap.forEach((key, value) -> log.info("[DouHot vid and province mapping]: vid: {}, province: {}", key, JSON.toJSONString(value)));
|
|
@@ -292,30 +347,35 @@ public class FlowPoolService {
|
|
|
return resultMap;
|
|
|
}
|
|
|
|
|
|
- private List<FlowPoolVideoInfo> findAllDouHotVideoFromFlowPool() {
|
|
|
- List<FlowPoolVideoInfo> result = new ArrayList<>();
|
|
|
- int pageNum = 1;
|
|
|
- while (true) {
|
|
|
- JSONObject paramJson = new JSONObject();
|
|
|
- paramJson.put("flowPoolId", douHotFlowPoolId);
|
|
|
- paramJson.put("appType", 0);
|
|
|
- paramJson.put("pageSize", 1000);
|
|
|
- paramJson.put("pageNum", pageNum++);
|
|
|
+ // 根据属性获取流量池视频和省份的对应关系
|
|
|
+ private Map<String, Map<String, List<FlowPoolVideoInfo>>> findAllVideoAndProvinceMapByAttribute(List<FlowPoolVideoInfo> allDouHotVideo) {
|
|
|
+ Map<String, Map<String, List<FlowPoolVideoInfo>>> resultMap = new HashMap<>(16);
|
|
|
|
|
|
- log.info("[get DouHot flow pool video] paramJson:{}", paramJson);
|
|
|
- FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.getFlowPoolVideo(paramJson);
|
|
|
- if (0 != response.getCode()) {
|
|
|
- log.error("[get DouHot flow pool video request error] responseJson: {}", response);
|
|
|
- break;
|
|
|
+ for (FlowPoolVideoInfo flowPoolVideoInfo : allDouHotVideo) {
|
|
|
+ if (MapUtils.isEmpty(flowPoolVideoInfo.getAttribute())) {
|
|
|
+ continue;
|
|
|
}
|
|
|
|
|
|
- if (CollectionUtils.isEmpty(response.getData())) {
|
|
|
- log.error("[get DouHot flow pool video data is empty] responseJson: {}", response);
|
|
|
- break;
|
|
|
+ Map<String, List<FlowPoolVideoInfo>> provinceMap = resultMap.computeIfAbsent(String.valueOf(flowPoolVideoInfo.getLevel()), l -> new HashMap<>());
|
|
|
+
|
|
|
+ String province = flowPoolVideoInfo.getAttribute().getString("province");
|
|
|
+ provinceMap.computeIfAbsent(province, s -> new ArrayList<>()).add(flowPoolVideoInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ return resultMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取某个省份下所有视频的score, 省份 -> video -> score
|
|
|
+ private Map<String, Map<Long, Double>> findVideoInProvinceScore(List<FlowPoolVideoInfo> allDouHotVideo) {
|
|
|
+ Map<String, Map<Long, Double>> resultMap = new HashMap<>(allDouHotVideo.size());
|
|
|
+ for (FlowPoolVideoInfo flowPoolVideoInfo : allDouHotVideo) {
|
|
|
+ if (MapUtils.isEmpty(flowPoolVideoInfo.getAttribute())) {
|
|
|
+ continue;
|
|
|
}
|
|
|
- result.addAll(response.getData());
|
|
|
+ String province = RecallUtils.douHotProvinceConvert(flowPoolVideoInfo.getAttribute().getString("province"));
|
|
|
+ resultMap.computeIfAbsent(province, s -> new HashMap<>()).put(flowPoolVideoInfo.getVideoId(), 1d);
|
|
|
}
|
|
|
- return result;
|
|
|
+ return resultMap;
|
|
|
}
|
|
|
|
|
|
private Map<String, Integer> findDouHotVideoDistributeCount(List<FlowPoolVideoInfo> flowPoolVideoInfos) {
|
|
@@ -347,5 +407,52 @@ public class FlowPoolService {
|
|
|
}
|
|
|
return distributeCountMap;
|
|
|
}
|
|
|
+
|
|
|
+ private void douHotVideoWriteRedis(Map<String, Map<String, List<FlowPoolVideoInfo>>> levelAndProvinceAndVideoMap, Map<String, Map<Long, Double>> provinceAllVideoScore, Map<String, Integer> videoDistributeCount) {
|
|
|
+ if (MapUtils.isEmpty(levelAndProvinceAndVideoMap) || MapUtils.isEmpty(videoDistributeCount)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (MapUtils.isEmpty(provinceAllVideoScore)) {
|
|
|
+ provinceAllVideoScore = Maps.newHashMap();
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Map.Entry<String, Map<String, List<FlowPoolVideoInfo>>> levelEntry : levelAndProvinceAndVideoMap.entrySet()) {
|
|
|
+
|
|
|
+ String level = levelEntry.getKey();
|
|
|
+ Map<String, List<FlowPoolVideoInfo>> provinceAndVideoMap = levelEntry.getValue();
|
|
|
+
|
|
|
+ for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoMap.entrySet()) {
|
|
|
+ String province = entry.getKey();
|
|
|
+ List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
|
|
|
+
|
|
|
+ String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, level);
|
|
|
+ log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
|
|
|
+ redisTemplate.delete(redisKey);
|
|
|
+
|
|
|
+ Map<Long, Double> videoScoreInProvince = provinceAllVideoScore.getOrDefault(province, new HashMap<>());
|
|
|
+
|
|
|
+ // 将视频添加到Redis缓存中,并设置可分发数量
|
|
|
+ for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
|
|
|
+ String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
|
|
|
+
|
|
|
+ // 如果剩余的可分发数量不存在或者小于为 则不添加到Redis中
|
|
|
+ if (!videoDistributeCount.containsKey(item) || videoDistributeCount.get(item) <= 0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取视频在这个省份里的分数
|
|
|
+ double score = videoScoreInProvince.getOrDefault(flowPoolVideoInfo.getVideoId(), 0d);
|
|
|
+
|
|
|
+ redisTemplate.opsForZSet().add(redisKey, item, score);
|
|
|
+
|
|
|
+ String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
|
|
|
+ redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoDistributeCount.get(item)));
|
|
|
+ redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+ redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|