|
@@ -33,6 +33,11 @@ public class FlowPoolService {
|
|
|
private FlowPoolConfigService flowPoolConfigService;
|
|
|
|
|
|
private final String localDistributeCountFormat = "flow:pool:local:distribute:count:%s:%s";
|
|
|
+ /**
|
|
|
+ * 供给池 本地缓存
|
|
|
+ * flow:pool:supply:local:distribute:count:{videoId}:{flowPool标记}
|
|
|
+ */
|
|
|
+ private final String supplyLocalDistributeCountFormat = "flow:pool:supply:local:distribute:count:%s:%s";
|
|
|
|
|
|
public final String valueFormat = "%s-%s";
|
|
|
|
|
@@ -205,4 +210,71 @@ public class FlowPoolService {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ public Map<Long, Integer> getSupplyDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
+ if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
+ return Collections.emptyMap();
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<Long, Integer> result = getSupplyDistributeCount(videoFlowPoolMap);
|
|
|
+
|
|
|
+
|
|
|
+ // 处理脏数据:分发数<0
|
|
|
+ Map<Long, String> dirties = videoFlowPoolMap.entrySet().stream()
|
|
|
+ .filter(e -> result.get(e.getKey()) <= 0)
|
|
|
+ .collect(Collectors.toMap(
|
|
|
+ e -> e.getKey(),
|
|
|
+ e -> e.getValue()
|
|
|
+ ));
|
|
|
+ asyncDelSupplyDistributeCountWithLevel(dirties);
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<Long, Integer> getSupplyDistributeCount(Map<Long, String> videoFlowPoolMap) {
|
|
|
+ // 为了保证有序
|
|
|
+ List<Map.Entry<Long, String>> entries = videoFlowPoolMap.entrySet().stream()
|
|
|
+ .sorted(Comparator.comparingLong(e -> e.getKey()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ List<String> keys = entries.stream()
|
|
|
+ .map(v -> String.format(supplyLocalDistributeCountFormat, v.getKey(), v.getValue()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ List<String> counts = redisTemplate.opsForValue().multiGet(keys);
|
|
|
+ Map<Long, Integer> result = new HashMap<>();
|
|
|
+ for (int i = 0; i < entries.size(); i++) {
|
|
|
+ result.put(entries.get(i).getKey(), NumberUtils.toInt(counts.get(i), 0));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<Long, String> updateSupplyDistributeCount(List<Video> videos) {
|
|
|
+ // TODO 异步更新
|
|
|
+ Map<Long, String> removeMap = new HashMap<>();
|
|
|
+ videos.stream().forEach(v -> {
|
|
|
+ String key = String.format(supplyLocalDistributeCountFormat, v.getVideoId(), v.getFlowPool());
|
|
|
+ Long count = redisTemplate.opsForValue().decrement(key);
|
|
|
+ if (count <= 0) {
|
|
|
+ removeMap.put(v.getVideoId(), v.getFlowPool());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return removeMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void updateSupplyDistributeCountWithLevel(List<Video> videos) {
|
|
|
+ if (CollectionUtils.isEmpty(videos)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Map<Long, String> removeMap = updateSupplyDistributeCount(videos);
|
|
|
+
|
|
|
+ asyncDelSupplyDistributeCountWithLevel(removeMap);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void asyncDelSupplyDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
+ asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_SUPPLY_FORMAT, appType, level);
|
|
|
+ Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|