
Merge branch 'ljd/server-monitor' into pre-master

# Conflicts:
#	recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java
jiandong.liu 1 week ago
parent
commit
b0e43cc6e0
18 changed files with 1086 additions and 80 deletions
  1. recommend-feature-client/pom.xml (+1 -1)
  2. recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/client/FeatureV2Client.java (+127 -14)
  3. recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/domain/ad/base/AdRankItem.java (+4 -0)
  4. recommend-feature-service/pom.xml (+12 -0)
  5. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/Application.java (+2 -1)
  6. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolConfig.java (+43 -0)
  7. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolManager.java (+545 -0)
  8. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ResizableLinkedBlockingQueue.java (+58 -0)
  9. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ThreadPoolFactory.java (+22 -21)
  10. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/RedisTemplateConfig.java (+11 -2)
  11. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/TairTemplateConfig.java (+10 -1)
  12. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/grpcservice/FeatureV2GrpcService.java (+11 -4)
  13. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java (+144 -18)
  14. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/UserAndVideoFeatureService.java (+7 -7)
  15. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/JSONUtils.java (+5 -1)
  16. recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/web/ThreadPoolMonitorController.java (+34 -0)
  17. recommend-feature-service/src/main/resources/application-prod.yml (+35 -10)
  18. recommend-feature-service/src/main/resources/application.yml (+15 -0)

+ 1 - 1
recommend-feature-client/pom.xml

@@ -10,7 +10,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>recommend-feature-client</artifactId>
-    <version>1.1.22</version>
+    <version>1.1.24</version>
 
     <dependencies>
         <dependency>

+ 127 - 14
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/client/FeatureV2Client.java

@@ -4,6 +4,8 @@ import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import lombok.extern.slf4j.Slf4j;
 import net.devh.boot.grpc.client.inject.GrpcClient;
 import org.springframework.stereotype.Component;
@@ -12,38 +14,149 @@ import org.springframework.util.CollectionUtils;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
+ * FeatureV2 gRPC client.
+ *
+ * Improvements:
+ * 1. Exception handling: catch StatusRuntimeException so errors do not propagate upward
+ * 2. Retries: automatically retry recoverable errors such as UNAVAILABLE (50 ms delay)
+ * 3. Graceful degradation: return an empty result once retries are exhausted, keeping the service stable
+ * 4. Detailed logging: record error details to ease troubleshooting
+ *
  * @author dyp
  */
 @Component
 @Slf4j
 public class FeatureV2Client {
+    
     @GrpcClient("recommend-feature")
     private FeatureV2ServiceGrpc.FeatureV2ServiceBlockingStub client;
+    
+    /**
+     * Maximum number of retry attempts.
+     * Rationale: for network connection problems, a retry can trigger connection re-establishment.
+     */
+    private static final int MAX_RETRY_ATTEMPTS = 2;
+    
+    /**
+     * Retry delay (milliseconds).
+     * Rationale: a fast 50 ms retry leaves time for the connection to be rebuilt.
+     */
+    private static final long RETRY_DELAY_MS = 50;
 
+    /**
+     * Fetch feature data in batch.
+     *
+     * @param protos list of feature key requests
+     * @return feature map keyed by uniqueKey, with the feature value as a JSON string
+     */
     public Map<String, String> multiGetFeature(List<FeatureKeyProto> protos) {
         if (CollectionUtils.isEmpty(protos)) {
+            log.warn("multiGetFeature: protos is empty, return empty map");
             return Collections.emptyMap();
         }
+
+        long startTime = System.currentTimeMillis();
+        // Start from attempt 0
+        Map<String, String> result = multiGetFeatureWithRetry(protos, 0);
+        log.info("multiGetFeature: end, result.size={}, cost={}", result != null ? result.size() : 0, System.currentTimeMillis() - startTime);
+        return result;
+    }
+    
+    /**
+     * Feature fetch with retries.
+     *
+     * @param protos list of feature key requests
+     * @param attemptCount current attempt number (starting from 0)
+     * @return feature map
+     */
+    private Map<String, String> multiGetFeatureWithRetry(List<FeatureKeyProto> protos, int attemptCount) {
+        log.warn("multiGetFeatureWithRetry: start, attempt={}, protos.size={}", attemptCount, protos.size());
+        
         MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
                 .addAllFeatureKey(protos)
                 .build();
-        MultiGetFeatureResponse response = client.multiGetFeature(request);
-        if (response == null || !response.hasResult()) {
-            log.info("multiGetFeature grpc error");
-            return null;
-        }
-        if (response.getResult().getCode() != 1) {
-            log.info("multiGetFeature grpc code={}, msg={}", response.getResult().getCode(),
-                    response.getResult().getMessage());
-            return null;
-        }
-        if (response.getFeatureCount() == 0) {
-            log.info("multiGetFeature no feature");
+        
+        try {
+            // Call the gRPC service with a 3-second deadline
+            MultiGetFeatureResponse response = client
+                    .withDeadlineAfter(3, TimeUnit.SECONDS)
+                    .multiGetFeature(request);
+            
+            // Response is null or carries no result
+            if (response == null || !response.hasResult()) {
+                log.info("multiGetFeature grpc error: response is null or has no result, attempt={}", attemptCount);
+                return Collections.emptyMap();
+            }
+            
+            // Business error-code check
+            if (response.getResult().getCode() != 1) {
+                log.info("multiGetFeature grpc code={}, msg={}, attempt={}", response.getResult().getCode(),
+                        response.getResult().getMessage(), attemptCount);
+                return Collections.emptyMap();
+            }
+            
+            // No feature data returned
+            if (response.getFeatureCount() == 0) {
+                log.info("multiGetFeature no feature, attempt={}", attemptCount);
+                return Collections.emptyMap();
+            }
+            
+            // Success: return the feature map
+            return response.getFeatureMap();
+            
+        } catch (StatusRuntimeException e) {
+            Status.Code code = e.getStatus().getCode();
+            String description = e.getStatus().getDescription();
+            
+            // Log detailed error information (error level, so it is always emitted)
+            log.error("gRPC call failed: code={}, description={}, attempt={}/{}, protos.size={}, exception={}", 
+                    code, description, attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e.getClass().getName(), e);
+            
+            // Decide whether to retry
+            if (shouldRetry(code) && attemptCount < MAX_RETRY_ATTEMPTS) {
+                log.warn("Retrying gRPC call after {}ms, attempt={}/{}, reason={}", RETRY_DELAY_MS, attemptCount + 1, MAX_RETRY_ATTEMPTS, code);
+                
+                // Wait a moment before retrying (leaves time for connection re-establishment)
+                try {
+                    Thread.sleep(RETRY_DELAY_MS);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    log.warn("Retry sleep interrupted", ie);
+                }
+                
+                // Recurse to perform the retry
+                return multiGetFeatureWithRetry(protos, attemptCount + 1);
+            }
+            
+            // Retries exhausted or error not retryable: degrade to an empty result
+            log.error("gRPC call failed after {} attempts, returning empty result for graceful degradation. code={}", 
+                    attemptCount + 1, code);
+            return Collections.emptyMap();
+        } catch (Exception e) {
+            // Catch all other exceptions (should not happen, kept as a safety net)
+            log.error("multiGetFeatureWithRetry: unexpected exception, attempt={}/{}, protos.size={}, exception={}", 
+                    attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e.getClass().getName(), e);
             return Collections.emptyMap();
         }
-        return response.getFeatureMap();
     }
-
+    
+    /**
+     * Decide whether an error should be retried.
+     *
+     * @param code gRPC status code
+     * @return true if the call should be retried, false otherwise
+     */
+    private boolean shouldRetry(Status.Code code) {
+        // UNAVAILABLE: connection unavailable (e.g. network down, connection closed) - retry
+        // DEADLINE_EXCEEDED: timed out - retry
+        // RESOURCE_EXHAUSTED: resources exhausted (e.g. connection pool full) - retry
+        // CANCELLED: request cancelled (e.g. server-side failure, dropped connection) - retry
+        return code == Status.Code.UNAVAILABLE 
+                || code == Status.Code.DEADLINE_EXCEEDED 
+                || code == Status.Code.RESOURCE_EXHAUSTED
+                || code == Status.Code.CANCELLED;
+    }
 }

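Since the client now degrades to an empty map instead of returning null, call sites need no null handling. A minimal caller sketch (the table name, key value, and the injected featureV2Client field are illustrative assumptions, not part of this commit):

    FeatureKeyProto key = FeatureKeyProto.newBuilder()
            .setTableName("alg_mid_feature")   // assumed table name
            .setUniqueKey("mid-12345")         // assumed unique key
            .build();
    Map<String, String> features = featureV2Client.multiGetFeature(Collections.singletonList(key));
    // Transport errors are retried, then degraded to an empty map,
    // so a plain getOrDefault is sufficient.
    String featureJson = features.getOrDefault("mid-12345", "");
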
+ 4 - 0
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/domain/ad/base/AdRankItem.java

@@ -59,6 +59,10 @@ public class AdRankItem implements Comparable<AdRankItem> {
     private Long id;
     // ad ID
     private Long skuId;
+    // customer ID
+    private Long customerId;
+    // industry
+    private String profession;
 
     // 特征
     private Map<String, Object> ext = new HashMap<>();

+ 12 - 0
recommend-feature-service/pom.xml

@@ -130,6 +130,18 @@
             <artifactId>snappy-java</artifactId>
             <version>1.1.8.4</version>
         </dependency>
+
+        <!-- Actuator: exposes monitoring endpoints -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+
+        <!-- Micrometer Prometheus registry -->
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <finalName>recommend-feature-service</finalName>

+ 2 - 1
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/Application.java

@@ -17,7 +17,8 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
         "com.tzld.piaoquan.recommend.feature.service",
         "com.tzld.piaoquan.recommend.feature.grpcservice",
         "com.tzld.piaoquan.recommend.feature.web",
-        "com.tzld.piaoquan.recommend.feature.config"
+        "com.tzld.piaoquan.recommend.feature.config",
+        "com.tzld.piaoquan.recommend.feature.common"
 })
 @EnableEurekaClient
 @EnableAspectJAutoProxy

+ 43 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolConfig.java

@@ -0,0 +1,43 @@
+package com.tzld.piaoquan.recommend.feature.common;
+
+import lombok.Data;
+
+/**
+ * Dynamic thread pool configuration.
+ *
+ * @author ljd
+ */
+@Data
+public class DynamicThreadPoolConfig {
+
+    /**
+     * Thread pool name
+     */
+    private String poolName;
+
+    /**
+     * Core pool size (defaults tuned for a 2-core CPU)
+     */
+    private int corePoolSize = 8;
+
+    /**
+     * Maximum pool size
+     */
+    private int maxPoolSize = 16;
+
+    /**
+     * Queue capacity
+     */
+    private int queueCapacity = 3000;
+
+    /**
+     * Idle thread keep-alive time (seconds)
+     */
+    private long keepAliveSeconds = 60;
+
+    /**
+     * Rejection policy: ABORT, CALLER_RUNS, DISCARD, DISCARD_OLDEST
+     */
+    private String rejectedPolicy = "CALLER_RUNS";
+
+}

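For reference, the Apollo value consumed via @ApolloJsonValue (the same format is quoted in DynamicThreadPoolManager below) is a JSON array of these objects:

    [
      {"poolName":"DEFAULT","corePoolSize":8,"maxPoolSize":16,"queueCapacity":1000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
      {"poolName":"MULTI_GET_FEATURE","corePoolSize":16,"maxPoolSize":32,"queueCapacity":1800,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
    ]
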
+ 545 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolManager.java

@@ -0,0 +1,545 @@
+package com.tzld.piaoquan.recommend.feature.common;
+
+import com.alibaba.fastjson.JSON;
+import com.ctrip.framework.apollo.model.ConfigChange;
+import com.ctrip.framework.apollo.model.ConfigChangeEvent;
+import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Dynamic thread pool manager.
+ * Supports adjusting thread pool parameters at runtime via Apollo configuration.
+ *
+ * @author ljd
+ */
+@Slf4j
+@Component
+public class DynamicThreadPoolManager {
+
+    /**
+     * Thread pool registry
+     */
+    private final Map<String, ThreadPoolExecutor> threadPoolRegistry = new ConcurrentHashMap<>();
+
+    /**
+     * Default pool name
+     */
+    public static final String DEFAULT_POOL = "DEFAULT";
+
+    /**
+     * Feature-query pool name
+     */
+    public static final String MULTI_GET_FEATURE_POOL = "MULTI_GET_FEATURE";
+
+    /**
+     * Thread pool parameters configured via Apollo.
+     * Expected format:
+     * [
+     * {"poolName":"DEFAULT","corePoolSize":8,"maxPoolSize":16,"queueCapacity":1000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
+     * {"poolName":"MULTI_GET_FEATURE","corePoolSize":16,"maxPoolSize":32,"queueCapacity":1800,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
+     * ]
+     */
+    @ApolloJsonValue("${thread.pool.configs:[]}")
+    private List<DynamicThreadPoolConfig> threadPoolConfigs;
+
+    /**
+     * Built-in default configurations
+     */
+    private static final DynamicThreadPoolConfig DEFAULT_CONFIG;
+    private static final DynamicThreadPoolConfig MULTI_GET_FEATURE_CONFIG;
+
+    static {
+        // Defaults tuned for a 2-core CPU
+        DEFAULT_CONFIG = new DynamicThreadPoolConfig();
+        DEFAULT_CONFIG.setPoolName(DEFAULT_POOL);
+        DEFAULT_CONFIG.setCorePoolSize(8);
+        DEFAULT_CONFIG.setMaxPoolSize(16);
+        DEFAULT_CONFIG.setQueueCapacity(1000);
+        DEFAULT_CONFIG.setKeepAliveSeconds(60);
+        DEFAULT_CONFIG.setRejectedPolicy("CALLER_RUNS");
+
+        // IO-bound work (Redis queries) can use more threads
+        MULTI_GET_FEATURE_CONFIG = new DynamicThreadPoolConfig();
+        MULTI_GET_FEATURE_CONFIG.setPoolName(MULTI_GET_FEATURE_POOL);
+        MULTI_GET_FEATURE_CONFIG.setCorePoolSize(16);
+        MULTI_GET_FEATURE_CONFIG.setMaxPoolSize(32);
+        MULTI_GET_FEATURE_CONFIG.setQueueCapacity(1800);
+        MULTI_GET_FEATURE_CONFIG.setKeepAliveSeconds(60);
+        MULTI_GET_FEATURE_CONFIG.setRejectedPolicy("CALLER_RUNS");
+    }
+
+    /**
+     * Monitoring thresholds (adjustable at runtime via Apollo)
+     */
+    @Value("${thread.pool.monitor.enabled:true}")
+    private boolean monitorEnabled;
+
+    @Value("${thread.pool.monitor.interval:30}")
+    private int monitorIntervalSeconds;
+
+    @Value("${thread.pool.monitor.thread.threshold:0.8}")
+    private double threadUsageThreshold;
+
+    @Value("${thread.pool.monitor.queue.threshold:0.8}")
+    private double queueUsageThreshold;
+
+    @Autowired
+    private MeterRegistry meterRegistry;
+
+    /**
+     * Scheduler for the periodic monitor
+     */
+    private ScheduledExecutorService monitorScheduler;
+
+    @PostConstruct
+    public void init() {
+        // Initialize the built-in pools
+        initThreadPool(DEFAULT_CONFIG);
+        initThreadPool(MULTI_GET_FEATURE_CONFIG);
+
+        // Apply overrides from Apollo configuration
+        if (threadPoolConfigs != null && !threadPoolConfigs.isEmpty()) {
+            for (DynamicThreadPoolConfig config : threadPoolConfigs) {
+                updateThreadPool(config);
+            }
+        }
+
+        log.info("DynamicThreadPoolManager initialized, pools: {}", threadPoolRegistry.keySet());
+
+        // Start the periodic monitoring task
+        startMonitor();
+    }
+
+    /**
+     * Start the thread pool monitoring task.
+     */
+    private void startMonitor() {
+        if (!monitorEnabled) {
+            log.info("Thread pool monitor is disabled");
+            return;
+        }
+
+        monitorScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").setDaemon(true).build()
+        );
+
+        monitorScheduler.scheduleAtFixedRate(this::checkThreadPoolStatus, 
+                monitorIntervalSeconds, monitorIntervalSeconds, TimeUnit.SECONDS);
+        
+        log.info("Thread pool monitor started, interval={}s, threadThreshold={}%, queueThreshold={}%",
+                monitorIntervalSeconds, (int)(threadUsageThreshold * 100), (int)(queueUsageThreshold * 100));
+    }
+
+    /**
+     * Check thread pool status and log an alert whenever a threshold is exceeded.
+     */
+    private void checkThreadPoolStatus() {
+        try {
+            for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+                String poolName = entry.getKey();
+                ThreadPoolExecutor executor = entry.getValue();
+
+                int activeCount = executor.getActiveCount();
+                int maxPoolSize = executor.getMaximumPoolSize();
+                int queueSize = executor.getQueue().size();
+                int queueCapacity = getQueueCapacity(poolName);
+
+                double threadUsage = (double) activeCount / maxPoolSize;
+                double queueUsage = queueCapacity > 0 ? (double) queueSize / queueCapacity : 0;
+
+                // Thread usage above threshold
+                if (threadUsage >= threadUsageThreshold) {
+                    log.warn("[ThreadPool ALERT] [{}] thread usage too high! activeCount={}/{} ({}%), " +
+                            "poolSize={}, queueSize={}/{}, completedTasks={}, totalTasks={}",
+                            poolName, activeCount, maxPoolSize, (int)(threadUsage * 100),
+                            executor.getPoolSize(), queueSize, queueCapacity,
+                            executor.getCompletedTaskCount(), executor.getTaskCount());
+                }
+
+                // Queue usage above threshold
+                if (queueUsage >= queueUsageThreshold) {
+                    log.warn("[ThreadPool ALERT] [{}] queue usage too high! queueSize={}/{} ({}%), " +
+                            "activeCount={}/{}, poolSize={}, completedTasks={}, totalTasks={}",
+                            poolName, queueSize, queueCapacity, (int)(queueUsage * 100),
+                            activeCount, maxPoolSize, executor.getPoolSize(),
+                            executor.getCompletedTaskCount(), executor.getTaskCount());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Thread pool monitor error", e);
+        }
+    }
+
+    /**
+     * Manually log the current status of all thread pools (useful when troubleshooting).
+     */
+    public void printAllPoolStatus() {
+        log.info("===== Thread Pool Status Report =====");
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            String poolName = entry.getKey();
+            ThreadPoolExecutor executor = entry.getValue();
+            int queueCapacity = getQueueCapacity(poolName);
+            
+            log.info("[{}] coreSize={}, maxSize={}, poolSize={}, activeCount={}, " +
+                    "queueSize={}/{}, completedTasks={}, totalTasks={}",
+                    poolName,
+                    executor.getCorePoolSize(),
+                    executor.getMaximumPoolSize(),
+                    executor.getPoolSize(),
+                    executor.getActiveCount(),
+                    executor.getQueue().size(), queueCapacity,
+                    executor.getCompletedTaskCount(),
+                    executor.getTaskCount());
+        }
+        log.info("===== End of Report =====");
+    }
+
+    /**
+     * Initialize a thread pool.
+     */
+    private void initThreadPool(DynamicThreadPoolConfig config) {
+        ThreadPoolExecutor executor = createThreadPoolExecutor(config);
+        threadPoolRegistry.put(config.getPoolName(), executor);
+        registerMetrics(executor, config.getPoolName());
+        log.info("Thread pool [{}] initialized: coreSize={}, maxSize={}, queueCapacity={}",
+                config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize(), config.getQueueCapacity());
+    }
+
+    /**
+     * Register thread pool metrics with Micrometer.
+     */
+    private void registerMetrics(ThreadPoolExecutor executor, String poolName) {
+        Gauge.builder("threadpool.core.size", executor, ThreadPoolExecutor::getCorePoolSize)
+                .tag("pool", poolName)
+                .description("Core pool size")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.max.size", executor, ThreadPoolExecutor::getMaximumPoolSize)
+                .tag("pool", poolName)
+                .description("Maximum pool size")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.active.count", executor, ThreadPoolExecutor::getActiveCount)
+                .tag("pool", poolName)
+                .description("Active thread count")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.pool.size", executor, ThreadPoolExecutor::getPoolSize)
+                .tag("pool", poolName)
+                .description("Current pool size")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.queue.size", executor, e -> e.getQueue().size())
+                .tag("pool", poolName)
+                .description("Queue size")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.queue.capacity", () -> getQueueCapacity(poolName))
+                .tag("pool", poolName)
+                .description("Queue capacity")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.completed.tasks", executor, ThreadPoolExecutor::getCompletedTaskCount)
+                .tag("pool", poolName)
+                .description("Completed task count")
+                .register(meterRegistry);
+
+        Gauge.builder("threadpool.task.count", executor, ThreadPoolExecutor::getTaskCount)
+                .tag("pool", poolName)
+                .description("Total task count")
+                .register(meterRegistry);
+    }
+
+    /**
+     * Create a thread pool executor.
+     */
+    private ThreadPoolExecutor createThreadPoolExecutor(DynamicThreadPoolConfig config) {
+        return new ThreadPoolExecutor(
+                config.getCorePoolSize(),
+                config.getMaxPoolSize(),
+                config.getKeepAliveSeconds(),
+                TimeUnit.SECONDS,
+                new ResizableLinkedBlockingQueue<>(config.getQueueCapacity()),
+                new ThreadFactoryBuilder().setNameFormat(config.getPoolName() + "-%d").build(),
+                getRejectedExecutionHandler(config.getRejectedPolicy())
+        );
+    }
+
+    /**
+     * Resolve the rejection policy (wrapped with alerting).
+     */
+    private RejectedExecutionHandler getRejectedExecutionHandler(String policy) {
+        RejectedExecutionHandler originalHandler;
+        switch (policy.toUpperCase()) {
+            case "ABORT":
+                originalHandler = new ThreadPoolExecutor.AbortPolicy();
+                break;
+            case "DISCARD":
+                originalHandler = new ThreadPoolExecutor.DiscardPolicy();
+                break;
+            case "DISCARD_OLDEST":
+                originalHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
+                break;
+            case "CALLER_RUNS":
+            default:
+                originalHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                break;
+        }
+        // Wrap the original policy so an alert is logged whenever a task is rejected
+        return new AlertingRejectedExecutionHandler(originalHandler, policy);
+    }
+
+    /**
+     * Rejection-policy wrapper with alerting.
+     * Logs the thread pool status in real time whenever a task is rejected.
+     */
+    private class AlertingRejectedExecutionHandler implements RejectedExecutionHandler {
+        private final RejectedExecutionHandler delegate;
+        private final String policyName;
+        
+        public AlertingRejectedExecutionHandler(RejectedExecutionHandler delegate, String policyName) {
+            this.delegate = delegate;
+            this.policyName = policyName;
+        }
+        
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            // Real-time alert: a task was rejected
+            String poolName = getPoolNameByExecutor(executor);
+            int queueCapacity = getQueueCapacity(poolName);
+            
+            log.error("[ThreadPool REJECTED] [{}] task rejected! policy={}, " +
+                    "activeCount={}/{}, poolSize={}, queueSize={}/{}, completedTasks={}, totalTasks={}",
+                    poolName, policyName,
+                    executor.getActiveCount(), executor.getMaximumPoolSize(),
+                    executor.getPoolSize(),
+                    executor.getQueue().size(), queueCapacity,
+                    executor.getCompletedTaskCount(), executor.getTaskCount());
+            
+            // Delegate to the original rejection policy
+            delegate.rejectedExecution(r, executor);
+        }
+    }
+    
+    /**
+     * Look up a pool name by its executor instance.
+     */
+    private String getPoolNameByExecutor(ThreadPoolExecutor executor) {
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            if (entry.getValue() == executor) {
+                return entry.getKey();
+            }
+        }
+        return "UNKNOWN";
+    }
+
+    /**
+     * Listen for Apollo configuration changes.
+     * Note: the new value is parsed manually to avoid racing with the @ApolloJsonValue auto-injection.
+     */
+    @ApolloConfigChangeListener
+    public void onConfigChange(ConfigChangeEvent changeEvent) {
+        if (changeEvent.isChanged("thread.pool.configs")) {
+            ConfigChange change = changeEvent.getChange("thread.pool.configs");
+            log.info("Thread pool config changed, old={}, new={}", change.getOldValue(), change.getNewValue());
+
+            // Parse the new value manually to avoid the race
+            String newValue = change.getNewValue();
+            if (StringUtils.isNotBlank(newValue)) {
+                try {
+                    List<DynamicThreadPoolConfig> newConfigs = JSON.parseArray(newValue, DynamicThreadPoolConfig.class);
+                    if (newConfigs != null) {
+                        for (DynamicThreadPoolConfig config : newConfigs) {
+                            updateThreadPool(config);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Failed to parse thread pool config: {}", newValue, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Dynamically update a thread pool's parameters.
+     */
+    public void updateThreadPool(DynamicThreadPoolConfig config) {
+        // Parameter validation
+        if (config.getCorePoolSize() <= 0 || config.getMaxPoolSize() <= 0) {
+            log.error("Invalid pool size for [{}]: corePoolSize={}, maxPoolSize={} must be positive",
+                    config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize());
+            return;
+        }
+        if (config.getCorePoolSize() > config.getMaxPoolSize()) {
+            log.error("Invalid pool size for [{}]: corePoolSize={} > maxPoolSize={}",
+                    config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize());
+            return;
+        }
+        if (config.getQueueCapacity() <= 0) {
+            log.error("Invalid queueCapacity for [{}]: {} must be positive",
+                    config.getPoolName(), config.getQueueCapacity());
+            return;
+        }
+
+        ThreadPoolExecutor executor = threadPoolRegistry.get(config.getPoolName());
+        if (executor == null) {
+            log.warn("Thread pool [{}] not found, creating new one", config.getPoolName());
+            initThreadPool(config);
+            return;
+        }
+
+        // Adjust the parameters in place
+        int oldCoreSize = executor.getCorePoolSize();
+        int oldMaxSize = executor.getMaximumPoolSize();
+
+        // Note: the order matters here, to avoid a transient coreSize > maxSize state
+        if (config.getCorePoolSize() > executor.getMaximumPoolSize()) {
+            executor.setMaximumPoolSize(config.getMaxPoolSize());
+            executor.setCorePoolSize(config.getCorePoolSize());
+        } else {
+            executor.setCorePoolSize(config.getCorePoolSize());
+            executor.setMaximumPoolSize(config.getMaxPoolSize());
+        }
+
+        executor.setKeepAliveTime(config.getKeepAliveSeconds(), TimeUnit.SECONDS);
+
+        // Update the rejection policy
+        RejectedExecutionHandler newHandler = getRejectedExecutionHandler(config.getRejectedPolicy());
+        executor.setRejectedExecutionHandler(newHandler);
+
+        // Update the queue capacity (when the queue is resizable)
+        if (executor.getQueue() instanceof ResizableLinkedBlockingQueue) {
+            ((ResizableLinkedBlockingQueue<?>) executor.getQueue()).setCapacity(config.getQueueCapacity());
+        }
+
+        log.info("Thread pool [{}] updated: coreSize {} -> {}, maxSize {} -> {}, queueCapacity={}, rejectedPolicy={}",
+                config.getPoolName(), oldCoreSize, config.getCorePoolSize(),
+                oldMaxSize, config.getMaxPoolSize(), config.getQueueCapacity(), config.getRejectedPolicy());
+    }
+
+    /**
+     * Get a thread pool by name.
+     */
+    public ExecutorService getThreadPool(String poolName) {
+        ExecutorService executor = threadPoolRegistry.get(poolName);
+        if (executor == null) {
+            log.warn("Thread pool [{}] not found, using DEFAULT pool", poolName);
+            return threadPoolRegistry.get(DEFAULT_POOL);
+        }
+        return executor;
+    }
+
+    /**
+     * Get the default pool.
+     */
+    public ExecutorService getDefaultPool() {
+        return getThreadPool(DEFAULT_POOL);
+    }
+
+    /**
+     * Get the feature-query pool.
+     */
+    public ExecutorService getMultiGetFeaturePool() {
+        return getThreadPool(MULTI_GET_FEATURE_POOL);
+    }
+
+    /**
+     * Get status information for all thread pools.
+     */
+    public Map<String, ThreadPoolStats> getThreadPoolStats() {
+        Map<String, ThreadPoolStats> statsMap = new ConcurrentHashMap<>();
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            ThreadPoolExecutor executor = entry.getValue();
+            ThreadPoolStats stats = new ThreadPoolStats();
+            stats.setPoolName(entry.getKey());
+            stats.setCorePoolSize(executor.getCorePoolSize());
+            stats.setMaxPoolSize(executor.getMaximumPoolSize());
+            stats.setActiveCount(executor.getActiveCount());
+            stats.setPoolSize(executor.getPoolSize());
+            stats.setQueueSize(executor.getQueue().size());
+            stats.setQueueCapacity(getQueueCapacity(entry.getKey()));
+            stats.setCompletedTaskCount(executor.getCompletedTaskCount());
+            stats.setTaskCount(executor.getTaskCount());
+            statsMap.put(entry.getKey(), stats);
+        }
+        return statsMap;
+    }
+
+    /**
+     * Get the configured queue capacity of a pool.
+     */
+    private int getQueueCapacity(String poolName) {
+        if (threadPoolConfigs != null) {
+            for (DynamicThreadPoolConfig config : threadPoolConfigs) {
+                if (config.getPoolName().equals(poolName)) {
+                    return config.getQueueCapacity();
+                }
+            }
+        }
+        // Fall back to the built-in defaults
+        if (DEFAULT_POOL.equals(poolName)) {
+            return DEFAULT_CONFIG.getQueueCapacity();
+        } else if (MULTI_GET_FEATURE_POOL.equals(poolName)) {
+            return MULTI_GET_FEATURE_CONFIG.getQueueCapacity();
+        }
+        return DEFAULT_CONFIG.getQueueCapacity();
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("Shutting down thread pools...");
+        
+        // Shut down the monitor scheduler first
+        if (monitorScheduler != null && !monitorScheduler.isShutdown()) {
+            monitorScheduler.shutdown();
+            log.info("Thread pool monitor shutdown");
+        }
+        
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            ThreadPoolExecutor executor = entry.getValue();
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+            log.info("Thread pool [{}] shutdown completed", entry.getKey());
+        }
+    }
+
+    /**
+     * Thread pool status snapshot.
+     */
+    @Data
+    public static class ThreadPoolStats {
+        private String poolName;
+        private int corePoolSize;
+        private int maxPoolSize;
+        private int activeCount;
+        private int poolSize;
+        private int queueSize;
+        private int queueCapacity;
+        private long completedTaskCount;
+        private long taskCount;
+    }
+
+}

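A minimal sketch of consuming the manager from another Spring bean (the field injection and task are illustrative; most call sites should go through ThreadPoolFactory below):

    @Autowired
    private DynamicThreadPoolManager poolManager;

    public void submitFeatureWork(Runnable task) {
        // Unknown pool names fall back to the DEFAULT pool (see getThreadPool).
        ExecutorService pool = poolManager.getThreadPool(DynamicThreadPoolManager.MULTI_GET_FEATURE_POOL);
        pool.submit(task);
    }
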
+ 58 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ResizableLinkedBlockingQueue.java

@@ -0,0 +1,58 @@
+package com.tzld.piaoquan.recommend.feature.common;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Blocking queue with a dynamically adjustable capacity.
+ * Extends LinkedBlockingQueue and allows the capacity to be changed at runtime.
+ *
+ * <p>Note: the capacity is a best-effort soft limit. Because the {@code size() >= capacity}
+ * check in {@link #offer(Object)} and the {@code super.offer(e)} call are not atomic, the
+ * actual queue size may slightly exceed the configured capacity under high concurrency.
+ * For the dynamic thread pool use case this deviation is acceptable.</p>
+ *
+ * @author ljd
+ */
+public class ResizableLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Dynamic capacity
+     */
+    private volatile int capacity;
+
+    public ResizableLinkedBlockingQueue(int capacity) {
+        super(Integer.MAX_VALUE);
+        this.capacity = capacity;
+    }
+
+    /**
+     * Set a new capacity.
+     * Note: if the current queue size already exceeds the new capacity, existing elements are not removed, but new elements are prevented from enqueueing.
+     */
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    /**
+     * Get the current capacity.
+     */
+    public int getCapacity() {
+        return capacity;
+    }
+
+    @Override
+    public boolean offer(E e) {
+        // Reject the offer if the queue has already reached its capacity limit
+        if (size() >= capacity) {
+            return false;
+        }
+        return super.offer(e);
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Math.max(0, capacity - size());
+    }
+
+}

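A short sketch of the soft-capacity semantics described in the Javadoc above (illustrative, not from this commit):

    ResizableLinkedBlockingQueue<Runnable> queue = new ResizableLinkedBlockingQueue<>(2);
    queue.offer(() -> {});                     // accepted, size 1
    queue.offer(() -> {});                     // accepted, size 2
    boolean accepted = queue.offer(() -> {});  // false: size() >= capacity
    queue.setCapacity(3);                      // takes effect for subsequent offers
    accepted = queue.offer(() -> {});          // true again
    // Shrinking below the current size keeps existing elements but blocks
    // further offers until the queue drains below the new capacity.
    queue.setCapacity(1);
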
+ 22 - 21
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ThreadPoolFactory.java

@@ -1,37 +1,38 @@
 package com.tzld.piaoquan.recommend.feature.common;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
+ * Thread pool factory.
+ * Delegates to DynamicThreadPoolManager, which supports dynamic configuration.
+ *
  * @author dyp
  */
-public final class ThreadPoolFactory {
-    private final static ExecutorService DEFAULT = new CommonThreadPoolExecutor(
-            128,
-            128,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
-            new ThreadFactoryBuilder().setNameFormat("DEFAULT-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
-    private final static ExecutorService MULTI_GET_FEATURE = new CommonThreadPoolExecutor(
-            256,
-            256,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
-            new ThreadFactoryBuilder().setNameFormat("MultiGetFeaturePool-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+@Component
+public class ThreadPoolFactory {
+
+    private static DynamicThreadPoolManager dynamicThreadPoolManager;
+
+    @Autowired
+    public void setDynamicThreadPoolManager(DynamicThreadPoolManager manager) {
+        ThreadPoolFactory.dynamicThreadPoolManager = manager;
+    }
 
     public static ExecutorService defaultPool() {
-        return DEFAULT;
+        if (dynamicThreadPoolManager == null) {
+            throw new IllegalStateException("DynamicThreadPoolManager not initialized");
+        }
+        return dynamicThreadPoolManager.getDefaultPool();
     }
 
     public static ExecutorService multiGetFeaturePool() {
-        return MULTI_GET_FEATURE;
+        if (dynamicThreadPoolManager == null) {
+            throw new IllegalStateException("DynamicThreadPoolManager not initialized");
+        }
+        return dynamicThreadPoolManager.getMultiGetFeaturePool();
     }
 
 }

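Call sites keep the old static API; only the backing pools change. Note that both accessors throw IllegalStateException if invoked before Spring has wired the manager. A minimal usage sketch:

    // Same call shape as before this change; the pool behind it is now
    // resizable at runtime via Apollo.
    Runnable task = () -> { /* work */ };
    Future<?> f = ThreadPoolFactory.defaultPool().submit(task);
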
+ 11 - 2
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/RedisTemplateConfig.java

@@ -2,6 +2,7 @@ package com.tzld.piaoquan.recommend.feature.config;
 
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,9 +15,14 @@ import org.springframework.data.redis.connection.lettuce.LettucePoolingClientCon
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
+import java.time.Duration;
+
 @Configuration
 public class RedisTemplateConfig {
 
+    @Value("${spring.redis.timeout:5000}")
+    private long redisTimeout;
+
     @Bean("redisPool")
     @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
     public GenericObjectPoolConfig<LettucePoolingClientConfiguration> redisPool() {
@@ -25,7 +31,7 @@ public class RedisTemplateConfig {
 
     @Bean("redisConfig")
     @ConfigurationProperties(prefix = "spring.redis")
-    public RedisStandaloneConfiguration tairConfig() {
+    public RedisStandaloneConfiguration redisConfig() {
         return new RedisStandaloneConfiguration();
     }
 
@@ -34,7 +40,10 @@ public class RedisTemplateConfig {
     public LettuceConnectionFactory factory(GenericObjectPoolConfig<LettucePoolingClientConfiguration> redisPool,
                                             RedisStandaloneConfiguration redisConfig) {
         LettuceClientConfiguration lettuceClientConfiguration =
-                LettucePoolingClientConfiguration.builder().poolConfig(redisPool).build();
+                LettucePoolingClientConfiguration.builder()
+                        .poolConfig(redisPool)
+                        .commandTimeout(Duration.ofMillis(redisTimeout))
+                        .build();
         return new LettuceConnectionFactory(redisConfig, lettuceClientConfiguration);
     }
 

+ 10 - 1
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/TairTemplateConfig.java

@@ -2,6 +2,7 @@ package com.tzld.piaoquan.recommend.feature.config;
 
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,9 +15,14 @@ import org.springframework.data.redis.connection.lettuce.LettucePoolingClientCon
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
+import java.time.Duration;
+
 @Configuration
 public class TairTemplateConfig {
 
+    @Value("${spring.tair.timeout:5000}")
+    private long tairTimeout;
+
     @Bean("tairPool")
     @ConfigurationProperties(prefix = "spring.tair.lettuce.pool")
     public GenericObjectPoolConfig<LettucePoolingClientConfiguration> tairPool() {
@@ -34,7 +40,10 @@ public class TairTemplateConfig {
     public LettuceConnectionFactory factory(@Qualifier("tairPool") GenericObjectPoolConfig<LettucePoolingClientConfiguration> tairPool,
                                             @Qualifier("tairConfig") RedisStandaloneConfiguration tairConfig) {
         LettuceClientConfiguration lettuceClientConfiguration =
-                LettucePoolingClientConfiguration.builder().poolConfig(tairPool).build();
+                LettucePoolingClientConfiguration.builder()
+                        .poolConfig(tairPool)
+                        .commandTimeout(Duration.ofMillis(tairTimeout))
+                        .build();
         return new LettuceConnectionFactory(tairConfig, lettuceClientConfiguration);
     }
 

+ 11 - 4
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/grpcservice/FeatureV2GrpcService.java

@@ -1,10 +1,10 @@
 package com.tzld.piaoquan.recommend.feature.grpcservice;
 
-import com.tzld.piaoquan.recommend.feature.client.ProtobufUtils;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureV2ServiceGrpc;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
 import com.tzld.piaoquan.recommend.feature.service.FeatureV2Service;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import net.devh.boot.grpc.server.service.GrpcService;
@@ -23,9 +23,16 @@ public class FeatureV2GrpcService extends FeatureV2ServiceGrpc.FeatureV2ServiceI
     @Override
     public void multiGetFeature(MultiGetFeatureRequest request,
                                 StreamObserver<MultiGetFeatureResponse> responseObserver) {
-        MultiGetFeatureResponse response = featureV2Service.multiGetFeature(request);
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
+        try {
+            MultiGetFeatureResponse response = featureV2Service.multiGetFeature(request);
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            log.error("multiGetFeature error, keyCount={}", request.getFeatureKeyCount(), e);
+            responseObserver.onError(
+                Status.INTERNAL.withDescription("Feature query failed: " + e.getMessage()).asRuntimeException()
+            );
+        }
     }
 
 }

+ 144 - 18
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java

@@ -1,23 +1,29 @@
 package com.tzld.piaoquan.recommend.feature.service;
 
+import com.alibaba.fastjson.JSON;
+import com.ctrip.framework.apollo.model.ConfigChange;
+import com.ctrip.framework.apollo.model.ConfigChangeEvent;
+import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.base.Strings;
+import com.tzld.piaoquan.recommend.feature.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.feature.model.common.Result;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
 import com.tzld.piaoquan.recommend.feature.util.CommonCollectionUtils;
 import com.tzld.piaoquan.recommend.feature.util.CompressionUtil;
-import com.tzld.piaoquan.recommend.feature.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
-import java.util.List;
-import java.util.Optional;
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * @author dyp
@@ -32,40 +38,160 @@ public class FeatureV2Service {
     @ApolloJsonValue("${dts.config:}")
     private List<DTSConfig> newDtsConfigs;
 
+    @Value("${feature.mget.batch.size:200}")
+    private Integer batchSize;
+
+    /**
+     * DTSConfig cache indexed by tableName, so redisKey() does not scan the list on every call.
+     * Initialized at startup via @PostConstruct and refreshed by @ApolloConfigChangeListener on config changes.
+     */
+    private volatile Map<String, DTSConfig> dtsConfigCache = Collections.emptyMap();
+
+    @PostConstruct
+    public void init() {
+        rebuildDtsConfigCache();
+    }
+
+    /**
+     * Rebuild the DTSConfig cache (from the newDtsConfigs field).
+     */
+    private void rebuildDtsConfigCache() {
+        rebuildDtsConfigCacheFromList(newDtsConfigs);
+    }
+
+    /**
+     * Rebuild the DTSConfig cache.
+     * @param configs configuration list
+     */
+    private void rebuildDtsConfigCacheFromList(List<DTSConfig> configs) {
+        if (configs == null || configs.isEmpty()) {
+            dtsConfigCache = Collections.emptyMap();
+            log.info("DTSConfig cache rebuilt, size=0");
+            return;
+        }
+        Map<String, DTSConfig> cache = new HashMap<>(configs.size());
+        for (DTSConfig config : configs) {
+            if (config.getOdps() != null && StringUtils.isNotBlank(config.getOdps().getTable())) {
+                cache.put(config.getOdps().getTable(), config);
+            }
+        }
+        dtsConfigCache = cache;
+        log.info("DTSConfig cache rebuilt, size={}", cache.size());
+    }
+
+    /**
+     * Listen for Apollo configuration changes and rebuild the cache automatically.
+     * The new value is parsed manually to avoid racing with the @ApolloJsonValue auto-injection.
+     */
+    @ApolloConfigChangeListener
+    public void onConfigChange(ConfigChangeEvent changeEvent) {
+        if (changeEvent.isChanged("dts.config")) {
+            ConfigChange change = changeEvent.getChange("dts.config");
+            String newValue = change.getNewValue();
+            log.info("dts.config changed, old={}, new={}", change.getOldValue(), newValue);
+
+            if (StringUtils.isNotBlank(newValue)) {
+                try {
+                    List<DTSConfig> newConfigs = JSON.parseArray(newValue, DTSConfig.class);
+                    rebuildDtsConfigCacheFromList(newConfigs);
+                } catch (Exception e) {
+                    log.error("Failed to parse dts.config: {}", newValue, e);
+                }
+            } else {
+                dtsConfigCache = Collections.emptyMap();
+                log.info("dts.config is empty, cache cleared");
+            }
+        }
+    }
+
     public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
-        if (request.getFeatureKeyCount() == 0) {
+        int keyCount = request.getFeatureKeyCount();
+
+        if (keyCount == 0) {
             return MultiGetFeatureResponse.newBuilder()
                     .setResult(Result.newBuilder().setCode(1))
                     .build();
         }
-        // Everything currently lives in one Redis, so a single list keeps the handling simple
-        List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), fk -> redisKey(fk));
-        List<String> values = redisTemplate.opsForValue().multiGet(redisKeys);
-        // Backward compatibility with legacy data
-        values = CommonCollectionUtils.toList(values, CompressionUtil::snappyDecompress);
 
-        log.info("feature key {} value {}", JSONUtils.toJson(redisKeys), JSONUtils.toJson(values));
+        long startTime = System.currentTimeMillis();
+
+        // 1. Build the Redis key list
+        List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
+
+        int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
+
+        // 2. Query in parallel batches (results must stay aligned with the key order)
+        List<BatchFuture> futures = new ArrayList<>();
+        for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
+            int end = Math.min(start + safeBatchSize, redisKeys.size());
+            List<String> batchKeys = redisKeys.subList(start, end);
+            Future<List<String>> future = ThreadPoolFactory.multiGetFeaturePool()
+                    .submit(() -> redisTemplate.opsForValue().multiGet(batchKeys));
+            futures.add(new BatchFuture(start, end - start, future));
+        }
+
+        // 3. Collect results (wait roughly 2s in total; timed-out or failed batches stay null to avoid misalignment)
+        List<String> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
+        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
+        for (BatchFuture bf : futures) {
+            long remainingNanos = deadlineNanos - System.nanoTime();
+            if (remainingNanos <= 0) {
+                bf.future.cancel(true);
+                continue;
+            }
+            try {
+                List<String> batchValues = bf.future.get(remainingNanos, TimeUnit.NANOSECONDS);
+                if (batchValues == null) {
+                    continue;
+                }
+                int copyLen = Math.min(bf.size, batchValues.size());
+                for (int i = 0; i < copyLen; i++) {
+                    allValues.set(bf.startIndex + i, batchValues.get(i));
+                }
+            } catch (TimeoutException e) {
+                bf.future.cancel(true);
+                log.warn("Batch mGet timeout, startIndex={}, size={}", bf.startIndex, bf.size);
+            } catch (Exception e) {
+                log.error("Batch mGet failed, startIndex={}, size={}", bf.startIndex, bf.size, e);
+            }
+        }
 
+        // 4. Decompress
+        List<String> values = CommonCollectionUtils.toList(allValues, CompressionUtil::snappyDecompress);
+
+        // 5. Build the response
         MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
         builder.setResult(Result.newBuilder().setCode(1));
-        for (int i = 0; i < request.getFeatureKeyCount(); i++) {
+        for (int i = 0; i < keyCount; i++) {
             builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
                     Strings.nullToEmpty(values.get(i)));
         }
-        return builder.build();
+        MultiGetFeatureResponse build = builder.build();
+        log.info("multiGetFeature, cost={}ms", System.currentTimeMillis() - startTime);
+        return build;
+    }
+
+    private static final class BatchFuture {
+        private final int startIndex;
+        private final int size;
+        private final Future<List<String>> future;
+
+        private BatchFuture(int startIndex, int size, Future<List<String>> future) {
+            this.startIndex = startIndex;
+            this.size = size;
+            this.future = future;
+        }
     }
 
     // Note: key generation rules must stay consistent between writes and reads
     private String redisKey(FeatureKeyProto fk) {
-
-        Optional<DTSConfig> optional = newDtsConfigs.stream()
-                .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), fk.getTableName()))
-                .findFirst();
-        if (!optional.isPresent()) {
+        // Look up the config from the cache, O(1)
+        // The cache is built at startup and refreshed by the Apollo listener on config changes
+        DTSConfig config = dtsConfigCache.get(fk.getTableName());
+        if (config == null) {
             log.error("table {} not config", fk.getTableName());
             return "";
         }
-        DTSConfig config = optional.get();
 
     // Note: key generation rules must stay consistent between writes and reads
         List<String> fields = config.getRedis().getKey();

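A worked example of the partitioning above: with 500 keys and the default batch size of 200, the loop yields the ranges below; because each BatchFuture records its startIndex and allValues is pre-filled with nulls, results land in request order even when batches finish out of order or time out:

    // keyCount=500, safeBatchSize=200
    //   start=0   -> batch covers keys [0, 200)
    //   start=200 -> batch covers keys [200, 400)
    //   start=400 -> batch covers keys [400, 500)
    // A timed-out batch leaves its slots null; every other key keeps its position.
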
+ 7 - 7
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/UserAndVideoFeatureService.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.recommend.feature.service;
 
 import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.recommend.feature.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.feature.model.feature.GetUserFeatureInfo;
 import com.tzld.piaoquan.recommend.feature.model.feature.GetVideoFeatureInfo;
 import org.slf4j.Logger;
@@ -22,9 +23,6 @@ import java.util.stream.Collectors;
 @Service
 public class UserAndVideoFeatureService {
 
-    ExecutorService executorService = new ThreadPoolExecutor(16, 16, 0L,
-            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
-
     private final static Logger log = LoggerFactory.getLogger(UserAndVideoFeatureService.class);
     private final static String USER_FEATURE_KEY = "recommend.feature.user.recsys.info.{mid}";
 
@@ -68,11 +66,12 @@ public class UserAndVideoFeatureService {
             });
         }
 
-        // Run the field conversions concurrently
+        // Run the field conversions concurrently, using the shared dynamic thread pool
         try {
-            executorService.invokeAll(callableList);
+            ThreadPoolFactory.defaultPool().invokeAll(callableList);
         } catch (InterruptedException e) {
             log.error("queryUserFeature error, mids:{}", mids, e);
+            Thread.currentThread().interrupt();
         }
 
         return result;
@@ -113,11 +112,12 @@ public class UserAndVideoFeatureService {
             });
         }
 
-        // Run the field conversions concurrently
+        // Run the field conversions concurrently, using the shared dynamic thread pool
         try {
-            executorService.invokeAll(callableList);
+            ThreadPoolFactory.defaultPool().invokeAll(callableList);
         } catch (InterruptedException e) {
             log.error("queryVideoFeature error, videoIds:{}", videoIds, e);
+            Thread.currentThread().interrupt();
         }
 
         return result;

+ 5 - 1
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/JSONUtils.java

@@ -9,13 +9,17 @@ import org.apache.commons.lang3.StringUtils;
 @Slf4j
 public class JSONUtils {
 
+    /**
+     * Gson singleton; thread-safe, avoids repeated instantiation.
+     */
+    private static final Gson GSON = new Gson();
 
     public static String toJson(Object obj) {
         if (obj == null) {
             return "";
         }
         try {
-            return new Gson().toJson(obj);
+            return GSON.toJson(obj);
         } catch (Exception e) {
             log.error("toJson exception", e);
             return "";

+ 34 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/web/ThreadPoolMonitorController.java

@@ -0,0 +1,34 @@
+package com.tzld.piaoquan.recommend.feature.web;
+
+import com.tzld.piaoquan.recommend.feature.common.DynamicThreadPoolManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+/**
+ * Thread pool monitoring endpoint.
+ *
+ * @author ljd
+ */
+@RestController
+@RequestMapping("/threadpool")
+@Slf4j
+public class ThreadPoolMonitorController {
+
+    @Autowired
+    private DynamicThreadPoolManager dynamicThreadPoolManager;
+
+    /**
+     * Get the status of all thread pools.
+     * Access: GET /threadpool/stats
+     */
+    @GetMapping("/stats")
+    public Map<String, DynamicThreadPoolManager.ThreadPoolStats> getThreadPoolStats() {
+        return dynamicThreadPoolManager.getThreadPoolStats();
+    }
+
+}

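The endpoint returns the stats map keyed by pool name. A representative response (field names from ThreadPoolStats; the values are illustrative):

    {
      "DEFAULT": {
        "poolName": "DEFAULT", "corePoolSize": 8, "maxPoolSize": 16,
        "activeCount": 3, "poolSize": 8, "queueSize": 0, "queueCapacity": 1000,
        "completedTaskCount": 12345, "taskCount": 12348
      },
      "MULTI_GET_FEATURE": {
        "poolName": "MULTI_GET_FEATURE", "corePoolSize": 16, "maxPoolSize": 32,
        "activeCount": 5, "poolSize": 16, "queueSize": 12, "queueCapacity": 1800,
        "completedTaskCount": 987654, "taskCount": 987671
      }
    }
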
+ 35 - 10
recommend-feature-service/src/main/resources/application-prod.yml

@@ -1,6 +1,30 @@
 server:
   port: 8080
 
+# gRPC server configuration
+grpc:
+  server:
+    # KeepAlive settings (used in tandem with the client)
+    keep-alive-time: 20s
+    keep-alive-timeout: 5s
+    permit-keep-alive-without-calls: true
+    permit-keep-alive-time: 10s
+    # Connection lifecycle management (server side)
+    max-connection-idle: 300s        # close connections idle for 5 minutes
+    max-connection-age: 3600s        # force-close connections after 1 hour
+    max-connection-age-grace: 5s     # grace period before closing, lets in-flight requests finish
+
+# Thread pool monitor configuration
+thread:
+  pool:
+    monitor:
+      enabled: true
+      interval: 30
+      thread:
+        threshold: 0.8
+      queue:
+        threshold: 0.8
+
 eureka:
   instance:
     prefer-ip-address: true # whether to prefer the IP address as the host identifier; default false
@@ -17,24 +41,25 @@ spring:
     hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
-    timeout: 1000
+    timeout: 5000  # raised to 5s to prevent batch-query timeouts
     lettuce:
       pool:
-        max-active: 8
-        max-wait: -1
-        max-idle: 8
-        min-idle: 0
+        # enlarged pool to avoid connection waits under high concurrency
+        max-active: 64
+        max-wait: 2000
+        max-idle: 32
+        min-idle: 8
   tair:
     hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
-    timeout: 1000
+    timeout: 5000  # raised to 5s to prevent batch-query timeouts
     lettuce:
       pool:
-        max-active: 8
-        max-wait: -1
-        max-idle: 8
-        min-idle: 0
+        max-active: 64
+        max-wait: 2000
+        max-idle: 32
+        min-idle: 8
 
 apollo:
   meta: http://apolloconfig-internal.piaoquantv.com

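The KeepAlive comment above says these settings pair with the client side. A matching client-side sketch, assuming the standard net.devh grpc-client properties (not part of this commit; verify against the starter's documentation):

    grpc:
      client:
        recommend-feature:
          keep-alive-time: 20s            # must be >= the server's permit-keep-alive-time (10s)
          keep-alive-timeout: 5s
          keep-alive-without-calls: true
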
+ 15 - 0
recommend-feature-service/src/main/resources/application.yml

@@ -14,3 +14,18 @@ apollo:
     enabled: true
     namespaces: application
   cacheDir: /datalog/apollo-cache-dir
+
+# Actuator monitoring configuration
+management:
+  endpoints:
+    web:
+      exposure:
+        include: prometheus,health,info,metrics
+  endpoint:
+    prometheus:
+      enabled: true
+    health:
+      show-details: when_authorized
+  metrics:
+    tags:
+      application: ${spring.application.name}
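
With this exposure, the thread pool gauges registered in DynamicThreadPoolManager become scrapeable at /actuator/prometheus; Micrometer's Prometheus registry renders dotted meter names with underscores. Sample output (the values and the application tag are illustrative):

    threadpool_active_count{application="recommend-feature-service",pool="DEFAULT"} 3.0
    threadpool_queue_size{application="recommend-feature-service",pool="MULTI_GET_FEATURE"} 12.0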