|
@@ -0,0 +1,326 @@
|
|
|
|
|
+package com.tzld.piaoquan.recommend.feature.common;
|
|
|
|
|
+
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.ctrip.framework.apollo.model.ConfigChange;
|
|
|
|
|
+import com.ctrip.framework.apollo.model.ConfigChangeEvent;
|
|
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
|
|
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
|
+import lombok.Data;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * 动态线程池管理器
|
|
|
|
|
+ * 支持通过 Apollo 配置动态调整线程池参数
|
|
|
|
|
+ *
|
|
|
|
|
+ * @author ljd
|
|
|
|
|
+ */
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+public class DynamicThreadPoolManager {
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 线程池注册表
|
|
|
|
|
+ */
|
|
|
|
|
+ private final Map<String, ThreadPoolExecutor> threadPoolRegistry = new ConcurrentHashMap<>();
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 默认线程池名称
|
|
|
|
|
+ */
|
|
|
|
|
+ public static final String DEFAULT_POOL = "DEFAULT";
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 特征查询线程池名称
|
|
|
|
|
+ */
|
|
|
|
|
+ public static final String MULTI_GET_FEATURE_POOL = "MULTI_GET_FEATURE";
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * Apollo 配置的线程池参数
|
|
|
|
|
+ * 配置格式:
|
|
|
|
|
+ * [
|
|
|
|
|
+ * {"poolName":"DEFAULT","corePoolSize":32,"maxPoolSize":64,"queueCapacity":3000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"},
|
|
|
|
|
+ * {"poolName":"MULTI_GET_FEATURE","corePoolSize":64,"maxPoolSize":128,"queueCapacity":5000,"keepAliveSeconds":60,"rejectedPolicy":"CALLER_RUNS"}
|
|
|
|
|
+ * ]
|
|
|
|
|
+ */
|
|
|
|
|
+ @ApolloJsonValue("${thread.pool.configs:[]}")
|
|
|
|
|
+ private List<DynamicThreadPoolConfig> threadPoolConfigs;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 默认配置
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final DynamicThreadPoolConfig DEFAULT_CONFIG;
|
|
|
|
|
+ private static final DynamicThreadPoolConfig MULTI_GET_FEATURE_CONFIG;
|
|
|
|
|
+
|
|
|
|
|
+ static {
|
|
|
|
|
+ // 针对 2 核 CPU 优化的默认配置
|
|
|
|
|
+ DEFAULT_CONFIG = new DynamicThreadPoolConfig();
|
|
|
|
|
+ DEFAULT_CONFIG.setPoolName(DEFAULT_POOL);
|
|
|
|
|
+ DEFAULT_CONFIG.setCorePoolSize(8);
|
|
|
|
|
+ DEFAULT_CONFIG.setMaxPoolSize(16);
|
|
|
|
|
+ DEFAULT_CONFIG.setQueueCapacity(3000);
|
|
|
|
|
+ DEFAULT_CONFIG.setKeepAliveSeconds(60);
|
|
|
|
|
+ DEFAULT_CONFIG.setRejectedPolicy("CALLER_RUNS");
|
|
|
|
|
+
|
|
|
|
|
+ // IO 密集型任务(Redis 查询),可以多一些线程
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG = new DynamicThreadPoolConfig();
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG.setPoolName(MULTI_GET_FEATURE_POOL);
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG.setCorePoolSize(16);
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG.setMaxPoolSize(32);
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG.setQueueCapacity(5000);
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG.setKeepAliveSeconds(60);
|
|
|
|
|
+ MULTI_GET_FEATURE_CONFIG.setRejectedPolicy("CALLER_RUNS");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @PostConstruct
|
|
|
|
|
+ public void init() {
|
|
|
|
|
+ // 初始化默认线程池
|
|
|
|
|
+ initThreadPool(DEFAULT_CONFIG);
|
|
|
|
|
+ initThreadPool(MULTI_GET_FEATURE_CONFIG);
|
|
|
|
|
+
|
|
|
|
|
+ // 根据 Apollo 配置覆盖
|
|
|
|
|
+ if (threadPoolConfigs != null && !threadPoolConfigs.isEmpty()) {
|
|
|
|
|
+ for (DynamicThreadPoolConfig config : threadPoolConfigs) {
|
|
|
|
|
+ updateThreadPool(config);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("DynamicThreadPoolManager initialized, pools: {}", threadPoolRegistry.keySet());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 初始化线程池
|
|
|
|
|
+ */
|
|
|
|
|
+ private void initThreadPool(DynamicThreadPoolConfig config) {
|
|
|
|
|
+ ThreadPoolExecutor executor = createThreadPoolExecutor(config);
|
|
|
|
|
+ threadPoolRegistry.put(config.getPoolName(), executor);
|
|
|
|
|
+ log.info("Thread pool [{}] initialized: coreSize={}, maxSize={}, queueCapacity={}",
|
|
|
|
|
+ config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize(), config.getQueueCapacity());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 创建线程池
|
|
|
|
|
+ */
|
|
|
|
|
+ private ThreadPoolExecutor createThreadPoolExecutor(DynamicThreadPoolConfig config) {
|
|
|
|
|
+ return new ThreadPoolExecutor(
|
|
|
|
|
+ config.getCorePoolSize(),
|
|
|
|
|
+ config.getMaxPoolSize(),
|
|
|
|
|
+ config.getKeepAliveSeconds(),
|
|
|
|
|
+ TimeUnit.SECONDS,
|
|
|
|
|
+ new ResizableLinkedBlockingQueue<>(config.getQueueCapacity()),
|
|
|
|
|
+ new ThreadFactoryBuilder().setNameFormat(config.getPoolName() + "-%d").build(),
|
|
|
|
|
+ getRejectedExecutionHandler(config.getRejectedPolicy())
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取拒绝策略
|
|
|
|
|
+ */
|
|
|
|
|
+ private RejectedExecutionHandler getRejectedExecutionHandler(String policy) {
|
|
|
|
|
+ switch (policy.toUpperCase()) {
|
|
|
|
|
+ case "ABORT":
|
|
|
|
|
+ return new ThreadPoolExecutor.AbortPolicy();
|
|
|
|
|
+ case "DISCARD":
|
|
|
|
|
+ return new ThreadPoolExecutor.DiscardPolicy();
|
|
|
|
|
+ case "DISCARD_OLDEST":
|
|
|
|
|
+ return new ThreadPoolExecutor.DiscardOldestPolicy();
|
|
|
|
|
+ case "CALLER_RUNS":
|
|
|
|
|
+ default:
|
|
|
|
|
+ return new ThreadPoolExecutor.CallerRunsPolicy();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 监听 Apollo 配置变更
|
|
|
|
|
+ * 注意:手动解析新配置值,避免与 @ApolloJsonValue 自动注入产生竞态条件
|
|
|
|
|
+ */
|
|
|
|
|
+ @ApolloConfigChangeListener
|
|
|
|
|
+ public void onConfigChange(ConfigChangeEvent changeEvent) {
|
|
|
|
|
+ if (changeEvent.isChanged("thread.pool.configs")) {
|
|
|
|
|
+ ConfigChange change = changeEvent.getChange("thread.pool.configs");
|
|
|
|
|
+ log.info("Thread pool config changed, old={}, new={}", change.getOldValue(), change.getNewValue());
|
|
|
|
|
+
|
|
|
|
|
+ // 手动解析新配置值,避免竞态条件
|
|
|
|
|
+ String newValue = change.getNewValue();
|
|
|
|
|
+ if (StringUtils.isNotBlank(newValue)) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<DynamicThreadPoolConfig> newConfigs = JSON.parseArray(newValue, DynamicThreadPoolConfig.class);
|
|
|
|
|
+ if (newConfigs != null) {
|
|
|
|
|
+ for (DynamicThreadPoolConfig config : newConfigs) {
|
|
|
|
|
+ updateThreadPool(config);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("Failed to parse thread pool config: {}", newValue, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 动态更新线程池参数
|
|
|
|
|
+ */
|
|
|
|
|
+ public void updateThreadPool(DynamicThreadPoolConfig config) {
|
|
|
|
|
+ // 参数验证
|
|
|
|
|
+ if (config.getCorePoolSize() <= 0 || config.getMaxPoolSize() <= 0) {
|
|
|
|
|
+ log.error("Invalid pool size for [{}]: corePoolSize={}, maxPoolSize={} must be positive",
|
|
|
|
|
+ config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (config.getCorePoolSize() > config.getMaxPoolSize()) {
|
|
|
|
|
+ log.error("Invalid pool size for [{}]: corePoolSize={} > maxPoolSize={}",
|
|
|
|
|
+ config.getPoolName(), config.getCorePoolSize(), config.getMaxPoolSize());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (config.getQueueCapacity() <= 0) {
|
|
|
|
|
+ log.error("Invalid queueCapacity for [{}]: {} must be positive",
|
|
|
|
|
+ config.getPoolName(), config.getQueueCapacity());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ThreadPoolExecutor executor = threadPoolRegistry.get(config.getPoolName());
|
|
|
|
|
+ if (executor == null) {
|
|
|
|
|
+ log.warn("Thread pool [{}] not found, creating new one", config.getPoolName());
|
|
|
|
|
+ initThreadPool(config);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 动态调整参数
|
|
|
|
|
+ int oldCoreSize = executor.getCorePoolSize();
|
|
|
|
|
+ int oldMaxSize = executor.getMaximumPoolSize();
|
|
|
|
|
+
|
|
|
|
|
+ // 注意:调整顺序很重要,避免 coreSize > maxSize 的情况
|
|
|
|
|
+ if (config.getCorePoolSize() > executor.getMaximumPoolSize()) {
|
|
|
|
|
+ executor.setMaximumPoolSize(config.getMaxPoolSize());
|
|
|
|
|
+ executor.setCorePoolSize(config.getCorePoolSize());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ executor.setCorePoolSize(config.getCorePoolSize());
|
|
|
|
|
+ executor.setMaximumPoolSize(config.getMaxPoolSize());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ executor.setKeepAliveTime(config.getKeepAliveSeconds(), TimeUnit.SECONDS);
|
|
|
|
|
+
|
|
|
|
|
+ // 更新拒绝策略
|
|
|
|
|
+ RejectedExecutionHandler newHandler = getRejectedExecutionHandler(config.getRejectedPolicy());
|
|
|
|
|
+ executor.setRejectedExecutionHandler(newHandler);
|
|
|
|
|
+
|
|
|
|
|
+ // 更新队列容量(如果是可调整大小的队列)
|
|
|
|
|
+ if (executor.getQueue() instanceof ResizableLinkedBlockingQueue) {
|
|
|
|
|
+ ((ResizableLinkedBlockingQueue<?>) executor.getQueue()).setCapacity(config.getQueueCapacity());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("Thread pool [{}] updated: coreSize {} -> {}, maxSize {} -> {}, queueCapacity={}, rejectedPolicy={}",
|
|
|
|
|
+ config.getPoolName(), oldCoreSize, config.getCorePoolSize(),
|
|
|
|
|
+ oldMaxSize, config.getMaxPoolSize(), config.getQueueCapacity(), config.getRejectedPolicy());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取线程池
|
|
|
|
|
+ */
|
|
|
|
|
+ public ExecutorService getThreadPool(String poolName) {
|
|
|
|
|
+ ExecutorService executor = threadPoolRegistry.get(poolName);
|
|
|
|
|
+ if (executor == null) {
|
|
|
|
|
+ log.warn("Thread pool [{}] not found, using DEFAULT pool", poolName);
|
|
|
|
|
+ return threadPoolRegistry.get(DEFAULT_POOL);
|
|
|
|
|
+ }
|
|
|
|
|
+ return executor;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取默认线程池
|
|
|
|
|
+ */
|
|
|
|
|
+ public ExecutorService getDefaultPool() {
|
|
|
|
|
+ return getThreadPool(DEFAULT_POOL);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取特征查询线程池
|
|
|
|
|
+ */
|
|
|
|
|
+ public ExecutorService getMultiGetFeaturePool() {
|
|
|
|
|
+ return getThreadPool(MULTI_GET_FEATURE_POOL);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取线程池状态信息
|
|
|
|
|
+ */
|
|
|
|
|
+ public Map<String, ThreadPoolStats> getThreadPoolStats() {
|
|
|
|
|
+ Map<String, ThreadPoolStats> statsMap = new ConcurrentHashMap<>();
|
|
|
|
|
+ for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
|
|
|
|
|
+ ThreadPoolExecutor executor = entry.getValue();
|
|
|
|
|
+ ThreadPoolStats stats = new ThreadPoolStats();
|
|
|
|
|
+ stats.setPoolName(entry.getKey());
|
|
|
|
|
+ stats.setCorePoolSize(executor.getCorePoolSize());
|
|
|
|
|
+ stats.setMaxPoolSize(executor.getMaximumPoolSize());
|
|
|
|
|
+ stats.setActiveCount(executor.getActiveCount());
|
|
|
|
|
+ stats.setPoolSize(executor.getPoolSize());
|
|
|
|
|
+ stats.setQueueSize(executor.getQueue().size());
|
|
|
|
|
+ stats.setQueueCapacity(getQueueCapacity(entry.getKey()));
|
|
|
|
|
+ stats.setCompletedTaskCount(executor.getCompletedTaskCount());
|
|
|
|
|
+ stats.setTaskCount(executor.getTaskCount());
|
|
|
|
|
+ statsMap.put(entry.getKey(), stats);
|
|
|
|
|
+ }
|
|
|
|
|
+ return statsMap;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取指定线程池的队列容量
|
|
|
|
|
+ */
|
|
|
|
|
+ private int getQueueCapacity(String poolName) {
|
|
|
|
|
+ if (threadPoolConfigs != null) {
|
|
|
|
|
+ for (DynamicThreadPoolConfig config : threadPoolConfigs) {
|
|
|
|
|
+ if (config.getPoolName().equals(poolName)) {
|
|
|
|
|
+ return config.getQueueCapacity();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 返回默认值
|
|
|
|
|
+ if (DEFAULT_POOL.equals(poolName)) {
|
|
|
|
|
+ return DEFAULT_CONFIG.getQueueCapacity();
|
|
|
|
|
+ } else if (MULTI_GET_FEATURE_POOL.equals(poolName)) {
|
|
|
|
|
+ return MULTI_GET_FEATURE_CONFIG.getQueueCapacity();
|
|
|
|
|
+ }
|
|
|
|
|
+ return 3000;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @PreDestroy
|
|
|
|
|
+ public void shutdown() {
|
|
|
|
|
+ log.info("Shutting down thread pools...");
|
|
|
|
|
+ for (Map.Entry<String, ThreadPoolExecutor> entry : threadPoolRegistry.entrySet()) {
|
|
|
|
|
+ ThreadPoolExecutor executor = entry.getValue();
|
|
|
|
|
+ executor.shutdown();
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
|
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("Thread pool [{}] shutdown completed", entry.getKey());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 线程池状态信息
|
|
|
|
|
+ */
|
|
|
|
|
+ @Data
|
|
|
|
|
+ public static class ThreadPoolStats {
|
|
|
|
|
+ private String poolName;
|
|
|
|
|
+ private int corePoolSize;
|
|
|
|
|
+ private int maxPoolSize;
|
|
|
|
|
+ private int activeCount;
|
|
|
|
|
+ private int poolSize;
|
|
|
|
|
+ private int queueSize;
|
|
|
|
|
+ private int queueCapacity;
|
|
|
|
|
+ private long completedTaskCount;
|
|
|
|
|
+ private long taskCount;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|