|
@@ -2,6 +2,7 @@ package com.tzld.piaoquan.recommend.feature.service;
|
|
|
|
|
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.base.Strings;
|
|
import com.google.common.base.Strings;
|
|
|
|
|
+import com.tzld.piaoquan.recommend.feature.common.ThreadPoolFactory;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.common.Result;
|
|
import com.tzld.piaoquan.recommend.feature.model.common.Result;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
|
|
@@ -12,11 +13,15 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @author dyp
|
|
* @author dyp
|
|
@@ -31,6 +36,9 @@ public class FeatureV2Service {
|
|
|
@ApolloJsonValue("${dts.config:}")
|
|
@ApolloJsonValue("${dts.config:}")
|
|
|
private List<DTSConfig> newDtsConfigs;
|
|
private List<DTSConfig> newDtsConfigs;
|
|
|
|
|
|
|
|
|
|
+ @Value("${feature.mget.batch.size:500}")
|
|
|
|
|
+ private Integer batchSize;
|
|
|
|
|
+
|
|
|
public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
|
|
public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
|
|
|
int keyCount = request.getFeatureKeyCount();
|
|
int keyCount = request.getFeatureKeyCount();
|
|
|
|
|
|
|
@@ -43,13 +51,48 @@ public class FeatureV2Service {
|
|
|
// 1. 生成Redis Key列表
|
|
// 1. 生成Redis Key列表
|
|
|
List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
|
|
List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
|
|
|
|
|
|
|
|
- // 2. Redis批量查询
|
|
|
|
|
- List<String> values = redisTemplate.opsForValue().multiGet(redisKeys);
|
|
|
|
|
|
|
+ int safeBatchSize = (batchSize == null || batchSize <= 0) ? 500 : batchSize;
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 分批并行查询(必须保证返回值与 key 顺序一致)
|
|
|
|
|
+ List<BatchFuture> futures = new ArrayList<>();
|
|
|
|
|
+ for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
|
|
|
|
|
+ int end = Math.min(start + safeBatchSize, redisKeys.size());
|
|
|
|
|
+ List<String> batchKeys = redisKeys.subList(start, end);
|
|
|
|
|
+ Future<List<String>> future = ThreadPoolFactory.multiGetFeaturePool()
|
|
|
|
|
+ .submit(() -> redisTemplate.opsForValue().multiGet(batchKeys));
|
|
|
|
|
+ futures.add(new BatchFuture(start, end - start, future));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
|
|
|
|
|
+ List<String> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
|
|
|
|
|
+ long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
|
|
|
|
|
+ for (BatchFuture bf : futures) {
|
|
|
|
|
+ long remainingNanos = deadlineNanos - System.nanoTime();
|
|
|
|
|
+ if (remainingNanos <= 0) {
|
|
|
|
|
+ bf.future.cancel(true);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<String> batchValues = bf.future.get(remainingNanos, TimeUnit.NANOSECONDS);
|
|
|
|
|
+ if (batchValues == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ int copyLen = Math.min(bf.size, batchValues.size());
|
|
|
|
|
+ for (int i = 0; i < copyLen; i++) {
|
|
|
|
|
+ allValues.set(bf.startIndex + i, batchValues.get(i));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (TimeoutException e) {
|
|
|
|
|
+ bf.future.cancel(true);
|
|
|
|
|
+ log.warn("Batch mGet timeout, startIndex={}, size={}", bf.startIndex, bf.size);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("Batch mGet failed, startIndex={}, size={}", bf.startIndex, bf.size, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 3. 数据解压缩
|
|
|
|
|
- values = CommonCollectionUtils.toList(values, CompressionUtil::snappyDecompress);
|
|
|
|
|
|
|
+ // 4. 解压缩
|
|
|
|
|
+ List<String> values = CommonCollectionUtils.toList(allValues, CompressionUtil::snappyDecompress);
|
|
|
|
|
|
|
|
- // 4. 构建响应
|
|
|
|
|
|
|
+ // 5. 构建响应
|
|
|
MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
|
|
MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
|
|
|
builder.setResult(Result.newBuilder().setCode(1));
|
|
builder.setResult(Result.newBuilder().setCode(1));
|
|
|
for (int i = 0; i < keyCount; i++) {
|
|
for (int i = 0; i < keyCount; i++) {
|
|
@@ -59,6 +102,18 @@ public class FeatureV2Service {
|
|
|
return builder.build();
|
|
return builder.build();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ private static final class BatchFuture {
|
|
|
|
|
+ private final int startIndex;
|
|
|
|
|
+ private final int size;
|
|
|
|
|
+ private final Future<List<String>> future;
|
|
|
|
|
+
|
|
|
|
|
+ private BatchFuture(int startIndex, int size, Future<List<String>> future) {
|
|
|
|
|
+ this.startIndex = startIndex;
|
|
|
|
|
+ this.size = size;
|
|
|
|
|
+ this.future = future;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// Note:写入和读取的key生成规则应保持一致
|
|
// Note:写入和读取的key生成规则应保持一致
|
|
|
private String redisKey(FeatureKeyProto fk) {
|
|
private String redisKey(FeatureKeyProto fk) {
|
|
|
|
|
|