|
@@ -21,7 +21,10 @@ import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
|
-import java.util.concurrent.*;
|
|
|
|
|
|
|
+import java.util.concurrent.Future;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @author dyp
|
|
* @author dyp
|
|
@@ -33,12 +36,19 @@ public class FeatureV2Service {
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private RedisTemplate<String, String> redisTemplate;
|
|
private RedisTemplate<String, String> redisTemplate;
|
|
|
|
|
|
|
|
|
|
+ @Qualifier("byteRedisTemplate")
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private RedisTemplate<String, byte[]> byteRedisTemplate;
|
|
|
|
|
+
|
|
|
@ApolloJsonValue("${dts.config:}")
|
|
@ApolloJsonValue("${dts.config:}")
|
|
|
private List<DTSConfig> newDtsConfigs;
|
|
private List<DTSConfig> newDtsConfigs;
|
|
|
|
|
|
|
|
@Value("${feature.mget.batch.size:300}")
|
|
@Value("${feature.mget.batch.size:300}")
|
|
|
private Integer batchSize;
|
|
private Integer batchSize;
|
|
|
|
|
|
|
|
|
|
+ @Value("${remove.base64.switch:false}")
|
|
|
|
|
+ private Boolean removeBase64Switch;
|
|
|
|
|
+
|
|
|
public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
|
|
public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
|
|
|
int keyCount = request.getFeatureKeyCount();
|
|
int keyCount = request.getFeatureKeyCount();
|
|
|
|
|
|
|
@@ -48,6 +58,12 @@ public class FeatureV2Service {
|
|
|
.build();
|
|
.build();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 移除Base64编解码的key
|
|
|
|
|
+ if (Boolean.TRUE.equals(removeBase64Switch)) {
|
|
|
|
|
+ return this.multiGetFeatureV2(request);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
long startTime = System.currentTimeMillis();
|
|
long startTime = System.currentTimeMillis();
|
|
|
|
|
|
|
|
// 1. 生成Redis Key列表
|
|
// 1. 生成Redis Key列表
|
|
@@ -56,19 +72,19 @@ public class FeatureV2Service {
|
|
|
int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
|
|
int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
|
|
|
|
|
|
|
|
// 2. 分批并行查询(必须保证返回值与 key 顺序一致)
|
|
// 2. 分批并行查询(必须保证返回值与 key 顺序一致)
|
|
|
- List<BatchFuture> futures = new ArrayList<>();
|
|
|
|
|
|
|
+ List<BatchFuture<String>> futures = new ArrayList<>();
|
|
|
for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
|
|
for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
|
|
|
int end = Math.min(start + safeBatchSize, redisKeys.size());
|
|
int end = Math.min(start + safeBatchSize, redisKeys.size());
|
|
|
List<String> batchKeys = redisKeys.subList(start, end);
|
|
List<String> batchKeys = redisKeys.subList(start, end);
|
|
|
Future<List<String>> future = ThreadPoolFactory.multiGetFeaturePool()
|
|
Future<List<String>> future = ThreadPoolFactory.multiGetFeaturePool()
|
|
|
.submit(() -> redisTemplate.opsForValue().multiGet(batchKeys));
|
|
.submit(() -> redisTemplate.opsForValue().multiGet(batchKeys));
|
|
|
- futures.add(new BatchFuture(start, end - start, future));
|
|
|
|
|
|
|
+ futures.add(new BatchFuture<>(start, end - start, future));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
|
|
// 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
|
|
|
List<String> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
|
|
List<String> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
|
|
|
long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
|
|
long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
|
|
|
- for (BatchFuture bf : futures) {
|
|
|
|
|
|
|
+ for (BatchFuture<String> bf : futures) {
|
|
|
long remainingNanos = deadlineNanos - System.nanoTime();
|
|
long remainingNanos = deadlineNanos - System.nanoTime();
|
|
|
if (remainingNanos <= 0) {
|
|
if (remainingNanos <= 0) {
|
|
|
bf.future.cancel(true);
|
|
bf.future.cancel(true);
|
|
@@ -106,12 +122,75 @@ public class FeatureV2Service {
|
|
|
return build;
|
|
return build;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private static final class BatchFuture {
|
|
|
|
|
|
|
+ private MultiGetFeatureResponse multiGetFeatureV2(MultiGetFeatureRequest request) {
|
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
|
+ int keyCount = request.getFeatureKeyCount();
|
|
|
|
|
+
|
|
|
|
|
+ // 1. 生成Redis Key列表, v2去除了base64编码 key格式也变了
|
|
|
|
|
+ List<String> redisKeys = request.getFeatureKeyList().stream()
|
|
|
|
|
+ .map(this::redisKey)
|
|
|
|
|
+ .map(s -> String.format("%s:v2", s))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+
|
|
|
|
|
+ int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 分批并行查询(必须保证返回值与 key 顺序一致)
|
|
|
|
|
+ List<BatchFuture<byte[]>> futures = new ArrayList<>();
|
|
|
|
|
+ for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
|
|
|
|
|
+ int end = Math.min(start + safeBatchSize, redisKeys.size());
|
|
|
|
|
+ List<String> batchKeys = redisKeys.subList(start, end);
|
|
|
|
|
+ Future<List<byte[]>> future = ThreadPoolFactory.multiGetFeaturePool()
|
|
|
|
|
+ .submit(() -> byteRedisTemplate.opsForValue().multiGet(batchKeys));
|
|
|
|
|
+ futures.add(new BatchFuture<byte[]>(start, end - start, future));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
|
|
|
|
|
+ List<byte[]> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
|
|
|
|
|
+ long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
|
|
|
|
|
+ for (BatchFuture<byte[]> bf : futures) {
|
|
|
|
|
+ long remainingNanos = deadlineNanos - System.nanoTime();
|
|
|
|
|
+ if (remainingNanos <= 0) {
|
|
|
|
|
+ bf.future.cancel(true);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<byte[]> batchValues = bf.future.get(remainingNanos, TimeUnit.NANOSECONDS);
|
|
|
|
|
+ if (batchValues == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ int copyLen = Math.min(bf.size, batchValues.size());
|
|
|
|
|
+ for (int i = 0; i < copyLen; i++) {
|
|
|
|
|
+ allValues.set(bf.startIndex + i, batchValues.get(i));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (TimeoutException e) {
|
|
|
|
|
+ bf.future.cancel(true);
|
|
|
|
|
+ log.warn("Batch mGetV2 timeout, startIndex={}, size={}", bf.startIndex, bf.size);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("Batch mGetV2 failed, startIndex={}, size={}", bf.startIndex, bf.size, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 解压缩
|
|
|
|
|
+ List<String> values = CommonCollectionUtils.toList(allValues, CompressionUtil::snappyDecompressV2);
|
|
|
|
|
+
|
|
|
|
|
+ // 5. 构建响应
|
|
|
|
|
+ MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
|
|
|
|
|
+ builder.setResult(Result.newBuilder().setCode(1));
|
|
|
|
|
+ for (int i = 0; i < keyCount; i++) {
|
|
|
|
|
+ builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
|
|
|
|
|
+ Strings.nullToEmpty(values.get(i)));
|
|
|
|
|
+ }
|
|
|
|
|
+ MultiGetFeatureResponse build = builder.build();
|
|
|
|
|
+ log.info("multiGetFeatureV2, cost={}ms", System.currentTimeMillis() - startTime);
|
|
|
|
|
+ return build;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static final class BatchFuture<T> {
|
|
|
private final int startIndex;
|
|
private final int startIndex;
|
|
|
private final int size;
|
|
private final int size;
|
|
|
- private final Future<List<String>> future;
|
|
|
|
|
|
|
+ private final Future<List<T>> future;
|
|
|
|
|
|
|
|
- private BatchFuture(int startIndex, int size, Future<List<String>> future) {
|
|
|
|
|
|
|
+ private BatchFuture(int startIndex, int size, Future<List<T>> future) {
|
|
|
this.startIndex = startIndex;
|
|
this.startIndex = startIndex;
|
|
|
this.size = size;
|
|
this.size = size;
|
|
|
this.future = future;
|
|
this.future = future;
|