|
|
@@ -11,44 +11,76 @@ import net.devh.boot.grpc.client.inject.GrpcClient;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
/**
|
|
|
* FeatureV2 gRPC 客户端
|
|
|
- *
|
|
|
+ *
|
|
|
* 优化说明:
|
|
|
* 1. 增加异常处理:捕获 StatusRuntimeException,避免错误向上传播
|
|
|
* 2. 增加重试机制:对 UNAVAILABLE 等可恢复错误自动重试(50ms 延迟)
|
|
|
* 3. 增加降级处理:重试失败后返回空结果,保证服务稳定
|
|
|
* 4. 增加详细日志:记录错误详情,便于问题排查
|
|
|
- *
|
|
|
+ *
|
|
|
* @author dyp
|
|
|
*/
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
public class FeatureV2Client {
|
|
|
-
|
|
|
+
|
|
|
@GrpcClient("recommend-feature")
|
|
|
private FeatureV2ServiceGrpc.FeatureV2ServiceBlockingStub client;
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 最大重试次数
|
|
|
- * 说明:对于网络连接问题,重试可以触发连接重建
|
|
|
+ * 单次 gRPC 请求的拆包大小(按 key 数量)
|
|
|
+ *
|
|
|
+ * 说明:避免单次响应过大触发 gRPC 默认 4MB 限制/链路取消;用户侧指定默认 500。
|
|
|
*/
|
|
|
- private static final int MAX_RETRY_ATTEMPTS = 2;
|
|
|
-
|
|
|
+ private static final int BATCH_SIZE = 500;
|
|
|
+
|
|
|
/**
|
|
|
- * 重试延迟(毫秒)
|
|
|
- * 说明:50ms 快速重试,给连接重建预留时间
|
|
|
+ * 拆包并发线程池
|
|
|
+ *
|
|
|
+ * 说明:机器 4 核时并发建议 4~8,这里按 cpu*2 并限制在 [4, 8]。
|
|
|
*/
|
|
|
- private static final long RETRY_DELAY_MS = 50;
|
|
|
+ private static final int BATCH_PARALLELISM = Math.max(4,
|
|
|
+ Math.min(8, Runtime.getRuntime().availableProcessors() * 2));
|
|
|
+
|
|
|
+ private static final ExecutorService BATCH_EXECUTOR = new ThreadPoolExecutor(
|
|
|
+ BATCH_PARALLELISM,
|
|
|
+ BATCH_PARALLELISM,
|
|
|
+ 60L,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ // 有界队列 + CallerRuns:避免极端情况下无界堆积;队列满时回退到调用线程执行。
|
|
|
+ new LinkedBlockingQueue<>(2048),
|
|
|
+ new ThreadFactory() {
|
|
|
+ private final AtomicInteger idx = new AtomicInteger(1);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
+ Thread t = new Thread(r);
|
|
|
+ t.setName("feature-v2-client-batch-" + idx.getAndIncrement());
|
|
|
+ t.setDaemon(true);
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+ },
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
+ );
|
|
|
|
|
|
/**
|
|
|
* 批量获取特征数据
|
|
|
- *
|
|
|
+ *
|
|
|
* @param protos 特征请求列表
|
|
|
* @return 特征数据 Map,key 为 uniqueKey,value 为特征值 JSON 字符串
|
|
|
*/
|
|
|
@@ -59,104 +91,102 @@ public class FeatureV2Client {
|
|
|
}
|
|
|
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
- // 从第 0 次尝试开始
|
|
|
- Map<String, String> result = multiGetFeatureWithRetry(protos, 0);
|
|
|
- log.info("multiGetFeature: end, result.size={}, cost={}", result != null ? result.size() : 0, System.currentTimeMillis() - startTime);
|
|
|
- return result;
|
|
|
+
|
|
|
+ // 小请求不拆包:保持原行为
|
|
|
+ if (protos.size() <= BATCH_SIZE) {
|
|
|
+ Map<String, String> result = multiGetFeatureWithRetry(protos, 0);
|
|
|
+ log.info("multiGetFeature: end, protos.size={}, batchSize={}, batchCount=1, result.size={}, cost={}",
|
|
|
+ protos.size(), BATCH_SIZE, result != null ? result.size() : 0, System.currentTimeMillis() - startTime);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 大请求拆包并发执行,最终合并结果;单个 batch 失败时返回部分结果
|
|
|
+ int total = protos.size();
|
|
|
+ int batchCount = (total + BATCH_SIZE - 1) / BATCH_SIZE;
|
|
|
+ List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>(batchCount);
|
|
|
+ for (int start = 0; start < total; start += BATCH_SIZE) {
|
|
|
+ int end = Math.min(start + BATCH_SIZE, total);
|
|
|
+ List<FeatureKeyProto> batch = protos.subList(start, end);
|
|
|
+ futures.add(CompletableFuture.supplyAsync(() -> multiGetFeatureWithRetry(batch, 0), BATCH_EXECUTOR));
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, String> merged = new HashMap<>();
|
|
|
+ int exceptionBatchCount = 0;
|
|
|
+ for (CompletableFuture<Map<String, String>> f : futures) {
|
|
|
+ try {
|
|
|
+ Map<String, String> part = f.join();
|
|
|
+ if (part != null && !part.isEmpty()) {
|
|
|
+ merged.putAll(part);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 理论上 multiGetFeatureWithRetry 已吞掉异常并返回空;这里兜底,确保其它 batch 不受影响
|
|
|
+ exceptionBatchCount++;
|
|
|
+ log.error("multiGetFeature: batch future failed, ignored for partial result", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("multiGetFeature: end, protos.size={}, batchSize={}, batchCount={}, parallelism={}, merged.size={}, exceptionBatchCount={}, cost={}",
|
|
|
+ protos.size(), BATCH_SIZE, batchCount, BATCH_PARALLELISM, merged.size(), exceptionBatchCount,
|
|
|
+ System.currentTimeMillis() - startTime);
|
|
|
+ return merged;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 带重试的特征获取方法
|
|
|
- *
|
|
|
+ *
|
|
|
* @param protos 特征请求列表
|
|
|
* @param attemptCount 当前重试次数(从 0 开始)
|
|
|
* @return 特征数据 Map
|
|
|
*/
|
|
|
private Map<String, String> multiGetFeatureWithRetry(List<FeatureKeyProto> protos, int attemptCount) {
|
|
|
log.warn("multiGetFeatureWithRetry: start, attempt={}, protos.size={}", attemptCount, protos.size());
|
|
|
-
|
|
|
+
|
|
|
MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
|
|
|
.addAllFeatureKey(protos)
|
|
|
.build();
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
// 调用 gRPC 服务,设置 3 秒超时
|
|
|
MultiGetFeatureResponse response = client
|
|
|
.withDeadlineAfter(3, TimeUnit.SECONDS)
|
|
|
.multiGetFeature(request);
|
|
|
-
|
|
|
+
|
|
|
// 响应为空或没有结果
|
|
|
if (response == null || !response.hasResult()) {
|
|
|
log.info("multiGetFeature grpc error: response is null or has no result, attempt={}", attemptCount);
|
|
|
return Collections.emptyMap();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 业务错误码检查
|
|
|
if (response.getResult().getCode() != 1) {
|
|
|
log.info("multiGetFeature grpc code={}, msg={}, attempt={}", response.getResult().getCode(),
|
|
|
response.getResult().getMessage(), attemptCount);
|
|
|
return Collections.emptyMap();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 特征数据为空
|
|
|
if (response.getFeatureCount() == 0) {
|
|
|
log.info("multiGetFeature no feature, attempt={}", attemptCount);
|
|
|
return Collections.emptyMap();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 成功返回特征数据
|
|
|
return response.getFeatureMap();
|
|
|
-
|
|
|
+
|
|
|
} catch (StatusRuntimeException e) {
|
|
|
Status.Code code = e.getStatus().getCode();
|
|
|
String description = e.getStatus().getDescription();
|
|
|
-
|
|
|
+
|
|
|
// 记录详细的错误信息(使用 error 级别确保一定会输出)
|
|
|
- log.error("gRPC call failed: code={}, description={}, attempt={}/{}, protos.size={}, exception={}",
|
|
|
- code, description, attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e.getClass().getName(), e);
|
|
|
-
|
|
|
- // 判断是否应该重试
|
|
|
- if (shouldRetry(code) && attemptCount < MAX_RETRY_ATTEMPTS) {
|
|
|
- log.warn("Retrying gRPC call after {}ms, attempt={}/{}, reason={}", RETRY_DELAY_MS, attemptCount + 1, MAX_RETRY_ATTEMPTS, code);
|
|
|
-
|
|
|
- // 等待一段时间后重试(给连接重建预留时间)
|
|
|
- try {
|
|
|
- Thread.sleep(RETRY_DELAY_MS);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- log.warn("Retry sleep interrupted", ie);
|
|
|
- }
|
|
|
-
|
|
|
- // 递归调用,进行重试
|
|
|
- return multiGetFeatureWithRetry(protos, attemptCount + 1);
|
|
|
- }
|
|
|
-
|
|
|
- // 重试失败或不可重试的错误,降级返回空结果
|
|
|
- log.error("gRPC call failed after {} attempts, returning empty result for graceful degradation. code={}",
|
|
|
- attemptCount + 1, code);
|
|
|
+ log.error("gRPC call failed: code={}, description={}, protos.size={}, exception={}",
|
|
|
+ code, description, protos.size(), e.getClass().getName(), e);
|
|
|
return Collections.emptyMap();
|
|
|
} catch (Exception e) {
|
|
|
// 捕获所有其他异常(不应该发生,但为了安全起见)
|
|
|
- log.error("multiGetFeatureWithRetry: unexpected exception, attempt={}/{}, protos.size={}, exception={}",
|
|
|
- attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e.getClass().getName(), e);
|
|
|
+ log.error("multiGetFeatureWithRetry: unexpected exception, protos.size={}, exception={}",
|
|
|
+ attemptCount + 1, protos.size(), e.getClass().getName(), e);
|
|
|
return Collections.emptyMap();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 判断错误是否应该重试
|
|
|
- *
|
|
|
- * @param code gRPC 状态码
|
|
|
- * @return true 表示应该重试,false 表示不应该重试
|
|
|
- */
|
|
|
- private boolean shouldRetry(Status.Code code) {
|
|
|
- // UNAVAILABLE: 连接不可用(如网络断开、连接关闭)- 应该重试
|
|
|
- // DEADLINE_EXCEEDED: 超时 - 应该重试
|
|
|
- // RESOURCE_EXHAUSTED: 资源耗尽(如连接池满)- 应该重试
|
|
|
- // CANCELLED: 请求被取消(如服务端处理失败、连接中断)- 应该重试
|
|
|
- return code == Status.Code.UNAVAILABLE
|
|
|
- || code == Status.Code.DEADLINE_EXCEEDED
|
|
|
- || code == Status.Code.RESOURCE_EXHAUSTED
|
|
|
- || code == Status.Code.CANCELLED;
|
|
|
- }
|
|
|
+
|
|
|
}
|