|
@@ -9,6 +9,7 @@ import org.apache.commons.collections4.MapUtils;
|
|
|
import org.apache.commons.lang3.math.NumberUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
@@ -32,16 +33,14 @@ public class FlowPoolService {
|
|
|
private FlowPoolConfigService flowPoolConfigService;
|
|
|
|
|
|
private final String localDistributeCountFormat = "flow:pool:local:distribute:count:%s:%s";
|
|
|
- /**
|
|
|
- * 供给池 本地缓存
|
|
|
- * flow:pool:supply:local:distribute:count:{videoId}:{flowPool标记}
|
|
|
- */
|
|
|
- private final String supplyLocalDistributeCountFormat = "flow:pool:supply:local:distribute:count:%s:%s";
|
|
|
|
|
|
public final String valueFormat = "%s-%s";
|
|
|
|
|
|
private ExecutorService pool = ThreadPoolFactory.defaultPool();
|
|
|
|
|
|
+ @Value("${flow.pool.upgrade.switch:true}")
|
|
|
+ private boolean flowPoolUpgradeSwitch;
|
|
|
+
|
|
|
private int[] appTypes = {VLOG.getCode(), LOVE_MOVIE.getCode(), LOVE_LIVE.getCode(), LONG_VIDEO.getCode(),
|
|
|
SHORT_VIDEO.getCode(), H5.getCode(), APP_SPEED.getCode(), WAN_NENG_VIDEO.getCode(),
|
|
|
LAO_HAO_KAN_VIDEO.getCode(), ZUI_JING_QI.getCode(), PIAO_QUAN_VIDEO_PLUS.getCode(), JOURNEY.getCode()};
|
|
@@ -78,16 +77,20 @@ public class FlowPoolService {
|
|
|
}
|
|
|
|
|
|
private void asyncDelDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- String key = String.format(KEY_WITH_LEVEL_FORMAT, appType, level);
|
|
|
- Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
- redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
|
|
|
- });
|
|
|
+ if (flowPoolUpgradeSwitch) {
|
|
|
+ asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT_V2, appType, level), values);
|
|
|
+ redisTemplate.opsForSet().remove(KEY_QUICK_WITH_LEVEL_FORMAT_V2, values);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_FORMAT, appType, level);
|
|
|
+ Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
+ redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
- asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT_V2, appType, level), values);
|
|
|
- redisTemplate.opsForSet().remove(KEY_QUICK_WITH_LEVEL_FORMAT_V2, values);
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
public Map<Long, Integer> getDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
|
|
@@ -118,16 +121,18 @@ public class FlowPoolService {
|
|
|
}
|
|
|
|
|
|
private void asyncLocalDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
- asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- String key = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level);
|
|
|
- redisTemplate.opsForZSet().remove(key, values);
|
|
|
- redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
|
|
|
- });
|
|
|
-
|
|
|
- asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT_V2, appType, level), values);
|
|
|
- redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT_V2, values);
|
|
|
- });
|
|
|
+ if (flowPoolUpgradeSwitch) {
|
|
|
+ asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT_V2, appType, level), values);
|
|
|
+ redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT_V2, values);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level);
|
|
|
+ redisTemplate.opsForZSet().remove(key, values);
|
|
|
+ redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private Map<Long, Integer> getDistributeCount(Map<Long, String> videoFlowPoolMap) {
|
|
@@ -199,12 +204,13 @@ public class FlowPoolService {
|
|
|
.collect(Collectors.toList());
|
|
|
redisTemplate.delete(keys);
|
|
|
|
|
|
- Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight4FlowPoolWithLevel();
|
|
|
String[] values = new String[videoFlowPoolMap.size()];
|
|
|
int i = 0;
|
|
|
for (Map.Entry v : videoFlowPoolMap.entrySet()) {
|
|
|
values[i++] = String.format(valueFormat, v.getKey(), v.getValue());
|
|
|
}
|
|
|
+
|
|
|
+ Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight4FlowPoolWithLevel();
|
|
|
for (String level : levelWeight.keySet()) {
|
|
|
flowPoolRemoveConsumer.accept(0, level, values);
|
|
|
}
|