|
@@ -10,7 +10,10 @@ import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolResponse;
|
|
|
import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolVideoInfo;
|
|
|
import com.tzld.piaoquan.recommend.server.model.TripleConsumer;
|
|
|
import com.tzld.piaoquan.recommend.server.model.Video;
|
|
|
-import com.tzld.piaoquan.recommend.server.repository.*;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMapping;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMappingRepository;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoPortraitDataRepository;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoProvince;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.collections4.MapUtils;
|
|
@@ -186,23 +189,24 @@ public class FlowPoolService {
|
|
|
.map(FlowPoolVideoInfo::getVideoId)
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
- // 补充省份信息
|
|
|
- Map<Long, String> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
|
|
|
- for (FlowPoolVideoInfo flowPoolVideoInfo : allDouHotVideo) {
|
|
|
- if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
|
|
|
- flowPoolVideoInfo.setProvince(allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()));
|
|
|
- }
|
|
|
- }
|
|
|
+ // 获取视频对应的省份信息
|
|
|
+ Map<Long, List<String>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
|
|
|
+ log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
|
|
|
|
|
|
- // 过滤掉省份不存在的数据
|
|
|
- allDouHotVideo = allDouHotVideo.stream()
|
|
|
- .filter(i -> StringUtils.isNotBlank(i.getProvince()))
|
|
|
- .collect(Collectors.toList());
|
|
|
- log.info("[DouHot filter empty province after video size]: {}", allDouHotVideo.size());
|
|
|
+ Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
|
|
|
|
|
|
-
|
|
|
- Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = allDouHotVideo.stream()
|
|
|
- .collect(Collectors.groupingBy(FlowPoolVideoInfo::getProvince, Collectors.toList()));
|
|
|
+ // 获取省份与对应视频的映射
|
|
|
+ Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
|
|
|
+ .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
|
|
|
+ for (Map.Entry<Long, List<String>> entry : allVideoAndProvinceMap.entrySet()) {
|
|
|
+ Long videoId = entry.getKey();
|
|
|
+ List<String> provinceList = entry.getValue();
|
|
|
+ for (String province : provinceList) {
|
|
|
+ List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
|
|
|
+ flowPoolVideoInfos.addAll(videoIdList.get(videoId));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ provinceAndVideoListMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
|
|
|
|
|
|
// 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
|
|
|
for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
|
|
@@ -235,7 +239,7 @@ public class FlowPoolService {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private Map<Long, String> findAllVideoAndProvinceMap(List<Long> videoIds) {
|
|
|
+ private Map<Long, List<String>> findAllVideoAndProvinceMap(List<Long> videoIds) {
|
|
|
// 获取票圈视频ID与热点宝vid的映射
|
|
|
List<List<Long>> videoIdPartition = Lists.partition(videoIds, 500);
|
|
|
Map<Long, String> videoIdAndVidMap = videoIdPartition.stream().map(douHotVideoMappingRepository::findAllByVideoIdIn)
|
|
@@ -244,19 +248,33 @@ public class FlowPoolService {
|
|
|
|
|
|
// 获取热点宝vid与地域的映射
|
|
|
List<List<String>> vidPartition = Lists.partition(new ArrayList<>(videoIdAndVidMap.values()), 500);
|
|
|
- Map<String, String> vidAndProvinceMap = vidPartition.stream().map(i -> douHotVideoPortraitDataRepository.findAllByVidInAndType(i, 4))
|
|
|
+ List<DouHotVideoProvince> douHotVideoProvince = vidPartition.stream()
|
|
|
+ .map(douHotVideoPortraitDataRepository::findRecordTGIGe100AndRateDesc)
|
|
|
.flatMap(List::stream)
|
|
|
- .collect(Collectors.toMap(DouHotVideoPortraitData::getVid, DouHotVideoPortraitData::getName, (o1, o2) -> o1));
|
|
|
-
|
|
|
- List<DouHotVideoProvince> recordTGIGe100AndRateDesc = douHotVideoPortraitDataRepository.findRecordTGIGe100AndRateDesc(vidPartition.get(0));
|
|
|
- log.info("recordTGIGe100AndRateDesc.size: {}", recordTGIGe100AndRateDesc.size());
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
|
- Map<Long, String> resultMap = new HashMap<>(videoIdAndVidMap.size());
|
|
|
+ // 按vid和name分组,并取每组TGI倒排前10条
|
|
|
+ Map<String, List<String>> vidAndProvinceListMap = douHotVideoProvince.stream()
|
|
|
+ .collect(Collectors.groupingBy(
|
|
|
+ DouHotVideoProvince::getVid,
|
|
|
+ Collectors.mapping(
|
|
|
+ DouHotVideoProvince::getName,
|
|
|
+ Collectors.collectingAndThen(
|
|
|
+ Collectors.toList(),
|
|
|
+ list -> list.stream()
|
|
|
+ .distinct() // 去重
|
|
|
+ .collect(Collectors.toList())
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ));
|
|
|
+ vidAndProvinceListMap.forEach((key, value) -> log.info("[DouHot vid and province mapping]: vid: {}, province: {}", key, value));
|
|
|
+
|
|
|
+ Map<Long, List<String>> resultMap = new HashMap<>(videoIdAndVidMap.size());
|
|
|
for (Map.Entry<Long, String> entry : videoIdAndVidMap.entrySet()) {
|
|
|
Long videoId = entry.getKey();
|
|
|
String vid = entry.getValue();
|
|
|
- if (vidAndProvinceMap.containsKey(vid)) {
|
|
|
- resultMap.put(videoId, vidAndProvinceMap.get(vid));
|
|
|
+ if (vidAndProvinceListMap.containsKey(vid)) {
|
|
|
+ resultMap.put(videoId, vidAndProvinceListMap.get(vid));
|
|
|
}
|
|
|
}
|
|
|
|