# DynamicThreadPool **Repository Path**: xiaoyis/dynamic-thread-pool ## Basic Information - **Project Name**: DynamicThreadPool - **Description**: Dynamic Thread Pool是一个基于Java实现的动态线程池管理系统,提供了线程池的动态创建、管理和参数调整功能。该系统支持通过URL方式动态配置线程池参数,并提供了Spring Boot集成方案,使开发者能够轻松地在项目中使用动态线程池功能。 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-11-06 - **Last Updated**: 2025-11-14 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Dynamic Thread Pool 动态线程池管理系统 ## 项目介绍 Dynamic Thread Pool是一个基于Java实现的动态线程池管理系统,提供了线程池的动态创建、管理和参数调整功能。该系统支持通过URL方式动态配置线程池参数,并提供了Spring Boot集成方案,使开发者能够轻松地在项目中使用动态线程池功能。 ## 功能特点 - **动态参数调整**:支持在线调整线程池核心参数(核心线程数、最大线程数、存活时间等) - **多种队列支持**:支持多种阻塞队列类型的选择和容量配置 - **可配置拒绝策略**:支持多种线程池拒绝策略 - **Netty服务器集成**:内置Netty服务器,支持通过网络进行远程控制 - **Spring Boot自动配置**:提供Spring Boot Starter式的自动配置,易于集成 - **线程池集中管理**:通过ThreadPoolManager统一管理所有线程池实例 - **优雅关闭机制**:提供线程池的优雅关闭功能,避免资源泄漏 ## 架构设计 ### 核心组件 1. **ThreadPoolManager**:线程池管理器,采用单例模式,负责线程池的注册、获取、更新和销毁 2. **DynamicThreadPool**:动态线程池实现,扩展自ThreadPoolExecutor,支持参数动态调整 3. **DynamicThreadPoolFactory**:线程池工厂,负责根据配置创建和更新线程池 4. **DynamicThreadPoolExecutor**:Spring集成执行器,参考XXL-JOB的设计,在Spring容器初始化后自动启动 5. **ThreadPoolControlServer**:基于Netty的控制服务器,提供网络接口进行线程池控制 6. **DynamicThreadPoolAutoConfiguration**:Spring Boot自动配置类,实现自动装配 ### 技术栈 - Java 8+ - Spring Boot - Netty - Dubbo SPI(部分参考实现) ## 快速开始 ### 环境要求 - JDK 8 或更高版本 - Maven 3.5+ - Spring Boot 2.x(可选,用于集成) ### 安装与配置 1. **克隆项目** ```bash git clone https://github.com/your-repo/dynamic-threadpool.git cd dynamic-threadpool ``` 2. **编译打包** ```bash mvn clean package ``` 3. **配置Spring Boot应用** 在`application.properties`中添加以下配置: ```properties # 服务器端口 server.port=8080 # 动态线程池配置 dynamic.threadpool.port=8888 dynamic.threadpool.appname=dynamic-threadpool-example dynamic.threadpool.accesstoken=default_token dynamic.threadpool.admin.addresses=http://localhost:8888 # 日志配置 logging.level.root=INFO logging.level.com.dynamicthreadpool=DEBUG ``` ### 使用示例 #### 1. 通过Spring Boot启动 创建一个Spring Boot启动类: ```java @SpringBootApplication(scanBasePackages = {"com.dynamicthreadpool.spring"}) public class DynamicThreadPoolSpringBootApplication { public static void main(String[] args) { // 启动Spring Boot应用 ConfigurableApplicationContext context = SpringApplication.run(DynamicThreadPoolSpringBootApplication.class, args); // 获取ThreadPoolManager实例 ThreadPoolManager threadPoolManager = ThreadPoolManager.getInstance(); // 创建线程池并注册 // ...详见完整示例 } } ``` #### 2. 手动创建和管理线程池 ```java // 创建URL配置 URL url = new URL("dynamic", "localhost", 8888) .setParameter("corethreads", "10") .setParameter("maxthreads", "50") .setParameter("queues", "100") .setParameter("keepalive", "60000") .setParameter("queueType", "LinkedBlockingQueue") .setParameter("rejectPolicy", "AbortPolicy") .setParameter("threadname", "example-threadpool"); // 创建线程池 Executor executor = DynamicThreadPoolFactory.getExecutor(url); // 注册到管理器 ThreadPoolManager.getInstance().registerThreadPool("exampleThreadPool", executor); // 提交任务 for (int i = 0; i < 5; i++) { final int taskId = i; executor.execute(() -> { System.out.println(Thread.currentThread().getName() + " 正在执行任务: " + taskId); // 任务逻辑 }); } // 动态更新线程池参数 URL updateUrl = new URL("dynamic", "localhost", 8888) .setParameter("corethreads", "20") .setParameter("maxthreads", "100"); ThreadPoolManager.getInstance().updateThreadPool("exampleThreadPool", updateUrl); ## 线程池控制 Dynamic Thread Pool提供了基于Netty的线程池远程控制系统,支持通过网络连接动态创建、查询、更新和关闭线程池。 ### 控制服务器 #### 启动控制服务器 线程池控制服务器基于Netty实现,默认监听8888端口: ```java // 创建并启动控制服务器 ThreadPoolControlServer server = new ThreadPoolControlServer(8888); server.start(); // 阻塞方法,启动服务器 ``` 或者通过Spring Boot应用自动启动(集成方式): ```bash # 运行Spring Boot示例应用 java -jar dynamicthreadpool-example.jar ``` ### 控制客户端 通过ThreadPoolControlClient可以方便地发送控制命令到服务器: ```java // 创建客户端连接到服务器 ThreadPoolControlClient client = new ThreadPoolControlClient("localhost", 8888); client.connect(); // 创建线程池请求 ThreadPoolControlMessage createRequest = new ThreadPoolControlMessage("create", "orderThreadPool"); Map params = new HashMap<>(); params.put("corethreads", "10"); params.put("maxthreads", "50"); params.put("queues", "1000"); params.put("queueType", "LinkedBlockingQueue"); params.put("rejectPolicy", "abort"); params.put("threadname", "order-pool"); createRequest.setParameters(params); // 发送请求并获取响应 ThreadPoolControlMessage response = client.sendRequest(createRequest); System.out.println("Status: " + response.getStatus()); System.out.println("Message: " + response.getMessage()); // 断开连接 client.disconnect(); ``` ### 支持的控制操作 | 操作类型 | 描述 | 参数 | |---------|------|------| | create | 创建新线程池 | poolId + 线程池参数 | | update | 更新线程池参数 | poolId + 要更新的参数 | | status | 获取线程池状态 | poolId | | shutdown | 关闭线程池 | poolId | ### 完整控制示例 ```java // 创建客户端,指定accessToken String accessToken = "your_access_token_here"; // 与服务器配置一致 ThreadPoolControlClient client = new ThreadPoolControlClient("localhost", 8888); client.setAccessToken(accessToken); client.connect(); // 1. 创建线程池 ThreadPoolControlMessage createMsg = new ThreadPoolControlMessage("create", "paymentPool"); Map createParams = new HashMap<>(); createParams.put("corethreads", "5"); createParams.put("maxthreads", "20"); createParams.put("queues", "100"); createParams.put("keepalive", "30000"); createParams.put("queueType", "ArrayBlockingQueue"); createParams.put("rejectPolicy", "discardoldest"); createParams.put("threadname", "payment-thread"); createMsg.setParameters(createParams); ThreadPoolControlMessage createResp = client.sendRequest(createMsg); // 2. 查询线程池状态 ThreadPoolControlMessage statusMsg = new ThreadPoolControlMessage("status", "paymentPool"); ThreadPoolControlMessage statusResp = client.sendRequest(statusMsg); System.out.println("线程池状态: " + statusResp.getPoolStatus()); // 3. 更新线程池参数 ThreadPoolControlMessage updateMsg = new ThreadPoolControlMessage("update", "paymentPool"); Map updateParams = new HashMap<>(); updateParams.put("corethreads", "10"); updateParams.put("maxthreads", "30"); updateMsg.setParameters(updateParams); ThreadPoolControlMessage updateResp = client.sendRequest(updateMsg); // 4. 关闭线程池 ThreadPoolControlMessage shutdownMsg = new ThreadPoolControlMessage("shutdown", "paymentPool"); ThreadPoolControlMessage shutdownResp = client.sendRequest(shutdownMsg); // 断开连接 client.disconnect(); ``` ### 线程池状态信息 当执行status操作时,返回的poolStatus包含以下线程池运行状态信息: | 字段名 | 类型 | 描述 | |-------|------|------| | activeCount | int | 活跃线程数 | | corePoolSize | int | 核心线程数 | | maximumPoolSize | int | 最大线程数 | | poolSize | int | 当前线程池大小 | | queueSize | int | 队列中等待的任务数 | | taskCount | long | 总任务数 | | completedTaskCount | long | 已完成的任务数 | | isShutdown | boolean | 是否已关闭 | | isTerminated | boolean | 是否已终止 | ``` ## 核心配置项 ### URL参数说明 创建或更新线程池时,可通过URL参数配置以下属性: | 参数名 | 说明 | 默认值 | 示例值 | |--------|------|--------|--------| | corethreads | 核心线程数 | 10 | 20 | | maxthreads | 最大线程数 | 20 | 50 | | keepalive | 线程存活时间(毫秒) | 60000 | 120000 | | queues | 队列容量 | 0 | 1000 | | queueType | 队列类型 | LinkedBlockingQueue | ArrayBlockingQueue, SynchronousQueue | | rejectPolicy | 拒绝策略 | AbortPolicy | DiscardPolicy, CallerRunsPolicy, DiscardOldestPolicy | | threadname | 线程名称前缀 | dynamic-thread- | example-thread- | ### Spring Boot配置项 | 配置项 | 说明 | 默认值 | 示例值 | |--------|------|--------|--------| | dynamic.threadpool.port | Netty服务器端口 | 8888 | 9999 | | dynamic.threadpool.appname | 应用名称 | dynamic-threadpool | my-application | | dynamic.threadpool.accesstoken | 访问令牌,用于身份验证(可选,不设置则不开启身份验证) | - | your_access_token_here | | dynamic.threadpool.admin.addresses | 管理地址 | - | http://localhost:8888 | ## 项目结构 ``` src/main/java/com/dynamicthreadpool/ ├── core/ # 核心实现 │ └── DynamicThreadPool.java # 动态线程池实现 ├── dubbo/ # Dubbo相关实现 │ └── DynamicThreadPoolFactory.java # 线程池工厂 ├── example/ # 示例代码 │ └── DynamicThreadPoolSpringBootApplication.java # Spring Boot示例应用 ├── manager/ # 管理相关 │ └── ThreadPoolManager.java # 线程池管理器 ├── netty/ # Netty相关 │ └── server/ # 服务器实现 │ └── ThreadPoolControlServer.java # 线程池控制服务器 ├── spring/ # Spring集成 │ ├── DynamicThreadPoolExecutor.java # 动态线程池执行器 │ ├── DynamicThreadPoolAutoConfiguration.java # 自动配置 │ └── DynamicThreadPoolProperties.java # 配置属性 └── spi/ # SPI相关 └── URL.java # URL实现 ``` ## 注意事项 1. **线程池命名**:为每个线程池指定有意义的名称,便于管理和监控 2. **参数设置**:根据业务需求合理设置线程池参数,避免资源浪费或任务堆积 3. **异常处理**:在线程池任务中添加适当的异常处理机制 4. **优雅关闭**:在应用退出时调用`shutdownAllThreadPools()`方法关闭所有线程池 5. **监控**:建议结合监控系统监控线程池运行状态 ## 扩展与自定义 ### 自定义拒绝策略 可以实现`RejectedExecutionHandler`接口,创建自定义的拒绝策略,然后通过URL参数指定使用。 ### 自定义队列类型 系统支持扩展更多的队列类型,只需在`QueueStrategyFactory`中添加相应的创建逻辑。 ## SPI扩展机制 Dynamic Thread Pool项目全面采用了参考Dubbo SPI设计的扩展机制,通过`@SPI`注解和`ExtensionLoader`实现了核心组件的灵活扩展。以下是关于SPI扩展的详细说明: ### 可扩展的核心组件 1. **线程池实现** (`ThreadPool`接口) - 扩展自定义的线程池实现 - 实现动态参数调整逻辑 - 自定义线程池监控和统计功能 2. **队列策略** (`QueueStrategy`接口,已通过@SPI注解支持) - 提供统一的队列创建接口 - 通过SPI机制动态加载不同队列实现 - 支持自定义队列参数和行为 3. **拒绝策略** (`RejectStrategy`接口,已通过@SPI注解支持) - 实现自定义的任务拒绝处理逻辑 - 支持通过SPI自动加载注册 - 提供多种内置实现(abort、discard、callerruns等) 4. **线程工厂** (`ThreadFactory`接口,已通过@SPI注解支持) - 自定义线程创建和命名逻辑 - 实现线程优先级和守护线程设置 - 支持通过SPI机制扩展不同实现 ### 如何通过SPI进行扩展 #### 1. 扩展已有的SPI接口 项目中核心SPI接口都已使用`@SPI`注解标记,如: ```java @SPI("LinkedBlockingQueue") public interface QueueStrategy { BlockingQueue createQueue(int capacity, URL url); String getTypeName(); } @SPI("named") public interface ThreadFactory extends java.util.concurrent.ThreadFactory { // 扩展方法 } ``` #### 2. 实现SPI接口 创建自定义实现类: ```java public class MyCustomQueueStrategy implements QueueStrategy { @Override public BlockingQueue createQueue(int capacity, URL url) { // 实现自定义队列创建逻辑 return new CustomBlockingQueue<>(capacity); } @Override public String getTypeName() { return "customQueue"; } } ``` #### 3. 配置SPI实现 在项目的`META-INF/dubbo/`目录下创建对应的接口全限定名文件,并配置实现类映射: ``` # 在META-INF/dubbo/com.dynamicthreadpool.strategy.QueueStrategy文件中 customQueue=com.example.MyCustomQueueStrategy ``` #### 4. 使用扩展实现 系统会自动通过`ExtensionLoader`加载SPI扩展,无需手动注册: ```java // 直接通过名称获取扩展实现 QueueStrategy strategy = ExtensionLoader.getExtensionLoader(QueueStrategy.class).getExtension("customQueue"); // 或通过工厂类获取(推荐方式) BlockingQueue queue = QueueStrategyFactory.getStrategy("customQueue").createQueue(capacity, url); ``` ### 扩展示例 #### 自定义队列策略示例 ```java // 1. 实现QueueStrategy接口 public class PriorityBlockingQueueStrategy implements QueueStrategy { @Override public BlockingQueue createQueue(int capacity, URL url) { // PriorityBlockingQueue不需要固定容量,这里忽略capacity参数 return new PriorityBlockingQueue<>(); } @Override public String getTypeName() { return "PriorityBlockingQueue"; } } // 2. 在META-INF/dubbo/com.dynamicthreadpool.strategy.QueueStrategy文件中配置 // PriorityBlockingQueue=com.example.PriorityBlockingQueueStrategy // 3. 使用自定义队列策略 URL url = new URL("dynamic", "localhost", 8888) .setParameter("queueType", "PriorityBlockingQueue") .setParameter("queues", "100"); Executor executor = DynamicThreadPoolFactory.getExecutor(url); ``` #### 自定义线程工厂示例 ```java // 1. 实现ThreadFactory接口 public class PriorityThreadFactoryImpl implements ThreadFactory { private final String namePrefix; private final AtomicInteger threadNumber = new AtomicInteger(1); private final ThreadGroup group; private int priority = Thread.NORM_PRIORITY; public PriorityThreadFactoryImpl(String namePrefix) { this.namePrefix = namePrefix; SecurityManager s = System.getSecurityManager(); this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); } public ThreadFactory setPriority(int priority) { this.priority = priority; return this; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } t.setPriority(this.priority); return t; } @Override public ThreadFactory setName(String name) { return this; // 已在构造函数中设置名称 } @Override public ThreadFactory setDaemon(boolean daemon) { return this; // 此处省略实现 } @Override public String getName() { return namePrefix; } } // 2. 在META-INF/dubbo/com.dynamicthreadpool.spi.ThreadFactory文件中配置 // priority=com.example.PriorityThreadFactoryImpl // 3. 系统会自动通过ThreadFactoryFactory加载并使用 ``` #### 自定义拒绝策略示例 ```java // 1. 实现RejectStrategy接口 public class RetryRejectStrategy implements RejectStrategy { private final int maxRetries; private final long retryDelay; public RetryRejectStrategy() { this.maxRetries = 3; this.retryDelay = 100; } @Override public RejectedExecutionHandler newRejectedExecutionHandler(URL url) { return (r, executor) -> { int retries = 0; while (retries < maxRetries && !executor.isShutdown()) { try { Thread.sleep(retryDelay); if (executor.getQueue().offer(r, retryDelay, TimeUnit.MILLISECONDS)) { return; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } retries++; } // 重试失败后执行默认拒绝策略 new ThreadPoolExecutor.AbortPolicy().rejectedExecution(r, executor); }; } } // 2. 在META-INF/dubbo/com.dynamicthreadpool.strategy.RejectStrategy文件中配置 // retry=com.example.RetryRejectStrategy // 3. 使用自定义拒绝策略 URL url = new URL("dynamic", "localhost", 8888) .setParameter("rejectPolicy", "retry"); Executor executor = DynamicThreadPoolFactory.getExecutor(url); ``` ### 内置SPI扩展实现 项目内置了以下SPI扩展实现: #### 队列策略 - `LinkedBlockingQueue` - 基于链表的无界阻塞队列 - `ArrayBlockingQueue` - 基于数组的有界阻塞队列 - `SynchronousQueue` - 不存储元素的同步队列 - `PriorityBlockingQueue` - 支持优先级排序的无界阻塞队列 #### 线程工厂 - `named` - 支持自定义线程名称的线程工厂实现 #### 拒绝策略 - `abort` - 直接抛出RejectedExecutionException异常 - `discard` - 丢弃被拒绝的任务 - `discardoldest` - 丢弃队列头部的任务,然后尝试重新提交 - `callerruns` - 由调用线程执行被拒绝的任务 - `abortwithreport` - 抛出异常并记录详细信息 ## 版本历史 ### 1.0.0 (初始版本) - 实现动态线程池核心功能 - 支持Spring Boot自动配置 - 集成Netty服务器 - 提供线程池创建、更新、查询等基本操作 - 支持SPI扩展机制 ## License [Apache License 2.0](LICENSE)