|
@@ -4,6 +4,8 @@ import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
|
|
import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
|
|
|
|
|
+import io.grpc.Status;
|
|
|
|
|
+import io.grpc.StatusRuntimeException;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import net.devh.boot.grpc.client.inject.GrpcClient;
|
|
import net.devh.boot.grpc.client.inject.GrpcClient;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
@@ -15,39 +17,131 @@ import java.util.Map;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
|
|
+ * FeatureV2 gRPC 客户端
|
|
|
|
|
+ *
|
|
|
|
|
+ * 优化说明:
|
|
|
|
|
+ * 1. 增加异常处理:捕获 StatusRuntimeException,避免错误向上传播
|
|
|
|
|
+ * 2. 增加重试机制:对 UNAVAILABLE 等可恢复错误自动重试(50ms 延迟)
|
|
|
|
|
+ * 3. 增加降级处理:重试失败后返回空结果,保证服务稳定
|
|
|
|
|
+ * 4. 增加详细日志:记录错误详情,便于问题排查
|
|
|
|
|
+ *
|
|
|
* @author dyp
|
|
* @author dyp
|
|
|
*/
|
|
*/
|
|
|
@Component
|
|
@Component
|
|
|
@Slf4j
|
|
@Slf4j
|
|
|
public class FeatureV2Client {
|
|
public class FeatureV2Client {
|
|
|
|
|
+
|
|
|
@GrpcClient("recommend-feature")
|
|
@GrpcClient("recommend-feature")
|
|
|
private FeatureV2ServiceGrpc.FeatureV2ServiceBlockingStub client;
|
|
private FeatureV2ServiceGrpc.FeatureV2ServiceBlockingStub client;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 最大重试次数
|
|
|
|
|
+ * 说明:对于网络连接问题,重试可以触发连接重建
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final int MAX_RETRY_ATTEMPTS = 2;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 重试延迟(毫秒)
|
|
|
|
|
+ * 说明:50ms 快速重试,给连接重建预留时间
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final long RETRY_DELAY_MS = 50;
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量获取特征数据
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param protos 特征请求列表
|
|
|
|
|
+ * @return 特征数据 Map,key 为 uniqueKey,value 为特征值 JSON 字符串
|
|
|
|
|
+ */
|
|
|
public Map<String, String> multiGetFeature(List<FeatureKeyProto> protos) {
|
|
public Map<String, String> multiGetFeature(List<FeatureKeyProto> protos) {
|
|
|
if (CollectionUtils.isEmpty(protos)) {
|
|
if (CollectionUtils.isEmpty(protos)) {
|
|
|
return Collections.emptyMap();
|
|
return Collections.emptyMap();
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // 从第 0 次尝试开始
|
|
|
|
|
+ return multiGetFeatureWithRetry(protos, 0);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 带重试的特征获取方法
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param protos 特征请求列表
|
|
|
|
|
+ * @param attemptCount 当前重试次数(从 0 开始)
|
|
|
|
|
+ * @return 特征数据 Map
|
|
|
|
|
+ */
|
|
|
|
|
+ private Map<String, String> multiGetFeatureWithRetry(List<FeatureKeyProto> protos, int attemptCount) {
|
|
|
MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
|
|
MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
|
|
|
.addAllFeatureKey(protos)
|
|
.addAllFeatureKey(protos)
|
|
|
.build();
|
|
.build();
|
|
|
- // 显式设置 deadline:3秒超时,避免无限等待
|
|
|
|
|
- MultiGetFeatureResponse response = client
|
|
|
|
|
- .withDeadlineAfter(3, TimeUnit.SECONDS)
|
|
|
|
|
- .multiGetFeature(request);
|
|
|
|
|
- if (response == null || !response.hasResult()) {
|
|
|
|
|
- log.info("multiGetFeature grpc error");
|
|
|
|
|
- return null;
|
|
|
|
|
- }
|
|
|
|
|
- if (response.getResult().getCode() != 1) {
|
|
|
|
|
- log.info("multiGetFeature grpc code={}, msg={}", response.getResult().getCode(),
|
|
|
|
|
- response.getResult().getMessage());
|
|
|
|
|
- return null;
|
|
|
|
|
- }
|
|
|
|
|
- if (response.getFeatureCount() == 0) {
|
|
|
|
|
- log.info("multiGetFeature no feature");
|
|
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 调用 gRPC 服务,设置 3 秒超时
|
|
|
|
|
+ MultiGetFeatureResponse response = client
|
|
|
|
|
+ .withDeadlineAfter(3, TimeUnit.SECONDS)
|
|
|
|
|
+ .multiGetFeature(request);
|
|
|
|
|
+
|
|
|
|
|
+ // 响应为空或没有结果
|
|
|
|
|
+ if (response == null || !response.hasResult()) {
|
|
|
|
|
+ log.info("multiGetFeature grpc error: response is null or has no result, attempt={}", attemptCount);
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 业务错误码检查
|
|
|
|
|
+ if (response.getResult().getCode() != 1) {
|
|
|
|
|
+ log.info("multiGetFeature grpc code={}, msg={}, attempt={}", response.getResult().getCode(),
|
|
|
|
|
+ response.getResult().getMessage(), attemptCount);
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 特征数据为空
|
|
|
|
|
+ if (response.getFeatureCount() == 0) {
|
|
|
|
|
+ log.info("multiGetFeature no feature, attempt={}", attemptCount);
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 成功返回特征数据
|
|
|
|
|
+ return response.getFeatureMap();
|
|
|
|
|
+
|
|
|
|
|
+ } catch (StatusRuntimeException e) {
|
|
|
|
|
+ Status.Code code = e.getStatus().getCode();
|
|
|
|
|
+ String description = e.getStatus().getDescription();
|
|
|
|
|
+
|
|
|
|
|
+ // 记录详细的错误信息
|
|
|
|
|
+ log.error("gRPC call failed: code={}, description={}, attempt={}/{}, protos.size={}",
|
|
|
|
|
+ code, description, attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e);
|
|
|
|
|
+
|
|
|
|
|
+ // 判断是否应该重试
|
|
|
|
|
+ if (shouldRetry(code) && attemptCount < MAX_RETRY_ATTEMPTS) {
|
|
|
|
|
+ log.warn("Retrying gRPC call after {}ms, attempt={}/{}, reason={}", RETRY_DELAY_MS, attemptCount + 1, MAX_RETRY_ATTEMPTS, code);
|
|
|
|
|
+
|
|
|
|
|
+ // 等待一段时间后重试(给连接重建预留时间)
|
|
|
|
|
+ try {
|
|
|
|
|
+ Thread.sleep(RETRY_DELAY_MS);
|
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.warn("Retry sleep interrupted", ie);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 递归调用,进行重试
|
|
|
|
|
+ return multiGetFeatureWithRetry(protos, attemptCount + 1);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 重试失败或不可重试的错误,降级返回空结果
|
|
|
|
|
+ log.error("gRPC call failed after {} attempts, returning empty result for graceful degradation. code={}",
|
|
|
|
|
+ attemptCount + 1, code);
|
|
|
return Collections.emptyMap();
|
|
return Collections.emptyMap();
|
|
|
}
|
|
}
|
|
|
- return response.getFeatureMap();
|
|
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 判断错误是否应该重试
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param code gRPC 状态码
|
|
|
|
|
+ * @return true 表示应该重试,false 表示不应该重试
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean shouldRetry(Status.Code code) {
|
|
|
|
|
+ // UNAVAILABLE: 连接不可用(如网络断开、连接关闭)- 应该重试
|
|
|
|
|
+ // DEADLINE_EXCEEDED: 超时 - 应该重试
|
|
|
|
|
+ // RESOURCE_EXHAUSTED: 资源耗尽(如连接池满)- 应该重试
|
|
|
|
|
+ return code == Status.Code.UNAVAILABLE || code == Status.Code.DEADLINE_EXCEEDED || code == Status.Code.RESOURCE_EXHAUSTED;
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|