|  | @@ -1,22 +1,39 @@
 | 
	
		
			
				|  |  |  package com.tzld.piaoquan.recommend.server.service.flowpool;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson.JSON;
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson.JSONObject;
 | 
	
		
			
				|  |  | +import com.google.common.collect.Lists;
 | 
	
		
			
				|  |  | +import com.google.common.collect.Maps;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.feign.FlowPoolFeign;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolResponse;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolVideoInfo;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.recommend.server.model.TripleConsumer;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.recommend.server.model.Video;
 | 
	
		
			
				|  |  | -import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMapping;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMappingRepository;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.repository.DouHotVideoPortraitDataRepository;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.repository.DouHotVideoProvince;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.recommend.server.util.RecallUtils;
 | 
	
		
			
				|  |  |  import lombok.extern.slf4j.Slf4j;
 | 
	
		
			
				|  |  |  import org.apache.commons.collections4.CollectionUtils;
 | 
	
		
			
				|  |  |  import org.apache.commons.collections4.MapUtils;
 | 
	
		
			
				|  |  | +import org.apache.commons.lang3.StringUtils;
 | 
	
		
			
				|  |  |  import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  |  import org.springframework.beans.factory.annotation.Qualifier;
 | 
	
		
			
				|  |  | +import org.springframework.beans.factory.annotation.Value;
 | 
	
		
			
				|  |  |  import org.springframework.data.redis.core.RedisTemplate;
 | 
	
		
			
				|  |  |  import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.util.*;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ExecutorService;
 | 
	
		
			
				|  |  | +import java.util.concurrent.TimeUnit;
 | 
	
		
			
				|  |  |  import java.util.stream.Collectors;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import static com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum.*;
 | 
	
		
			
				|  |  | -import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.*;
 | 
	
		
			
				|  |  | +import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_FORMAT;
 | 
	
		
			
				|  |  | +import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_FORMAT_V2;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /**
 | 
	
		
			
				|  |  |   * @author dyp
 | 
	
	
		
			
				|  | @@ -30,6 +47,17 @@ public class FlowPoolService {
 | 
	
		
			
				|  |  |      @Autowired
 | 
	
		
			
				|  |  |      private FlowPoolConfigService flowPoolConfigService;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private DouHotVideoMappingRepository douHotVideoMappingRepository;
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private DouHotVideoPortraitDataRepository douHotVideoPortraitDataRepository;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Value("${dou.hot.flow.pool.id:18}")
 | 
	
		
			
				|  |  | +    private Integer douHotFlowPoolId;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private FlowPoolFeign flowPoolFeign;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private final String localDistributeCountFormat = "flow:pool:local:distribute:count:%s:%s";
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public final String valueFormat = "%s-%s";
 | 
	
	
		
			
				|  | @@ -124,5 +152,195 @@ public class FlowPoolService {
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          });
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public void asyncHandleDouHotCache(List<Video> videos, String province) {
 | 
	
		
			
				|  |  | +        if (StringUtils.isBlank(province) || CollectionUtils.isEmpty(videos)) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        pool.execute(() -> {
 | 
	
		
			
				|  |  | +            List<String> needRemoveVideoIds = new ArrayList<>();
 | 
	
		
			
				|  |  | +            for (Video video : videos) {
 | 
	
		
			
				|  |  | +                String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, video.getVideoId(), video.getFlowPool());
 | 
	
		
			
				|  |  | +                Long count = redisTemplate.opsForValue().decrement(distributeKey);
 | 
	
		
			
				|  |  | +                if (Objects.isNull(count) || count <= 0) {
 | 
	
		
			
				|  |  | +                    redisTemplate.delete(distributeKey);
 | 
	
		
			
				|  |  | +                    needRemoveVideoIds.add(String.format("%d-%s", video.getVideoId(), video.getFlowPool()));
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            if (CollectionUtils.isEmpty(needRemoveVideoIds)) {
 | 
	
		
			
				|  |  | +                return;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 从流量池缓存中移除达到分发次数限制的视频ID
 | 
	
		
			
				|  |  | +            String itemKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
 | 
	
		
			
				|  |  | +            log.info("[DouHot view distribute count remove cache] key: {}, item: {}", itemKey, needRemoveVideoIds);
 | 
	
		
			
				|  |  | +            redisTemplate.opsForSet().remove(itemKey, needRemoveVideoIds.toArray());
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public void syncDouHotFlowPoolVideo() {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 获取流量池中 全部热点宝视频
 | 
	
		
			
				|  |  | +        List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
 | 
	
		
			
				|  |  | +        log.info("[DouHot video size]: {}", allDouHotVideo.size());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 获取视频对应的可分发数量
 | 
	
		
			
				|  |  | +        Map<String, Integer> videoAndDistributeCountMap = this.findDouHotVideoDistributeCount(allDouHotVideo);
 | 
	
		
			
				|  |  | +        log.info("[DouHot view distribute count size]: {}", videoAndDistributeCountMap.size());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<Long> allVideoId = allDouHotVideo.stream()
 | 
	
		
			
				|  |  | +                .map(FlowPoolVideoInfo::getVideoId)
 | 
	
		
			
				|  |  | +                .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 获取视频对应的省份信息, video -> {省份: tgi}
 | 
	
		
			
				|  |  | +        Map<Long, Map<String, Double>> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
 | 
	
		
			
				|  |  | +        log.info("[DouHot video and province mapping size]: {}", allVideoAndProvinceMap.size());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = new HashMap<>(allVideoAndProvinceMap.size());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 获取省份与对应视频的映射
 | 
	
		
			
				|  |  | +        Map<Long, List<FlowPoolVideoInfo>> videoIdList = allDouHotVideo.stream()
 | 
	
		
			
				|  |  | +                .collect(Collectors.groupingBy(FlowPoolVideoInfo::getVideoId));
 | 
	
		
			
				|  |  | +        for (Map.Entry<Long, Map<String, Double>> entry : allVideoAndProvinceMap.entrySet()) {
 | 
	
		
			
				|  |  | +            Long videoId = entry.getKey();
 | 
	
		
			
				|  |  | +            Set<String> provinceList = entry.getValue().keySet();
 | 
	
		
			
				|  |  | +            for (String province : provinceList) {
 | 
	
		
			
				|  |  | +                List<FlowPoolVideoInfo> flowPoolVideoInfos = provinceAndVideoListMap.computeIfAbsent(province, s -> new ArrayList<>());
 | 
	
		
			
				|  |  | +                flowPoolVideoInfos.addAll(videoIdList.get(videoId));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        provinceAndVideoListMap.forEach((key, value) -> log.info("[DouHot province video size]: province: {}, video size: {}", key, value.size()));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 将每个省份的数据写入Redis,并同步写入每个视频在对应流量池中的可分发数量
 | 
	
		
			
				|  |  | +        for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
 | 
	
		
			
				|  |  | +            String province = entry.getKey();
 | 
	
		
			
				|  |  | +            List<FlowPoolVideoInfo> flowPoolVideoInfos = entry.getValue();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
 | 
	
		
			
				|  |  | +            log.info("[DouHot item redis key]: redisKey: {}, video size: {}", redisKey, flowPoolVideoInfos.size());
 | 
	
		
			
				|  |  | +            redisTemplate.delete(redisKey);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 将视频添加到Redis缓存中,并设置可分发数量
 | 
	
		
			
				|  |  | +            for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
 | 
	
		
			
				|  |  | +                String item = String.format("%d-%s", flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // 如果剩余的可分发数量不存在或者小于为 则不添加到Redis中
 | 
	
		
			
				|  |  | +                if (!videoAndDistributeCountMap.containsKey(item) || videoAndDistributeCountMap.get(item) <= 0) {
 | 
	
		
			
				|  |  | +                    continue;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // 获取视频在某个省份的tgi,作为视频的分数
 | 
	
		
			
				|  |  | +                double score = 0;
 | 
	
		
			
				|  |  | +                if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
 | 
	
		
			
				|  |  | +                    score = allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()).getOrDefault(province, 0d);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                redisTemplate.opsForZSet().add(redisKey, item, score);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                String distributeKey = String.format(RedisKeyConstants.DouHot.LOCAL_DISTRIBUTE_KEY_FORMAT, flowPoolVideoInfo.getVideoId(), flowPoolVideoInfo.getFlowPool());
 | 
	
		
			
				|  |  | +                redisTemplate.opsForValue().set(distributeKey, String.valueOf(videoAndDistributeCountMap.get(item)));
 | 
	
		
			
				|  |  | +                redisTemplate.expire(distributeKey, 24 * 60 * 60, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private Map<Long, Map<String, Double>> findAllVideoAndProvinceMap(List<Long> videoIds) {
 | 
	
		
			
				|  |  | +        // 获取票圈视频ID与热点宝vid的映射
 | 
	
		
			
				|  |  | +        List<List<Long>> videoIdPartition = Lists.partition(videoIds, 500);
 | 
	
		
			
				|  |  | +        Map<Long, String> videoIdAndVidMap = videoIdPartition.stream().map(douHotVideoMappingRepository::findAllByVideoIdIn)
 | 
	
		
			
				|  |  | +                .flatMap(List::stream)
 | 
	
		
			
				|  |  | +                .collect(Collectors.toMap(DouHotVideoMapping::getVideoId, DouHotVideoMapping::getVid, (o1, o2) -> o1));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 获取热点宝vid与地域的映射
 | 
	
		
			
				|  |  | +        List<List<String>> vidPartition = Lists.partition(new ArrayList<>(videoIdAndVidMap.values()), 500);
 | 
	
		
			
				|  |  | +        List<DouHotVideoProvince> douHotVideoProvince = vidPartition.stream()
 | 
	
		
			
				|  |  | +                .map(douHotVideoPortraitDataRepository::findRecordTGIGe100AndRateDesc)
 | 
	
		
			
				|  |  | +                .flatMap(List::stream)
 | 
	
		
			
				|  |  | +                .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 按vid和name分组,并取每组TGI倒排前10条
 | 
	
		
			
				|  |  | +        Map<String, Map<String, Double>> vidAndProvinceListMap = douHotVideoProvince.stream()
 | 
	
		
			
				|  |  | +                .collect(Collectors.groupingBy(
 | 
	
		
			
				|  |  | +                        DouHotVideoProvince::getVid,  // 按 vid 分组
 | 
	
		
			
				|  |  | +                        Collectors.collectingAndThen(
 | 
	
		
			
				|  |  | +                                Collectors.toList(),
 | 
	
		
			
				|  |  | +                                list -> list.stream()
 | 
	
		
			
				|  |  | +                                        .sorted(Comparator.comparingDouble(DouHotVideoProvince::getRate).reversed()) // 按 占比 降序
 | 
	
		
			
				|  |  | +                                        .limit(10) // 取前10个
 | 
	
		
			
				|  |  | +                                        .collect(Collectors.toMap(i -> RecallUtils.douHotProvinceConvert(i.getName()), DouHotVideoProvince::getTgi))
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  | +                ));
 | 
	
		
			
				|  |  | +        vidAndProvinceListMap.forEach((key, value) -> log.info("[DouHot vid and province mapping]: vid: {}, province: {}", key, JSON.toJSONString(value)));
 | 
	
		
			
				|  |  | +        Map<Long, Map<String, Double>> resultMap = new HashMap<>(videoIdAndVidMap.size());
 | 
	
		
			
				|  |  | +        for (Map.Entry<Long, String> entry : videoIdAndVidMap.entrySet()) {
 | 
	
		
			
				|  |  | +            Long videoId = entry.getKey();
 | 
	
		
			
				|  |  | +            String vid = entry.getValue();
 | 
	
		
			
				|  |  | +            if (vidAndProvinceListMap.containsKey(vid)) {
 | 
	
		
			
				|  |  | +                resultMap.put(videoId, vidAndProvinceListMap.get(vid));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return resultMap;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private List<FlowPoolVideoInfo> findAllDouHotVideoFromFlowPool() {
 | 
	
		
			
				|  |  | +        List<FlowPoolVideoInfo> result = new ArrayList<>();
 | 
	
		
			
				|  |  | +        int pageNum = 0;
 | 
	
		
			
				|  |  | +        while (true) {
 | 
	
		
			
				|  |  | +            JSONObject paramJson = new JSONObject();
 | 
	
		
			
				|  |  | +            paramJson.put("flowPoolId", douHotFlowPoolId);
 | 
	
		
			
				|  |  | +            paramJson.put("appType", 0);
 | 
	
		
			
				|  |  | +            paramJson.put("pageSize", 1000);
 | 
	
		
			
				|  |  | +            paramJson.put("pageNum", pageNum++);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            log.info("[get DouHot flow pool video] paramJson:{}", paramJson);
 | 
	
		
			
				|  |  | +            FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.getFlowPoolVideo(paramJson);
 | 
	
		
			
				|  |  | +            if (0 != response.getCode()) {
 | 
	
		
			
				|  |  | +                log.error("[get DouHot flow pool video request error] responseJson: {}", response);
 | 
	
		
			
				|  |  | +                break;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            if (CollectionUtils.isEmpty(response.getData())) {
 | 
	
		
			
				|  |  | +                log.error("[get DouHot flow pool video data is empty] responseJson: {}", response);
 | 
	
		
			
				|  |  | +                break;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            result.addAll(response.getData());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        return result;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private Map<String, Integer> findDouHotVideoDistributeCount(List<FlowPoolVideoInfo> flowPoolVideoInfos) {
 | 
	
		
			
				|  |  | +        List<JSONObject> paramJsonList = Lists.newArrayList();
 | 
	
		
			
				|  |  | +        for (FlowPoolVideoInfo flowPoolVideoInfo : flowPoolVideoInfos) {
 | 
	
		
			
				|  |  | +            JSONObject paramJson = new JSONObject();
 | 
	
		
			
				|  |  | +            paramJson.put("videoId", flowPoolVideoInfo.getVideoId());
 | 
	
		
			
				|  |  | +            paramJson.put("flowPool", flowPoolVideoInfo.getFlowPool());
 | 
	
		
			
				|  |  | +            paramJsonList.add(paramJson);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<FlowPoolVideoInfo> remainFlowPoolVideoInfos = new ArrayList<>(flowPoolVideoInfos.size());
 | 
	
		
			
				|  |  | +        List<List<JSONObject>> partition = Lists.partition(paramJsonList, 10);
 | 
	
		
			
				|  |  | +        for (List<JSONObject> videos : partition) {
 | 
	
		
			
				|  |  | +            JSONObject paramJson = new JSONObject();
 | 
	
		
			
				|  |  | +            paramJson.put("videos", videos);
 | 
	
		
			
				|  |  | +            FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.remainViewCount(paramJson);
 | 
	
		
			
				|  |  | +            if (0 != response.getCode()) {
 | 
	
		
			
				|  |  | +                log.error("[remain view count error] responseJson: {}", response);
 | 
	
		
			
				|  |  | +                continue;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            remainFlowPoolVideoInfos.addAll(response.getData());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Map<String, Integer> distributeCountMap = Maps.newHashMap();
 | 
	
		
			
				|  |  | +        for (FlowPoolVideoInfo videoInfo : remainFlowPoolVideoInfos) {
 | 
	
		
			
				|  |  | +            String key = String.format("%s-%s", videoInfo.getVideoId(), videoInfo.getFlowPool());
 | 
	
		
			
				|  |  | +            distributeCountMap.put(key, videoInfo.getDistributeCount());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        return distributeCountMap;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 |