jiandong.yh hai 21 horas
pai
achega
1968b2717d

+ 5 - 1
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/client/FeatureV2Client.java

@@ -12,6 +12,7 @@ import org.springframework.util.CollectionUtils;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author dyp
@@ -29,7 +30,10 @@ public class FeatureV2Client {
         MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
                 .addAllFeatureKey(protos)
                 .build();
-        MultiGetFeatureResponse response = client.multiGetFeature(request);
+        // 显式设置 deadline:3秒超时,避免无限等待
+        MultiGetFeatureResponse response = client
+                .withDeadlineAfter(3, TimeUnit.SECONDS)
+                .multiGetFeature(request);
         if (response == null || !response.hasResult()) {
             log.info("multiGetFeature grpc error");
             return null;

+ 176 - 10
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolManager.java

@@ -9,6 +9,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -46,8 +47,8 @@ public class DynamicThreadPoolManager {
      * Apollo 配置的线程池参数
      * 配置格式:
      * [
-     * {"poolName":"DEFAULT","corePoolSize":32,"maxPoolSize":64,"queueCapacity":3000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
-     * {"poolName":"MULTI_GET_FEATURE","corePoolSize":64,"maxPoolSize":128,"queueCapacity":5000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
+     * {"poolName":"DEFAULT","corePoolSize":8,"maxPoolSize":16,"queueCapacity":1000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
+     * {"poolName":"MULTI_GET_FEATURE","corePoolSize":16,"maxPoolSize":32,"queueCapacity":1800,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
      * ]
      */
     @ApolloJsonValue("${thread.pool.configs:[]}")
@@ -65,7 +66,7 @@ public class DynamicThreadPoolManager {
         DEFAULT_CONFIG.setPoolName(DEFAULT_POOL);
         DEFAULT_CONFIG.setCorePoolSize(8);
         DEFAULT_CONFIG.setMaxPoolSize(16);
-        DEFAULT_CONFIG.setQueueCapacity(3000);
+        DEFAULT_CONFIG.setQueueCapacity(1000);
         DEFAULT_CONFIG.setKeepAliveSeconds(60);
         DEFAULT_CONFIG.setRejectedPolicy("CALLER_RUNS");
 
@@ -74,11 +75,31 @@ public class DynamicThreadPoolManager {
         MULTI_GET_FEATURE_CONFIG.setPoolName(MULTI_GET_FEATURE_POOL);
         MULTI_GET_FEATURE_CONFIG.setCorePoolSize(16);
         MULTI_GET_FEATURE_CONFIG.setMaxPoolSize(32);
-        MULTI_GET_FEATURE_CONFIG.setQueueCapacity(5000);
+        MULTI_GET_FEATURE_CONFIG.setQueueCapacity(1800);
         MULTI_GET_FEATURE_CONFIG.setKeepAliveSeconds(60);
         MULTI_GET_FEATURE_CONFIG.setRejectedPolicy("CALLER_RUNS");
     }
 
+    /**
+     * Monitor settings injected from the {@code thread.pool.monitor.*} properties
+     * (the original note states they can be adjusted dynamically through Apollo).
+     * NOTE(review): {@code monitorEnabled} and {@code monitorIntervalSeconds} are only read
+     * inside {@code startMonitor()}, so a runtime change to them presumably does not affect a
+     * monitor that is already scheduled - confirm.
+     *
+     * Whether the periodic monitor is started; defaults to {@code true}.
+     */
+    @Value("${thread.pool.monitor.enabled:true}")
+    private boolean monitorEnabled;
+
+    // Initial delay and period of the monitor task, in seconds; defaults to 30.
+    @Value("${thread.pool.monitor.interval:30}")
+    private int monitorIntervalSeconds;
+
+    // Ratio of active threads to maximum pool size at or above which a warning is logged; defaults to 0.8.
+    @Value("${thread.pool.monitor.thread.threshold:0.8}")
+    private double threadUsageThreshold;
+
+    // Ratio of queued tasks to queue capacity at or above which a warning is logged; defaults to 0.8.
+    @Value("${thread.pool.monitor.queue.threshold:0.8}")
+    private double queueUsageThreshold;
+
+    /**
+     * Scheduler that runs the periodic monitor task.
+     * Only assigned in {@code startMonitor()} when monitoring is enabled.
+     */
+    private ScheduledExecutorService monitorScheduler;
+
     @PostConstruct
     public void init() {
         // 初始化默认线程池
@@ -93,6 +114,93 @@ public class DynamicThreadPoolManager {
         }
 
         log.info("DynamicThreadPoolManager initialized, pools: {}", threadPoolRegistry.keySet());
+
+        // 启动定时监控任务
+        startMonitor();
+    }
+
+    /**
+     * Starts the periodic thread pool monitor.
+     *
+     * When monitoring is disabled this only logs and returns. Otherwise it creates a
+     * single daemon thread named "ThreadPoolMonitor-N" and schedules
+     * {@code checkThreadPoolStatus} at a fixed rate, using the configured interval as both
+     * the initial delay and the period.
+     *
+     * {@code scheduleAtFixedRate} throws {@link IllegalArgumentException} for a period
+     * of zero or less, and this method is called from the {@code @PostConstruct} init, so a
+     * non-positive configured interval falls back to 30 seconds (the default declared
+     * for {@code thread.pool.monitor.interval}) instead of failing initialization.
+     */
+    private void startMonitor() {
+        if (!monitorEnabled) {
+            log.info("Thread pool monitor is disabled");
+            return;
+        }
+
+        // Guard against a misconfigured interval before handing it to the scheduler.
+        int intervalSeconds = monitorIntervalSeconds;
+        if (intervalSeconds <= 0) {
+            log.warn("Invalid thread.pool.monitor.interval={}, falling back to 30s", monitorIntervalSeconds);
+            intervalSeconds = 30;
+        }
+
+        monitorScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").setDaemon(true).build()
+        );
+
+        monitorScheduler.scheduleAtFixedRate(this::checkThreadPoolStatus,
+                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
+
+        log.info("Thread pool monitor started, interval={}s, threadThreshold={}%, queueThreshold={}%",
+                intervalSeconds, (int)(threadUsageThreshold * 100), (int)(queueUsageThreshold * 100));
+    }
+
+    /**
+     * Walks every registered pool and logs a warning for each usage ratio
+     * (active threads / max pool size, queued tasks / queue capacity) that has
+     * reached its configured threshold. Any exception raised during the scan is
+     * logged here and not rethrown.
+     */
+    private void checkThreadPoolStatus() {
+        try {
+            for (Map.Entry<String, ThreadPoolExecutor> registered : threadPoolRegistry.entrySet()) {
+                final String name = registered.getKey();
+                final ThreadPoolExecutor pool = registered.getValue();
+
+                final int active = pool.getActiveCount();
+                final int maxThreads = pool.getMaximumPoolSize();
+                final int queued = pool.getQueue().size();
+                final int capacity = getQueueCapacity(name);
+
+                final double threadRatio = (double) active / maxThreads;
+                // A non-positive capacity counts as zero queue usage.
+                double queueRatio = 0;
+                if (capacity > 0) {
+                    queueRatio = (double) queued / capacity;
+                }
+
+                // Thread usage has reached the threshold.
+                if (threadRatio >= threadUsageThreshold) {
+                    final int threadPercent = (int)(threadRatio * 100);
+                    log.warn("[ThreadPool ALERT] [{}] 线程使用率过高! activeCount={}/{} ({}%), " +
+                            "poolSize={}, queueSize={}/{}, completedTasks={}, totalTasks={}",
+                            name, active, maxThreads, threadPercent,
+                            pool.getPoolSize(), queued, capacity,
+                            pool.getCompletedTaskCount(), pool.getTaskCount());
+                }
+
+                // Queue usage has reached the threshold.
+                if (queueRatio >= queueUsageThreshold) {
+                    final int queuePercent = (int)(queueRatio * 100);
+                    log.warn("[ThreadPool ALERT] [{}] 队列使用率过高! queueSize={}/{} ({}%), " +
+                            "activeCount={}/{}, poolSize={}, completedTasks={}, totalTasks={}",
+                            name, queued, capacity, queuePercent,
+                            active, maxThreads, pool.getPoolSize(),
+                            pool.getCompletedTaskCount(), pool.getTaskCount());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Thread pool monitor error", e);
+        }
+    }
+
+    /**
+     * Logs a one-line status snapshot for every registered pool, framed by
+     * report start/end markers. Intended to be called manually when troubleshooting.
+     */
+    public void printAllPoolStatus() {
+        log.info("===== Thread Pool Status Report =====");
+        for (Map.Entry<String, ThreadPoolExecutor> registered : threadPoolRegistry.entrySet()) {
+            final String name = registered.getKey();
+            final ThreadPoolExecutor pool = registered.getValue();
+            final int capacity = getQueueCapacity(name);
+
+            log.info("[{}] coreSize={}, maxSize={}, poolSize={}, activeCount={}, " +
+                    "queueSize={}/{}, completedTasks={}, totalTasks={}",
+                    name,
+                    pool.getCorePoolSize(), pool.getMaximumPoolSize(),
+                    pool.getPoolSize(), pool.getActiveCount(),
+                    pool.getQueue().size(), capacity,
+                    pool.getCompletedTaskCount(), pool.getTaskCount());
+        }
+        log.info("===== End of Report =====");
     }
 
     /**
@@ -121,20 +229,71 @@ public class DynamicThreadPoolManager {
     }
 
     /**
-     * 获取拒绝策略
+     * Builds the rejection handler for a policy name, wrapped so that each rejection is logged.
+     *
+     * The name is upper-cased and matched against ABORT, DISCARD, DISCARD_OLDEST and
+     * CALLER_RUNS; any other value falls through to CallerRunsPolicy.
+     * NOTE(review): a null {@code policy} would throw NullPointerException at
+     * {@code policy.toUpperCase()}, and the no-argument toUpperCase() uses the default
+     * locale - confirm callers always pass a non-null ASCII name.
+     *
+     * @param policy configured policy name; also passed to the wrapper unchanged for logging
+     * @return an AlertingRejectedExecutionHandler delegating to the selected policy
      */
     private RejectedExecutionHandler getRejectedExecutionHandler(String policy) {
+        RejectedExecutionHandler originalHandler;
         switch (policy.toUpperCase()) {
             case "ABORT":
-                return new ThreadPoolExecutor.AbortPolicy();
+                originalHandler = new ThreadPoolExecutor.AbortPolicy();
+                break;
             case "DISCARD":
-                return new ThreadPoolExecutor.DiscardPolicy();
+                originalHandler = new ThreadPoolExecutor.DiscardPolicy();
+                break;
             case "DISCARD_OLDEST":
-                return new ThreadPoolExecutor.DiscardOldestPolicy();
+                originalHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
+                break;
             case "CALLER_RUNS":
             default:
-                return new ThreadPoolExecutor.CallerRunsPolicy();
+                originalHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                break;
+        }
+        // Wrap the selected policy so a log entry is written at the moment a task is rejected.
+        return new AlertingRejectedExecutionHandler(originalHandler, policy);
+    }
+
+    /**
+     * Rejection handler decorator that logs the pool's state when a task is rejected
+     * and then applies the wrapped policy.
+     *
+     * Logging is best-effort: a runtime failure while gathering or writing the
+     * diagnostics is reported as a warning and does not stop the wrapped policy from
+     * running, so the configured behaviour (e.g. running the task in the caller) is kept.
+     */
+    private class AlertingRejectedExecutionHandler implements RejectedExecutionHandler {
+        private final RejectedExecutionHandler delegate;
+        private final String policyName;
+
+        /**
+         * @param delegate   policy that actually handles the rejected task
+         * @param policyName policy name as configured, used only in the log line
+         */
+        public AlertingRejectedExecutionHandler(RejectedExecutionHandler delegate, String policyName) {
+            this.delegate = delegate;
+            this.policyName = policyName;
+        }
+
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            try {
+                // Log the rejection together with a snapshot of the pool counters.
+                String poolName = getPoolNameByExecutor(executor);
+                int queueCapacity = getQueueCapacity(poolName);
+
+                log.error("[ThreadPool REJECTED] [{}] 任务被拒绝! 策略={}, " +
+                        "activeCount={}/{}, poolSize={}, queueSize={}/{}, completedTasks={}, totalTasks={}",
+                        poolName, policyName,
+                        executor.getActiveCount(), executor.getMaximumPoolSize(),
+                        executor.getPoolSize(),
+                        executor.getQueue().size(), queueCapacity,
+                        executor.getCompletedTaskCount(), executor.getTaskCount());
+            } catch (RuntimeException e) {
+                // Diagnostics must never decide the fate of the task.
+                log.warn("Failed to log thread pool rejection", e);
+            }
+
+            // Apply the wrapped rejection policy.
+            delegate.rejectedExecution(r, executor);
+        }
+    }
+    
+    /**
+     * Resolves the registry name of an executor by reference identity.
+     * Yields "UNKNOWN" when the instance is not present in the registry.
+     */
+    private String getPoolNameByExecutor(ThreadPoolExecutor executor) {
+        String resolvedName = "UNKNOWN";
+        for (Map.Entry<String, ThreadPoolExecutor> candidate : threadPoolRegistry.entrySet()) {
+            if (candidate.getValue() == executor) {
+                resolvedName = candidate.getKey();
+                break;
+            }
         }
+        return resolvedName;
     }
 
     /**
@@ -286,12 +445,19 @@ public class DynamicThreadPoolManager {
         } else if (MULTI_GET_FEATURE_POOL.equals(poolName)) {
             return MULTI_GET_FEATURE_CONFIG.getQueueCapacity();
         }
-        return 3000;
+        return DEFAULT_CONFIG.getQueueCapacity();
     }
 
     @PreDestroy
     public void shutdown() {
         log.info("Shutting down thread pools...");
+        
+        // 先关闭监控调度器
+        if (monitorScheduler != null && !monitorScheduler.isShutdown()) {
+            monitorScheduler.shutdown();
+            log.info("Thread pool monitor shutdown");
+        }
+        
         for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
             ThreadPoolExecutor executor = entry.getValue();
             executor.shutdown();

+ 11 - 4
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/grpcservice/FeatureV2GrpcService.java

@@ -1,10 +1,10 @@
 package com.tzld.piaoquan.recommend.feature.grpcservice;
 
-import com.tzld.piaoquan.recommend.feature.client.ProtobufUtils;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
 import com.tzld.piaoquan.recommend.feature.service.FeatureV2Service;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import net.devh.boot.grpc.server.service.GrpcService;
@@ -23,9 +23,16 @@ public class FeatureV2GrpcService extends FeatureV2ServiceGrpc.FeatureV2ServiceI
+    /**
+     * gRPC entry point for MultiGetFeature: delegates to featureV2Service and sends the
+     * single response, then completes the call.
+     *
+     * Any Exception raised inside the try block (including from onNext/onCompleted) is
+     * logged with the request's feature key count and the call is terminated with
+     * Status.INTERNAL.
+     * NOTE(review): the status description embeds e.getMessage(), which is sent to the
+     * client and may be null - confirm exposing it is acceptable.
+     */
     @Override
     public void multiGetFeature(MultiGetFeatureRequest request,
                                 StreamObserver<MultiGetFeatureResponse> responseObserver) {
-        MultiGetFeatureResponse response = featureV2Service.multiGetFeature(request);
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
+        try {
+            MultiGetFeatureResponse response = featureV2Service.multiGetFeature(request);
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            log.error("multiGetFeature error, keyCount={}", request.getFeatureKeyCount(), e);
+            // Report the failure to the caller as a gRPC INTERNAL status.
+            responseObserver.onError(
+                Status.INTERNAL.withDescription("Feature query failed: " + e.getMessage()).asRuntimeException()
+            );
+        }
     }
 
 }

+ 29 - 8
recommend-feature-service/src/main/resources/application-prod.yml

@@ -1,6 +1,26 @@
 server:
   port: 8080
 
+# gRPC 服务端配置
+grpc:
+  server:
+    # KeepAlive 配置(与客户端配合使用)
+    keep-alive-time: 30s
+    keep-alive-timeout: 5s
+    permit-keep-alive-without-calls: true
+    permit-keep-alive-time: 20s
+
+# 线程池监控配置
+thread:
+  pool:
+    monitor:
+      enabled: true
+      interval: 30
+      thread:
+        threshold: 0.8
+      queue:
+        threshold: 0.8
+
 eureka:
   instance:
     prefer-ip-address: true #是否优先使用IP地址作为主机名的标识,默认false
@@ -20,10 +40,11 @@ spring:
     timeout: 1000
     lettuce:
       pool:
-        max-active: 8
-        max-wait: -1
-        max-idle: 8
-        min-idle: 0
+        # 增大连接池容量,防止高并发时连接等待
+        max-active: 64
+        max-wait: 2000
+        max-idle: 32
+        min-idle: 8
   tair:
     hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
     port: 6379
@@ -31,10 +52,10 @@ spring:
     timeout: 1000
     lettuce:
       pool:
-        max-active: 8
-        max-wait: -1
-        max-idle: 8
-        min-idle: 0
+        max-active: 64
+        max-wait: 2000
+        max-idle: 32
+        min-idle: 8
 
 apollo:
   meta: http://apolloconfig-internal.piaoquantv.com