Przeglądaj źródła

Merge branch 'ljd/feature-error-fix' into test

# Conflicts:
#	recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/JSONUtils.java
jiandong.liu 2 tygodni temu
rodzic
commit
f94a18d69a

+ 112 - 14
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/client/FeatureV2Client.java

@@ -4,6 +4,8 @@ import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import lombok.extern.slf4j.Slf4j;
 import net.devh.boot.grpc.client.inject.GrpcClient;
 import org.springframework.stereotype.Component;
@@ -12,38 +14,134 @@ import org.springframework.util.CollectionUtils;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
+ * FeatureV2 gRPC 客户端
+ * 
+ * 优化说明:
+ * 1. 增加异常处理:捕获 StatusRuntimeException,避免错误向上传播
+ * 2. 增加重试机制:对 UNAVAILABLE 等可恢复错误自动重试(50ms 延迟)
+ * 3. 增加降级处理:重试失败后返回空结果,保证服务稳定
+ * 4. 增加详细日志:记录错误详情,便于问题排查
+ * 
  * @author dyp
  */
 @Component
 @Slf4j
 public class FeatureV2Client {
+    
     @GrpcClient("recommend-feature")
     private FeatureV2ServiceGrpc.FeatureV2ServiceBlockingStub client;
+    
+    /**
+     * 最大重试次数
+     * 说明:对于网络连接问题,重试可以触发连接重建
+     */
+    private static final int MAX_RETRY_ATTEMPTS = 2;
+    
+    /**
+     * 重试延迟(毫秒)
+     * 说明:50ms 快速重试,给连接重建预留时间
+     */
+    private static final long RETRY_DELAY_MS = 50;
 
+    /**
+     * Fetches feature data for a batch of feature keys.
+     *
+     * <p>An empty or null key list short-circuits to an empty map; otherwise the call is
+     * delegated to {@code multiGetFeatureWithRetry}, starting at attempt 0.
+     *
+     * @param protos feature keys to request
+     * @return feature map; per the original author, the key is the uniqueKey and the value is
+     *         the feature value as a JSON string
+     */
     public Map<String, String> multiGetFeature(List<FeatureKeyProto> protos) {
         if (CollectionUtils.isEmpty(protos)) {
             return Collections.emptyMap();
         }
+        
+        // Start with attempt number 0.
+        return multiGetFeatureWithRetry(protos, 0);
+    }
+    
+    /**
+     * Fetches features over gRPC, retrying retryable failures a bounded number of times.
+     *
+     * <p>A {@link StatusRuntimeException} is never propagated: a response without usable data,
+     * a non-retryable status, exhausted retries, or an interrupt during the back-off sleep all
+     * degrade to an empty map.
+     *
+     * @param protos feature keys to request
+     * @param attemptCount zero-based index of the current attempt
+     * @return the feature map of a successful response, otherwise an empty map
+     */
+    private Map<String, String> multiGetFeatureWithRetry(List<FeatureKeyProto> protos, int attemptCount) {
         MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
                 .addAllFeatureKey(protos)
                 .build();
-        MultiGetFeatureResponse response = client.multiGetFeature(request);
-        if (response == null || !response.hasResult()) {
-            log.info("multiGetFeature grpc error");
-            return null;
-        }
-        if (response.getResult().getCode() != 1) {
-            log.info("multiGetFeature grpc code={}, msg={}", response.getResult().getCode(),
-                    response.getResult().getMessage());
-            return null;
-        }
-        if (response.getFeatureCount() == 0) {
-            log.info("multiGetFeature no feature");
+
+        try {
+            // Every attempt gets its own 3-second deadline.
+            MultiGetFeatureResponse response = client
+                    .withDeadlineAfter(3, TimeUnit.SECONDS)
+                    .multiGetFeature(request);
+
+            // Missing response or missing result envelope.
+            if (response == null || !response.hasResult()) {
+                log.info("multiGetFeature grpc error: response is null or has no result, attempt={}", attemptCount);
+                return Collections.emptyMap();
+            }
+
+            // Business-level result code: anything other than 1 is treated as a failure.
+            if (response.getResult().getCode() != 1) {
+                log.info("multiGetFeature grpc code={}, msg={}, attempt={}", response.getResult().getCode(),
+                        response.getResult().getMessage(), attemptCount);
+                return Collections.emptyMap();
+            }
+
+            // Successful call that carried no features.
+            if (response.getFeatureCount() == 0) {
+                log.info("multiGetFeature no feature, attempt={}", attemptCount);
+                return Collections.emptyMap();
+            }
+
+            // Success: hand back the feature data.
+            return response.getFeatureMap();
+
+        } catch (StatusRuntimeException e) {
+            Status.Code code = e.getStatus().getCode();
+            String description = e.getStatus().getDescription();
+
+            // Log the failure in detail, including the stack trace.
+            log.error("gRPC call failed: code={}, description={}, attempt={}/{}, protos.size={}",
+                    code, description, attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e);
+
+            // Retry only retryable codes, and only while the retry budget lasts.
+            if (shouldRetry(code) && attemptCount < MAX_RETRY_ATTEMPTS) {
+                log.warn("Retrying gRPC call after {}ms, attempt={}/{}, reason={}", RETRY_DELAY_MS, attemptCount + 1, MAX_RETRY_ATTEMPTS, code);
+
+                // Short pause before the next attempt (gives the connection time to be rebuilt).
+                try {
+                    Thread.sleep(RETRY_DELAY_MS);
+                } catch (InterruptedException ie) {
+                    // The caller asked this thread to stop: restore the flag and give up instead
+                    // of issuing another blocking RPC on an interrupted thread.
+                    Thread.currentThread().interrupt();
+                    log.warn("Retry sleep interrupted", ie);
+                    return Collections.emptyMap();
+                }
+
+                // Recurse for the next attempt.
+                return multiGetFeatureWithRetry(protos, attemptCount + 1);
+            }
+
+            // Non-retryable error or retries exhausted: degrade to an empty result.
+            log.error("gRPC call failed after {} attempts, returning empty result for graceful degradation. code={}",
+                    attemptCount + 1, code);
             return Collections.emptyMap();
         }
-        return response.getFeatureMap();
     }
-
+    
+    /**
+     * Tells whether a failed gRPC call with the given status code should be attempted again.
+     *
+     * @param code gRPC status code of the failed call
+     * @return {@code true} for UNAVAILABLE, DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED,
+     *         {@code false} for every other code
+     */
+    private boolean shouldRetry(Status.Code code) {
+        // UNAVAILABLE: connection not usable (e.g. network drop, connection closed).
+        if (code == Status.Code.UNAVAILABLE) {
+            return true;
+        }
+        // DEADLINE_EXCEEDED: the call timed out.
+        if (code == Status.Code.DEADLINE_EXCEEDED) {
+            return true;
+        }
+        // RESOURCE_EXHAUSTED: resources used up (e.g. a full connection pool).
+        return code == Status.Code.RESOURCE_EXHAUSTED;
+    }
 }

+ 2 - 1
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/Application.java

@@ -19,7 +19,8 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
         "com.tzld.piaoquan.recommend.feature.service",
         "com.tzld.piaoquan.recommend.feature.grpcservice",
         "com.tzld.piaoquan.recommend.feature.web",
-        "com.tzld.piaoquan.recommend.feature.config"
+        "com.tzld.piaoquan.recommend.feature.config",
+        "com.tzld.piaoquan.recommend.feature.common"
 })
 @EnableEurekaClient
 @EnableAspectJAutoProxy

