|
@@ -9,6 +9,7 @@ import org.apache.commons.collections4.MapUtils;
|
|
|
import org.apache.commons.lang3.math.NumberUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
@@ -32,16 +33,14 @@ public class FlowPoolService {
|
|
|
private FlowPoolConfigService flowPoolConfigService;
|
|
|
|
|
|
private final String localDistributeCountFormat = "flow:pool:local:distribute:count:%s:%s";
|
|
|
- /**
|
|
|
- * 供给池 本地缓存
|
|
|
- * flow:pool:supply:local:distribute:count:{videoId}:{flowPool标记}
|
|
|
- */
|
|
|
- private final String supplyLocalDistributeCountFormat = "flow:pool:supply:local:distribute:count:%s:%s";
|
|
|
|
|
|
public final String valueFormat = "%s-%s";
|
|
|
|
|
|
private ExecutorService pool = ThreadPoolFactory.defaultPool();
|
|
|
|
|
|
+ @Value("${flow.pool.upgrade.switch:true}")
|
|
|
+ private boolean flowPoolUpgradeSwitch;
|
|
|
+
|
|
|
private int[] appTypes = {VLOG.getCode(), LOVE_MOVIE.getCode(), LOVE_LIVE.getCode(), LONG_VIDEO.getCode(),
|
|
|
SHORT_VIDEO.getCode(), H5.getCode(), APP_SPEED.getCode(), WAN_NENG_VIDEO.getCode(),
|
|
|
LAO_HAO_KAN_VIDEO.getCode(), ZUI_JING_QI.getCode(), PIAO_QUAN_VIDEO_PLUS.getCode(), JOURNEY.getCode()};
|
|
@@ -78,16 +77,20 @@ public class FlowPoolService {
|
|
|
}
|
|
|
|
|
|
private void asyncDelDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- String key = String.format(KEY_WITH_LEVEL_FORMAT, appType, level);
|
|
|
- Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
- redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
|
|
|
- });
|
|
|
+ if (flowPoolUpgradeSwitch) {
|
|
|
+ asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT_V2, appType, level), values);
|
|
|
+ redisTemplate.opsForSet().remove(KEY_QUICK_WITH_LEVEL_FORMAT_V2, values);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_FORMAT, appType, level);
|
|
|
+ Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
+ redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForSet().remove(KEY_WITH_LEVEL_FORMAT_V2, values);
|
|
|
- redisTemplate.opsForSet().remove(KEY_QUICK_WITH_LEVEL_FORMAT_V2, values);
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
public Map<Long, Integer> getDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
|
|
@@ -118,54 +121,18 @@ public class FlowPoolService {
|
|
|
}
|
|
|
|
|
|
private void asyncLocalDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- String key = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level);
|
|
|
- redisTemplate.opsForZSet().remove(key, values);
|
|
|
- redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
|
|
|
- });
|
|
|
-
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForZSet().remove(KEY_WITH_LEVEL_SCORE_FORMAT_V2, values);
|
|
|
- redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT_V2, values);
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public Map<Long, Integer> getDistributeCountWithScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
- if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
- return Collections.emptyMap();
|
|
|
- }
|
|
|
- Map<Long, Integer> result = getDistributeCount(videoFlowPoolMap);
|
|
|
- // 处理脏数据:分发数<0
|
|
|
- Map<Long, String> dirties = videoFlowPoolMap.entrySet().stream()
|
|
|
- .filter(e -> result.get(e.getKey()) <= 0)
|
|
|
- .collect(Collectors.toMap(
|
|
|
- e -> e.getKey(),
|
|
|
- e -> e.getValue()
|
|
|
- ));
|
|
|
- asyncDelDistributeCountWithScore(dirties);
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- public void updateDistributeCountWithScore(List<Video> videos) {
|
|
|
- if (CollectionUtils.isEmpty(videos)) {
|
|
|
- return;
|
|
|
+ if (flowPoolUpgradeSwitch) {
|
|
|
+ asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT_V2, appType, level), values);
|
|
|
+ redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT_V2, values);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level);
|
|
|
+ redisTemplate.opsForZSet().remove(key, values);
|
|
|
+ redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
|
|
|
+ });
|
|
|
}
|
|
|
- Map<Long, String> removeMap = updateDistributeCount(videos);
|
|
|
- asyncDelDistributeCountWithScore(removeMap);
|
|
|
- }
|
|
|
-
|
|
|
- private void asyncDelDistributeCountWithScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- String key = String.format(KEY_WITH_SCORE_FORMAT, appType, level);
|
|
|
- Long count = redisTemplate.opsForZSet().remove(key, values);
|
|
|
-
|
|
|
- redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_SCORE_FORMAT, appType), values);
|
|
|
- });
|
|
|
-
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForZSet().remove(KEY_WITH_SCORE_FORMAT_V2, values);
|
|
|
- redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_SCORE_FORMAT_V2, values);
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
private Map<Long, Integer> getDistributeCount(Map<Long, String> videoFlowPoolMap) {
|
|
@@ -209,7 +176,7 @@ public class FlowPoolService {
|
|
|
.collect(Collectors.toList());
|
|
|
redisTemplate.delete(keys);
|
|
|
|
|
|
- Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight();
|
|
|
+ Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight4FlowPoolWithLevel();
|
|
|
String[] values = new String[videoFlowPoolMap.size()];
|
|
|
int i = 0;
|
|
|
for (Map.Entry v : videoFlowPoolMap.entrySet()) {
|
|
@@ -225,70 +192,28 @@ public class FlowPoolService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- public Map<Long, Integer> getSupplyDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
- if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
- return Collections.emptyMap();
|
|
|
- }
|
|
|
-
|
|
|
- Map<Long, Integer> result = getSupplyDistributeCount(videoFlowPoolMap);
|
|
|
-
|
|
|
-
|
|
|
- // 处理脏数据:分发数<0
|
|
|
- Map<Long, String> dirties = videoFlowPoolMap.entrySet().stream()
|
|
|
- .filter(e -> result.get(e.getKey()) <= 0)
|
|
|
- .collect(Collectors.toMap(
|
|
|
- e -> e.getKey(),
|
|
|
- e -> e.getValue()
|
|
|
- ));
|
|
|
- asyncDelSupplyDistributeCountWithLevel(dirties);
|
|
|
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- private Map<Long, Integer> getSupplyDistributeCount(Map<Long, String> videoFlowPoolMap) {
|
|
|
- // 为了保证有序
|
|
|
- List<Map.Entry<Long, String>> entries = videoFlowPoolMap.entrySet().stream()
|
|
|
- .sorted(Comparator.comparingLong(e -> e.getKey()))
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- List<String> keys = entries.stream()
|
|
|
- .map(v -> String.format(supplyLocalDistributeCountFormat, v.getKey(), v.getValue()))
|
|
|
- .collect(Collectors.toList());
|
|
|
- List<String> counts = redisTemplate.opsForValue().multiGet(keys);
|
|
|
- Map<Long, Integer> result = new HashMap<>();
|
|
|
- for (int i = 0; i < entries.size(); i++) {
|
|
|
- result.put(entries.get(i).getKey(), NumberUtils.toInt(counts.get(i), 0));
|
|
|
- }
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- private Map<Long, String> updateSupplyDistributeCount(List<Video> videos) {
|
|
|
- // TODO 异步更新
|
|
|
- Map<Long, String> removeMap = new HashMap<>();
|
|
|
- videos.stream().forEach(v -> {
|
|
|
- String key = String.format(supplyLocalDistributeCountFormat, v.getVideoId(), v.getFlowPool());
|
|
|
- Long count = redisTemplate.opsForValue().decrement(key);
|
|
|
- if (count <= 0) {
|
|
|
- removeMap.put(v.getVideoId(), v.getFlowPool());
|
|
|
- }
|
|
|
- });
|
|
|
- return removeMap;
|
|
|
- }
|
|
|
-
|
|
|
- public void updateSupplyDistributeCountWithLevel(List<Video> videos) {
|
|
|
- if (CollectionUtils.isEmpty(videos)) {
|
|
|
+ private void asyncDelDistributeCountV2(Map<Long, String> videoFlowPoolMap,
|
|
|
+ TripleConsumer<Integer, String, String[]> flowPoolRemoveConsumer) {
|
|
|
+ if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
return;
|
|
|
}
|
|
|
- Map<Long, String> removeMap = updateSupplyDistributeCount(videos);
|
|
|
-
|
|
|
- asyncDelSupplyDistributeCountWithLevel(removeMap);
|
|
|
+ pool.execute(() -> {
|
|
|
+ List<String> keys = videoFlowPoolMap.entrySet().stream()
|
|
|
+ .map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ redisTemplate.delete(keys);
|
|
|
|
|
|
- }
|
|
|
+ String[] values = new String[videoFlowPoolMap.size()];
|
|
|
+ int i = 0;
|
|
|
+ for (Map.Entry v : videoFlowPoolMap.entrySet()) {
|
|
|
+ values[i++] = String.format(valueFormat, v.getKey(), v.getValue());
|
|
|
+ }
|
|
|
|
|
|
- private void asyncDelSupplyDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- String key = String.format(KEY_WITH_LEVEL_SUPPLY_FORMAT, appType, level);
|
|
|
- Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
+ Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight4FlowPoolWithLevel();
|
|
|
+ for (String level : levelWeight.keySet()) {
|
|
|
+ flowPoolRemoveConsumer.accept(0, level, values);
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
}
|