jiandong.liu 4 روز پیش
والد
کامیت
b0e1534db0

+ 43 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolConfig.java

@@ -0,0 +1,43 @@
+package com.tzld.piaoquan.recommend.feature.common;
+
+import lombok.Data;
+
+/**
+ * 动态线程池配置
+ *
+ * @author ljd
+ */
+@Data
+public class DynamicThreadPoolConfig {
+
+    /**
+     * 线程池名称
+     */
+    private String poolName;
+
+    /**
+     * 核心线程数(默认值针对 2 核 CPU 优化)
+     */
+    private int corePoolSize = 8;
+
+    /**
+     * 最大线程数
+     */
+    private int maxPoolSize = 16;
+
+    /**
+     * 队列容量
+     */
+    private int queueCapacity = 3000;
+
+    /**
+     * 空闲线程存活时间(秒)
+     */
+    private long keepAliveSeconds = 60;
+
+    /**
+     * 拒绝策略: ABORT, CALLER_RUNS, DISCARD, DISCARD_OLDEST
+     */
+    private String rejectedPolicy = "CALLER_RUNS";
+
+}

+ 326 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/DynamicThreadPoolManager.java

@@ -0,0 +1,326 @@
+package com.tzld.piaoquan.recommend.feature.common;
+
+import com.alibaba.fastjson.JSON;
+import com.ctrip.framework.apollo.model.ConfigChange;
+import com.ctrip.framework.apollo.model.ConfigChangeEvent;
+import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * 动态线程池管理器
+ * 支持通过 Apollo 配置动态调整线程池参数
+ *
+ * @author ljd
+ */
+@Slf4j
+@Component
+public class DynamicThreadPoolManager {
+
+    /**
+     * 线程池注册表
+     */
+    private final Map<String, ThreadPoolExecutor> threadPoolRegistry = new ConcurrentHashMap<>();
+
+    /**
+     * 默认线程池名称
+     */
+    public static final String DEFAULT_POOL = "DEFAULT";
+
+    /**
+     * 特征查询线程池名称
+     */
+    public static final String MULTI_GET_FEATURE_POOL = "MULTI_GET_FEATURE";
+
+    /**
+     * Apollo 配置的线程池参数
+     * 配置格式:
+     * [
+     * {"poolName":"DEFAULT","corePoolSize":32,"maxPoolSize":64,"queueCapacity":3000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
+     * {"poolName":"MULTI_GET_FEATURE","corePoolSize":64,"maxPoolSize":128,"queueCapacity":5000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
+     * ]
+     */
+    @ApolloJsonValue("${thread.pool.configs:[]}")
+    private List<DynamicThreadPoolConfig> threadPoolConfigs;
+
+    /**
+     * 默认配置
+     */
+    private static final DynamicThreadPoolConfig DEFAULT_CONFIG;
+    private static final DynamicThreadPoolConfig MULTI_GET_FEATURE_CONFIG;
+
+    static {
+        // 针对 2 核 CPU 优化的默认配置
+        DEFAULT_CONFIG = new DynamicThreadPoolConfig();
+        DEFAULT_CONFIG.setPoolName(DEFAULT_POOL);
+        DEFAULT_CONFIG.setCorePoolSize(8);
+        DEFAULT_CONFIG.setMaxPoolSize(16);
+        DEFAULT_CONFIG.setQueueCapacity(3000);
+        DEFAULT_CONFIG.setKeepAliveSeconds(60);
+        DEFAULT_CONFIG.setRejectedPolicy("CALLER_RUNS");
+
+        // IO 密集型任务(Redis 查询),可以多一些线程
+        MULTI_GET_FEATURE_CONFIG = new DynamicThreadPoolConfig();
+        MULTI_GET_FEATURE_CONFIG.setPoolName(MULTI_GET_FEATURE_POOL);
+        MULTI_GET_FEATURE_CONFIG.setCorePoolSize(16);
+        MULTI_GET_FEATURE_CONFIG.setMaxPoolSize(32);
+        MULTI_GET_FEATURE_CONFIG.setQueueCapacity(5000);
+        MULTI_GET_FEATURE_CONFIG.setKeepAliveSeconds(60);
+        MULTI_GET_FEATURE_CONFIG.setRejectedPolicy("CALLER_RUNS");
+    }
+
+    @PostConstruct
+    public void init() {
+        // 初始化默认线程池
+        initThreadPool(DEFAULT_CONFIG);
+        initThreadPool(MULTI_GET_FEATURE_CONFIG);
+
+        // 根据 Apollo 配置覆盖
+        if (threadPoolConfigs != null && !threadPoolConfigs.isEmpty()) {
+            for (DynamicThreadPoolConfig config : threadPoolConfigs) {
+                updateThreadPool(config);
+            }
+        }
+
+        log.info("DynamicThreadPoolManager initialized, pools: {}", threadPoolRegistry.keySet());
+    }
+
+    /**
+     * 初始化线程池
+     */
+    private void initThreadPool(DynamicThreadPoolConfig config) {
+        ThreadPoolExecutor executor = createThreadPoolExecutor(config);
+        threadPoolRegistry.put(config.getPoolName(), executor);
+        log.info("Thread pool [{}] initialized: coreSize={}, maxSize={}, queueCapacity={}",
+                config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize(), config.getQueueCapacity());
+    }
+
+    /**
+     * 创建线程池
+     */
+    private ThreadPoolExecutor createThreadPoolExecutor(DynamicThreadPoolConfig config) {
+        return new ThreadPoolExecutor(
+                config.getCorePoolSize(),
+                config.getMaxPoolSize(),
+                config.getKeepAliveSeconds(),
+                TimeUnit.SECONDS,
+                new ResizableLinkedBlockingQueue<>(config.getQueueCapacity()),
+                new ThreadFactoryBuilder().setNameFormat(config.getPoolName() + "-%d").build(),
+                getRejectedExecutionHandler(config.getRejectedPolicy())
+        );
+    }
+
+    /**
+     * 获取拒绝策略
+     */
+    private RejectedExecutionHandler getRejectedExecutionHandler(String policy) {
+        switch (policy.toUpperCase()) {
+            case "ABORT":
+                return new ThreadPoolExecutor.AbortPolicy();
+            case "DISCARD":
+                return new ThreadPoolExecutor.DiscardPolicy();
+            case "DISCARD_OLDEST":
+                return new ThreadPoolExecutor.DiscardOldestPolicy();
+            case "CALLER_RUNS":
+            default:
+                return new ThreadPoolExecutor.CallerRunsPolicy();
+        }
+    }
+
+    /**
+     * 监听 Apollo 配置变更
+     * 注意:手动解析新配置值,避免与 @ApolloJsonValue 自动注入产生竞态条件
+     */
+    @ApolloConfigChangeListener
+    public void onConfigChange(ConfigChangeEvent changeEvent) {
+        if (changeEvent.isChanged("thread.pool.configs")) {
+            ConfigChange change = changeEvent.getChange("thread.pool.configs");
+            log.info("Thread pool config changed, old={}, new={}", change.getOldValue(), change.getNewValue());
+
+            // 手动解析新配置值,避免竞态条件
+            String newValue = change.getNewValue();
+            if (StringUtils.isNotBlank(newValue)) {
+                try {
+                    List<DynamicThreadPoolConfig> newConfigs = JSON.parseArray(newValue, DynamicThreadPoolConfig.class);
+                    if (newConfigs != null) {
+                        for (DynamicThreadPoolConfig config : newConfigs) {
+                            updateThreadPool(config);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Failed to parse thread pool config: {}", newValue, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * 动态更新线程池参数
+     */
+    public void updateThreadPool(DynamicThreadPoolConfig config) {
+        // 参数验证
+        if (config.getCorePoolSize() <= 0 || config.getMaxPoolSize() <= 0) {
+            log.error("Invalid pool size for [{}]: corePoolSize={}, maxPoolSize={} must be positive",
+                    config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize());
+            return;
+        }
+        if (config.getCorePoolSize() > config.getMaxPoolSize()) {
+            log.error("Invalid pool size for [{}]: corePoolSize={} > maxPoolSize={}",
+                    config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize());
+            return;
+        }
+        if (config.getQueueCapacity() <= 0) {
+            log.error("Invalid queueCapacity for [{}]: {} must be positive",
+                    config.getPoolName(), config.getQueueCapacity());
+            return;
+        }
+
+        ThreadPoolExecutor executor = threadPoolRegistry.get(config.getPoolName());
+        if (executor == null) {
+            log.warn("Thread pool [{}] not found, creating new one", config.getPoolName());
+            initThreadPool(config);
+            return;
+        }
+
+        // 动态调整参数
+        int oldCoreSize = executor.getCorePoolSize();
+        int oldMaxSize = executor.getMaximumPoolSize();
+
+        // 注意:调整顺序很重要,避免 coreSize > maxSize 的情况
+        if (config.getCorePoolSize() > executor.getMaximumPoolSize()) {
+            executor.setMaximumPoolSize(config.getMaxPoolSize());
+            executor.setCorePoolSize(config.getCorePoolSize());
+        } else {
+            executor.setCorePoolSize(config.getCorePoolSize());
+            executor.setMaximumPoolSize(config.getMaxPoolSize());
+        }
+
+        executor.setKeepAliveTime(config.getKeepAliveSeconds(), TimeUnit.SECONDS);
+
+        // 更新拒绝策略
+        RejectedExecutionHandler newHandler = getRejectedExecutionHandler(config.getRejectedPolicy());
+        executor.setRejectedExecutionHandler(newHandler);
+
+        // 更新队列容量(如果是可调整大小的队列)
+        if (executor.getQueue() instanceof ResizableLinkedBlockingQueue) {
+            ((ResizableLinkedBlockingQueue<?>) executor.getQueue()).setCapacity(config.getQueueCapacity());
+        }
+
+        log.info("Thread pool [{}] updated: coreSize {} -> {}, maxSize {} -> {}, queueCapacity={}, rejectedPolicy={}",
+                config.getPoolName(), oldCoreSize, config.getCorePoolSize(),
+                oldMaxSize, config.getMaxPoolSize(), config.getQueueCapacity(), config.getRejectedPolicy());
+    }
+
+    /**
+     * 获取线程池
+     */
+    public ExecutorService getThreadPool(String poolName) {
+        ExecutorService executor = threadPoolRegistry.get(poolName);
+        if (executor == null) {
+            log.warn("Thread pool [{}] not found, using DEFAULT pool", poolName);
+            return threadPoolRegistry.get(DEFAULT_POOL);
+        }
+        return executor;
+    }
+
+    /**
+     * 获取默认线程池
+     */
+    public ExecutorService getDefaultPool() {
+        return getThreadPool(DEFAULT_POOL);
+    }
+
+    /**
+     * 获取特征查询线程池
+     */
+    public ExecutorService getMultiGetFeaturePool() {
+        return getThreadPool(MULTI_GET_FEATURE_POOL);
+    }
+
+    /**
+     * 获取线程池状态信息
+     */
+    public Map<String, ThreadPoolStats> getThreadPoolStats() {
+        Map<String, ThreadPoolStats> statsMap = new ConcurrentHashMap<>();
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            ThreadPoolExecutor executor = entry.getValue();
+            ThreadPoolStats stats = new ThreadPoolStats();
+            stats.setPoolName(entry.getKey());
+            stats.setCorePoolSize(executor.getCorePoolSize());
+            stats.setMaxPoolSize(executor.getMaximumPoolSize());
+            stats.setActiveCount(executor.getActiveCount());
+            stats.setPoolSize(executor.getPoolSize());
+            stats.setQueueSize(executor.getQueue().size());
+            stats.setQueueCapacity(getQueueCapacity(entry.getKey()));
+            stats.setCompletedTaskCount(executor.getCompletedTaskCount());
+            stats.setTaskCount(executor.getTaskCount());
+            statsMap.put(entry.getKey(), stats);
+        }
+        return statsMap;
+    }
+
+    /**
+     * 获取指定线程池的队列容量
+     */
+    private int getQueueCapacity(String poolName) {
+        if (threadPoolConfigs != null) {
+            for (DynamicThreadPoolConfig config : threadPoolConfigs) {
+                if (config.getPoolName().equals(poolName)) {
+                    return config.getQueueCapacity();
+                }
+            }
+        }
+        // 返回默认值
+        if (DEFAULT_POOL.equals(poolName)) {
+            return DEFAULT_CONFIG.getQueueCapacity();
+        } else if (MULTI_GET_FEATURE_POOL.equals(poolName)) {
+            return MULTI_GET_FEATURE_CONFIG.getQueueCapacity();
+        }
+        return 3000;
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("Shutting down thread pools...");
+        for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
+            ThreadPoolExecutor executor = entry.getValue();
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+            log.info("Thread pool [{}] shutdown completed", entry.getKey());
+        }
+    }
+
+    /**
+     * 线程池状态信息
+     */
+    @Data
+    public static class ThreadPoolStats {
+        private String poolName;
+        private int corePoolSize;
+        private int maxPoolSize;
+        private int activeCount;
+        private int poolSize;
+        private int queueSize;
+        private int queueCapacity;
+        private long completedTaskCount;
+        private long taskCount;
+    }
+
+}

+ 58 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ResizableLinkedBlockingQueue.java

@@ -0,0 +1,58 @@
+package com.tzld.piaoquan.recommend.feature.common;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * 可动态调整容量的阻塞队列
+ * 继承 LinkedBlockingQueue,支持运行时修改队列容量
+ *
+ * <p>注意:容量限制是 "尽力而为" 的软限制。由于 {@link #offer(Object)} 方法中的
+ * {@code size() >= capacity} 检查与 {@code super.offer(e)} 调用不是原子操作,
+ * 在高并发场景下队列实际大小可能略超过设定容量。对于动态线程池的使用场景,这种偏差是可接受的。</p>
+ *
+ * @author ljd
+ */
+public class ResizableLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 动态容量
+     */
+    private volatile int capacity;
+
+    public ResizableLinkedBlockingQueue(int capacity) {
+        super(Integer.MAX_VALUE);
+        this.capacity = capacity;
+    }
+
+    /**
+     * 设置新的容量
+     * 注意:如果当前队列大小已超过新容量,不会删除已有元素,但会阻止新元素入队
+     */
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    /**
+     * 获取当前容量
+     */
+    public int getCapacity() {
+        return capacity;
+    }
+
+    @Override
+    public boolean offer(E e) {
+        // 如果当前队列大小已达到容量限制,拒绝入队
+        if (size() >= capacity) {
+            return false;
+        }
+        return super.offer(e);
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Math.max(0, capacity - size());
+    }
+
+}

+ 22 - 21
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ThreadPoolFactory.java

@@ -1,37 +1,38 @@
 package com.tzld.piaoquan.recommend.feature.common;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
+ * 线程池工厂
+ * 委托给 DynamicThreadPoolManager 管理,支持动态配置
+ *
  * @author dyp
  */
-public final class ThreadPoolFactory {
-    private final static ExecutorService DEFAULT = new CommonThreadPoolExecutor(
-            128,
-            128,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
-            new ThreadFactoryBuilder().setNameFormat("DEFAULT-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
-    private final static ExecutorService MULTI_GET_FEATURE = new CommonThreadPoolExecutor(
-            256,
-            256,
-            0L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(1000),
-            new ThreadFactoryBuilder().setNameFormat("MultiGetFeaturePool-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+@Component
+public class ThreadPoolFactory {
+
+    private static DynamicThreadPoolManager dynamicThreadPoolManager;
+
+    @Autowired
+    public void setDynamicThreadPoolManager(DynamicThreadPoolManager manager) {
+        ThreadPoolFactory.dynamicThreadPoolManager = manager;
+    }
 
     public static ExecutorService defaultPool() {
-        return DEFAULT;
+        if (dynamicThreadPoolManager == null) {
+            throw new IllegalStateException("DynamicThreadPoolManager not initialized");
+        }
+        return dynamicThreadPoolManager.getDefaultPool();
     }
 
     public static ExecutorService multiGetFeaturePool() {
-        return MULTI_GET_FEATURE;
+        if (dynamicThreadPoolManager == null) {
+            throw new IllegalStateException("DynamicThreadPoolManager not initialized");
+        }
+        return dynamicThreadPoolManager.getMultiGetFeaturePool();
     }
 
 }

+ 7 - 7
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/UserAndVideoFeatureService.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.recommend.feature.service;
 
 import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.recommend.feature.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.feature.model.feature.GetUserFeatureInfo;
 import com.tzld.piaoquan.recommend.feature.model.feature.GetVideoFeatureInfo;
 import org.slf4j.Logger;
@@ -22,9 +23,6 @@ import java.util.stream.Collectors;
 @Service
 public class UserAndVideoFeatureService {
 
-    ExecutorService executorService = new ThreadPoolExecutor(16, 16, 0L,
-            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
-
     private final static Logger log = LoggerFactory.getLogger(UserAndVideoFeatureService.class);
     private final static String USER_FEATURE_KEY = "recommend.feature.user.recsys.info.{mid}";
 
@@ -68,11 +66,12 @@ public class UserAndVideoFeatureService {
             });
         }
 
-        // 并发执行字段转换操作
+        // 并发执行字段转换操作,使用统一的动态线程池
         try {
-            executorService.invokeAll(callableList);
+            ThreadPoolFactory.defaultPool().invokeAll(callableList);
         } catch (InterruptedException e) {
             log.error("queryUserFeature error, mids:{}", mids, e);
+            Thread.currentThread().interrupt();
         }
 
         return result;
@@ -113,11 +112,12 @@ public class UserAndVideoFeatureService {
             });
         }
 
-        // 并发执行字段转换操作
+        // 并发执行字段转换操作,使用统一的动态线程池
         try {
-            executorService.invokeAll(callableList);
+            ThreadPoolFactory.defaultPool().invokeAll(callableList);
         } catch (InterruptedException e) {
             log.error("queryVideoFeature error, videoIds:{}", videoIds, e);
+            Thread.currentThread().interrupt();
         }
 
         return result;

+ 34 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/web/ThreadPoolMonitorController.java

@@ -0,0 +1,34 @@
+package com.tzld.piaoquan.recommend.feature.web;
+
+import com.tzld.piaoquan.recommend.feature.common.DynamicThreadPoolManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+/**
+ * 线程池监控接口
+ *
+ * @author ljd
+ */
+@RestController
+@RequestMapping("/threadpool")
+@Slf4j
+public class ThreadPoolMonitorController {
+
+    @Autowired
+    private DynamicThreadPoolManager dynamicThreadPoolManager;
+
+    /**
+     * 获取所有线程池状态
+     * 访问: GET /threadpool/stats
+     */
+    @GetMapping("/stats")
+    public Map<String, DynamicThreadPoolManager.ThreadPoolStats> getThreadPoolStats() {
+        return dynamicThreadPoolManager.getThreadPoolStats();
+    }
+
+}