+ 176 - 10
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolManager.java

@@ -9,6 +9,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -46,8 +47,8 @@ public class DynamicThreadPoolManager {
      * Apollo 配置的线程池参数
      * 配置格式:
      * [
-     * {"poolName":"DEFAULT","corePoolSize":32,"maxPoolSize":64,"queueCapacity":3000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
-     * {"poolName":"MULTI_GET_FEATURE","corePoolSize":64,"maxPoolSize":128,"queueCapacity":5000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
+     * {"poolName":"DEFAULT","corePoolSize":8,"maxPoolSize":16,"queueCapacity":1000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
+     * {"poolName":"MULTI_GET_FEATURE","corePoolSize":16,"maxPoolSize":32,"queueCapacity":1800,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
      * ]
      */
     @ApolloJsonValue("${thread.pool.configs:[]}")
@@ -65,7 +66,7 @@ public class DynamicThreadPoolManager {
         DEFAULT_CONFIG.setPoolName(DEFAULT_POOL);
         DEFAULT_CONFIG.setCorePoolSize(8);
         DEFAULT_CONFIG.setMaxPoolSize(16);
-        DEFAULT_CONFIG.setQueueCapacity(3000);
+        DEFAULT_CONFIG.setQueueCapacity(1000);
         DEFAULT_CONFIG.setKeepAliveSeconds(60);
         DEFAULT_CONFIG.setRejectedPolicy("CALLER_RUNS");
 
@@ -74,11 +75,31 @@ public class DynamicThreadPoolManager {
         MULTI_GET_FEATURE_CONFIG.setPoolName(MULTI_GET_FEATURE_POOL);
         MULTI_GET_FEATURE_CONFIG.setCorePoolSize(16);
         MULTI_GET_FEATURE_CONFIG.setMaxPoolSize(32);
-        MULTI_GET_FEATURE_CONFIG.setQueueCapacity(5000);
+        MULTI_GET_FEATURE_CONFIG.setQueueCapacity(1800);
         MULTI_GET_FEATURE_CONFIG.setKeepAliveSeconds(60);
         MULTI_GET_FEATURE_CONFIG.setRejectedPolicy("CALLER_RUNS");
     }
 
+    /**
+     * 监控阈值配置(可通过 Apollo 动态调整)
+     */
+    @Value("${thread.pool.monitor.enabled:true}")
+    private boolean monitorEnabled;
+
+    @Value("${thread.pool.monitor.interval:30}")
+    private int monitorIntervalSeconds;
+
+    @Value("${thread.pool.monitor.thread.threshold:0.8}")
+    private double threadUsageThreshold;
+
+    @Value("${thread.pool.monitor.queue.threshold:0.8}")
+    private double queueUsageThreshold;
+
+    /**
+     * 定时监控调度器
+     */
+    private ScheduledExecutorService monitorScheduler;
+
     @PostConstruct
     public void init() {
         // 初始化默认线程池
@@ -93,6 +114,93 @@ public class DynamicThreadPoolManager {
         }
 
         log.info("DynamicThreadPoolManager initialized, pools: {}", threadPoolRegistry.keySet());
+
+        // 启动定时监控任务
+        startMonitor();
+    }
+
+    /**
+     * Starts the periodic thread-pool monitor unless monitoring is disabled.
+     *
+     * <p>A configured interval that is zero or negative falls back to 30 seconds:
+     * {@code scheduleAtFixedRate} rejects a non-positive period with an
+     * {@link IllegalArgumentException}, and since this method runs from the
+     * {@code @PostConstruct} initializer that would fail bean initialization.
+     */
+    private void startMonitor() {
+        if (!monitorEnabled) {
+            log.info("Thread pool monitor is disabled");
+            return;
+        }
+
+        int intervalSeconds = monitorIntervalSeconds;
+        if (intervalSeconds <= 0) {
+            log.warn("Invalid thread.pool.monitor.interval={}, falling back to 30s", monitorIntervalSeconds);
+            intervalSeconds = 30;
+        }
+
+        // Single daemon thread, so the monitor never blocks JVM shutdown.
+        monitorScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").setDaemon(true).build()
+        );
+
+        // First check runs one full interval after start-up.
+        monitorScheduler.scheduleAtFixedRate(this::checkThreadPoolStatus,
+                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
+
+        log.info("Thread pool monitor started, interval={}s, threadThreshold={}%, queueThreshold={}%",
+                intervalSeconds, (int)(threadUsageThreshold * 100), (int)(queueUsageThreshold * 100));
+    }
+
+    /**
+     * Inspects every registered pool and logs a warning when thread usage or queue usage
+     * reaches its configured threshold. Exceptions are caught and logged rather than
+     * propagated to the scheduler.
+     */
+    private void checkThreadPoolStatus() {
+        try {
+            for (Map.Entry<String, ThreadPoolExecutor> pool : threadPoolRegistry.entrySet()) {
+                final String name = pool.getKey();
+                final ThreadPoolExecutor tpe = pool.getValue();
+
+                // Snapshot of the counters used for both ratios.
+                final int active = tpe.getActiveCount();
+                final int maxThreads = tpe.getMaximumPoolSize();
+                final int queued = tpe.getQueue().size();
+                final int capacity = getQueueCapacity(name);
+
+                final double threadRatio = (double) active / maxThreads;
+                final double queueRatio;
+                if (capacity > 0) {
+                    queueRatio = (double) queued / capacity;
+                } else {
+                    // No known capacity: treat the queue as unused.
+                    queueRatio = 0;
+                }
+
+                // Thread usage at or above the threshold.
+                if (threadRatio >= threadUsageThreshold) {
+                    log.warn("[ThreadPool ALERT] [{}] 线程使用率过高! activeCount={}/{} ({}%), " +
+                            "poolSize={}, queueSize={}/{}, completedTasks={}, totalTasks={}",
+                            name, active, maxThreads, (int)(threadRatio * 100),
+                            tpe.getPoolSize(), queued, capacity,
+                            tpe.getCompletedTaskCount(), tpe.getTaskCount());
+                }
+
+                // Queue usage at or above the threshold.
+                if (queueRatio >= queueUsageThreshold) {
+                    log.warn("[ThreadPool ALERT] [{}] 队列使用率过高! queueSize={}/{} ({}%), " +
+                            "activeCount={}/{}, poolSize={}, completedTasks={}, totalTasks={}",
+                            name, queued, capacity, (int)(queueRatio * 100),
+                            active, maxThreads, tpe.getPoolSize(),
+                            tpe.getCompletedTaskCount(), tpe.getTaskCount());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Thread pool monitor error", e);
+        }
+    }
+
+    /**
+     * Logs the current state of every registered thread pool on demand (useful when
+     * troubleshooting), framed by a report header and footer.
+     */
+    public void printAllPoolStatus() {
+        log.info("===== Thread Pool Status Report =====");
+        for (Map.Entry<String, ThreadPoolExecutor> pool : threadPoolRegistry.entrySet()) {
+            final String name = pool.getKey();
+            final ThreadPoolExecutor tpe = pool.getValue();
+            final int capacity = getQueueCapacity(name);
+
+            log.info("[{}] coreSize={}, maxSize={}, poolSize={}, activeCount={}, " +
+                    "queueSize={}/{}, completedTasks={}, totalTasks={}",
+                    name,
+                    tpe.getCorePoolSize(), tpe.getMaximumPoolSize(),
+                    tpe.getPoolSize(), tpe.getActiveCount(),
+                    tpe.getQueue().size(), capacity,
+                    tpe.getCompletedTaskCount(), tpe.getTaskCount());
+        }
+        log.info("===== End of Report =====");
     }
 
     /**
@@ -121,20 +229,71 @@ public class DynamicThreadPoolManager {
     }
 
     /**
-     * 获取拒绝策略
+     * Resolves the rejection policy by name and wraps it in an alerting decorator.
+     *
+     * <p>Recognized names (case-insensitive): ABORT, DISCARD, DISCARD_OLDEST, CALLER_RUNS.
+     * Any other name falls through to CALLER_RUNS.
+     *
+     * @param policy configured policy name
+     * @return an {@code AlertingRejectedExecutionHandler} delegating to the resolved policy
      */
     private RejectedExecutionHandler getRejectedExecutionHandler(String policy) {
+        // NOTE(review): policy.toUpperCase() throws a NullPointerException when policy is null —
+        // confirm every caller supplies a non-null rejectedPolicy.
+        RejectedExecutionHandler originalHandler;
         switch (policy.toUpperCase()) {
             case "ABORT":
-                return new ThreadPoolExecutor.AbortPolicy();
+                originalHandler = new ThreadPoolExecutor.AbortPolicy();
+                break;
             case "DISCARD":
-                return new ThreadPoolExecutor.DiscardPolicy();
+                originalHandler = new ThreadPoolExecutor.DiscardPolicy();
+                break;
             case "DISCARD_OLDEST":
-                return new ThreadPoolExecutor.DiscardOldestPolicy();
+                originalHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
+                break;
             case "CALLER_RUNS":
             default:
-                return new ThreadPoolExecutor.CallerRunsPolicy();
+                originalHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                break;
+        }
+        // Wrap the resolved policy so every rejection is logged as it happens. The raw
+        // (un-normalized) policy string is what the wrapper reports as the policy name.
+        return new AlertingRejectedExecutionHandler(originalHandler, policy);
+    }
+
+    /**
+     * Rejection-handler decorator: logs the rejecting pool's state at ERROR level, then
+     * hands the task to the wrapped policy.
+     */
+    private class AlertingRejectedExecutionHandler implements RejectedExecutionHandler {
+        private final RejectedExecutionHandler delegate;
+        private final String policyName;
+
+        public AlertingRejectedExecutionHandler(RejectedExecutionHandler delegate, String policyName) {
+            this.delegate = delegate;
+            this.policyName = policyName;
+        }
+
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            // Resolve which registered pool this executor belongs to and its configured capacity.
+            final String name = getPoolNameByExecutor(executor);
+            final int capacity = getQueueCapacity(name);
+
+            // Capture the counters in the order they appear in the log line.
+            final int active = executor.getActiveCount();
+            final int maxThreads = executor.getMaximumPoolSize();
+            final int poolSize = executor.getPoolSize();
+            final int queued = executor.getQueue().size();
+            final long completed = executor.getCompletedTaskCount();
+            final long total = executor.getTaskCount();
+
+            log.error("[ThreadPool REJECTED] [{}] 任务被拒绝! 策略={}, " +
+                    "activeCount={}/{}, poolSize={}, queueSize={}/{}, completedTasks={}, totalTasks={}",
+                    name, policyName,
+                    active, maxThreads,
+                    poolSize,
+                    queued, capacity,
+                    completed, total);
+
+            // Apply the wrapped rejection policy.
+            delegate.rejectedExecution(r, executor);
+        }
+    }
+    
+    /**
+     * Looks up the registered pool name for an executor instance.
+     *
+     * <p>Matching is by object identity ({@code ==}) against the values of the pool registry.
+     *
+     * @param executor executor to look up
+     * @return the registry key of the matching pool, or {@code "UNKNOWN"} when no entry matches
+     */
+    private String getPoolNameByExecutor(ThreadPoolExecutor executor) {
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            if (entry.getValue() == executor) {
+                return entry.getKey();
+            }
         }
+        return "UNKNOWN";
     }
 
     /**
@@ -286,12 +445,19 @@ public class DynamicThreadPoolManager {
         } else if (MULTI_GET_FEATURE_POOL.equals(poolName)) {
             return MULTI_GET_FEATURE_CONFIG.getQueueCapacity();
         }
-        return 3000;
+        return DEFAULT_CONFIG.getQueueCapacity();
     }
 
     @PreDestroy
     public void shutdown() {
         log.info("Shutting down thread pools...");
+        
+        // 先关闭监控调度器
+        if (monitorScheduler != null && !monitorScheduler.isShutdown()) {
+            monitorScheduler.shutdown();
+            log.info("Thread pool monitor shutdown");
+        }
+        
         for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
             ThreadPoolExecutor executor = entry.getValue();
             executor.shutdown();

+ 11 - 4
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/grpcservice/FeatureV2GrpcService.java

@@ -1,10 +1,10 @@
 package com.tzld.piaoquan.recommend.feature.grpcservice;
 
-import com.tzld.piaoquan.recommend.feature.client.ProtobufUtils;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
 import com.tzld.piaoquan.recommend.feature.service.FeatureV2Service;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import net.devh.boot.grpc.server.service.GrpcService;
@@ -23,9 +23,16 @@ public class FeatureV2GrpcService extends FeatureV2ServiceGrpc.FeatureV2ServiceI
     @Override
     public void multiGetFeature(MultiGetFeatureRequest request,
                                 StreamObserver<MultiGetFeatureResponse> responseObserver) {
-        MultiGetFeatureResponse response = featureV2Service.multiGetFeature(request);
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
+        // Delegates to FeatureV2Service and streams the single response back. Any exception is
+        // logged with the request's key count and reported to the client as Status.INTERNAL,
+        // with the exception message in the status description.
+        // NOTE(review): the try block also covers onNext/onCompleted, so an exception thrown by
+        // the observer itself is reported through onError as well — confirm that is intended.
+        try {
+            MultiGetFeatureResponse response = featureV2Service.multiGetFeature(request);
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            log.error("multiGetFeature error, keyCount={}", request.getFeatureKeyCount(), e);
+            responseObserver.onError(
+                Status.INTERNAL.withDescription("Feature query failed: " + e.getMessage()).asRuntimeException()
+            );
+        }
     }
 
 }

+ 5 - 3
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/JSONUtils.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.feature.util;
 
+import com.alibaba.fastjson.JSONObject;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
@@ -9,7 +10,7 @@ import org.apache.commons.lang3.StringUtils;
 public class JSONUtils {
 
     /**
-     * Gson 单例,避免每次调用创建新对象
+     * Gson 单例,线程安全,避免重复创建
      */
     private static final Gson GSON = new Gson();
 
@@ -26,13 +27,14 @@ public class JSONUtils {
     }
 
     public static <T> T fromJson(String value, TypeToken<T> typeToken, T defaultValue) {
+
+        // Blank input is never parsed; the caller's default is returned as-is.
         if (StringUtils.isBlank(value)) {
             return defaultValue;
         }
         try {
-            return GSON.fromJson(value, typeToken.getType());
+            // Parsing now goes through fastjson rather than the GSON singleton.
+            // NOTE(review): fastjson and Gson may differ in how they map the same JSON onto the
+            // target type — verify existing callers; also confirm GSON is still used elsewhere.
+            return JSONObject.parseObject(value, typeToken.getType());
         } catch (Exception e) {
-            log.error("fromJson error! value=[{}]", value, e);
+            // Parse failures are logged (with the raw value) and fall through to the default.
+            log.error("parseObject error! value=[{}]", value, e);
         }
         return defaultValue;
     }

+ 33 - 8
recommend-feature-service/src/main/resources/application-prod.yml

@@ -1,6 +1,30 @@
 server:
   port: 8080
 
+# gRPC server settings
+grpc:
+  server:
+    # Keep-alive (works together with the client-side settings).
+    # keep-alive-time / keep-alive-timeout are only applied by grpc-spring-boot-starter when
+    # enable-keep-alive is true (it defaults to false), so it is switched on explicitly.
+    enable-keep-alive: true
+    keep-alive-time: 20s
+    keep-alive-timeout: 5s
+    permit-keep-alive-without-calls: true
+    permit-keep-alive-time: 10s
+    # Connection lifecycle management (server side)
+    max-connection-idle: 300s        # close a connection after 5 minutes idle
+    max-connection-age: 3600s        # force-close a connection after 1 hour
+    max-connection-age-grace: 5s     # grace period before closing so in-flight requests can finish
+
+# 线程池监控配置
+thread:
+  pool:
+    monitor:
+      enabled: true
+      interval: 30
+      thread:
+        threshold: 0.8
+      queue:
+        threshold: 0.8
+
 eureka:
   instance:
     prefer-ip-address: true #是否优先使用IP地址作为主机名的标识,默认false
@@ -20,10 +44,11 @@ spring:
     timeout: 1000
     lettuce:
       pool:
-        max-active: 8
-        max-wait: -1
-        max-idle: 8
-        min-idle: 0
+        # 增大连接池容量,防止高并发时连接等待
+        max-active: 64
+        max-wait: 2000
+        max-idle: 32
+        min-idle: 8
   tair:
     hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
     port: 6379
@@ -31,10 +56,10 @@ spring:
     timeout: 1000
     lettuce:
       pool:
-        max-active: 8
-        max-wait: -1
-        max-idle: 8
-        min-idle: 0
+        max-active: 64
+        max-wait: 2000
+        max-idle: 32
+        min-idle: 8
 
 apollo:
   meta: http://apolloconfig-internal.piaoquantv.com