![cover](http://image.wzoe.fun/i/2024/06/27/667d22153d7b9.png)
JAVA通过配置中心(NACOS)实现动态线程池
AI-摘要
佐伊 GPT
AI初始化中...
介绍自己
生成本文简介
推荐相关文章
前往主页
前往tianli博客
引入Nacos依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
新增配置文件bootstrap.yml
spring:
cloud:
nacos:
config:
server-addr: localhost:8848
extension-configs:
- dataId: application.yaml
group: test
refresh: true
Nacos配置
project:
threads:
poolA: ## 这里是自动创建的线程池bean Name
corePoolSize: 12
maxPoolSize: 20
queueCapacity: 10
thread-name-prefix: poolA-
allow-core-thread-timeout: false
keep-alive-seconds: 60 #保持活跃时间 可通过统一配置中心修改
poolB:
corePoolSize: 10
maxPoolSize: 20
queueCapacity: 10
一些属性配置model
ThreadPoolProperties
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
/**
* 线程池配置
*
* @author zoe
* @date 2024/6/24 18:53
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ThreadPoolProperties {
/**
* 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut, 默认1
*/
private int corePoolSize = 1;
/**
* 池中允许的最大线程数, 默认100
*/
private int maxPoolSize = Integer.MAX_VALUE;
/**
* 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间,单位秒, 默认60秒
*/
private int keepAliveSeconds = 60;
/**
* 线程名前缀
*/
private String threadNamePrefix = "zoe-executor-";
/**
* 是否允许核心线程超时
*/
private boolean allowCoreThreadTimeout = false;
/**
* 队列容量, 大于0时使用LinkedBlockingQueue,容量为设置的值, 小于等于0时使用SynchronousQueue
*/
private int queueCapacity = Integer.MAX_VALUE;
public String toString() {
return "{" +
"corePoolSize=" + corePoolSize +
", maxPoolSize=" + maxPoolSize +
", keepAliveTimeSeconds=" + keepAliveSeconds +
'}';
}
}
ThreadsProperties
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* 线程池配置
*
* @author zoe
* @date 2024/6/26 16:25
*/
@Data
public class ThreadsProperties {
private Map<String, ThreadPoolProperties> threads = new HashMap<>();
}
ZoeBeanFactoryPostProcessor
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.context.properties.bind.BindResult;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.Map;
/**
* 一些在bean初始化前的bean定义。 比如redis连接池bean、线程池bean等,这些都需要根据配置来动态定义
*/
@Configuration
public class ZoeBeanFactoryPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ZoeBeanFactoryPostProcessor.class);
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor(Environment environment) {
return beanFactory -> {
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
BindResult<ThreadsProperties> result = Binder.get(environment)
.bind("project", ThreadsProperties.class);
ThreadsProperties threadsProperties = result.get();
Map<String, ThreadPoolProperties> threadPropertiesMap = threadsProperties.getThreads();
if (!threadPropertiesMap.isEmpty()) {
threadPropertiesMap.forEach((beanName, threadProperties) -> {
boolean exist = defaultListableBeanFactory.containsBeanDefinition(beanName);
if (exist) {
return;
}
LOGGER.info("register thread pool task executor bean: {}", beanName);
BeanDefinition beanDefinition = BeanDefinitionBuilder.rootBeanDefinition(IbdThreadPoolTaskExecutorFactoryBean.class)
.addConstructorArgValue(threadProperties)
.getBeanDefinition();
defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinition);
});
}
};
}
}
ZoeThreadPoolTaskExecutorFactoryBean
import lombok.Data;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author zoe
* @date 2024/6/26 16:07
*/
@Data
public class ZoeThreadPoolTaskExecutorFactoryBean implements FactoryBean<ThreadPoolTaskExecutor> {
private final ThreadPoolProperties threadProperties;
public ZoeThreadPoolTaskExecutorFactoryBean(ThreadPoolProperties threadProperties) {
this.threadProperties = threadProperties;
}
@Override
public ThreadPoolTaskExecutor getObject() throws Exception {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(this.threadProperties.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(this.threadProperties.getMaxPoolSize());
threadPoolTaskExecutor.setKeepAliveSeconds(this.threadProperties.getKeepAliveSeconds());
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(this.threadProperties.isAllowCoreThreadTimeout());
threadPoolTaskExecutor.setQueueCapacity(this.threadProperties.getQueueCapacity());
threadPoolTaskExecutor.setThreadNamePrefix(this.threadProperties.getThreadNamePrefix());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Override
public Class<?> getObjectType() {
return ThreadPoolTaskExecutor.class;
}
@Override
public boolean isSingleton() {
return FactoryBean.super.isSingleton();
}
}
PoolPropertyChangeListener
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.bind.BindResult;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* TODO:(这里用一句话描述这个类的作用)
*
* @author zoe
* @date 2024/6/26 15:36
*/
@Slf4j
@Component
public class PoolPropertyChangeListener implements ApplicationListener<EnvironmentChangeEvent> {
private static final Bindable<Map<String, ThreadPoolProperties>> BIND_MAP = Bindable.mapOf(String.class, ThreadPoolProperties.class);
@Override
public void onApplicationEvent(EnvironmentChangeEvent event) {
Set<String> keys = event.getKeys();
List<String> changedPoolBeanNames = keys.stream().filter(n -> n.startsWith("project.threads."))
.map(n -> StringUtils.substringBetween(n, "project.threads.", "."))
.distinct()
.collect(Collectors.toList());
if (changedPoolBeanNames.isEmpty()) {
return;
}
if (event.getSource() instanceof ApplicationContext) {
// 通知时,对应的configuration类实际是不会更新的,需要从environment中获取最新的配置
ApplicationContext applicationContext = (ApplicationContext) event.getSource();
BindResult<Map<String, ThreadPoolProperties>> bindResult = Binder.get(applicationContext.getEnvironment()).bind("project.threads", BIND_MAP);
if (bindResult.isBound()) {
Map<String, ThreadPoolProperties> threads = bindResult.get();
for (String beanName : changedPoolBeanNames) {
resetPoolProperty(applicationContext, beanName, threads);
}
}
}
}
private void resetPoolProperty(ApplicationContext applicationContext, String beanName, Map<String, ThreadPoolProperties> threads) {
ThreadPoolTaskExecutor threadPoolTaskExecutor;
try {
threadPoolTaskExecutor = applicationContext.getBean(beanName, ThreadPoolTaskExecutor.class);
} catch (Exception e) {
log.warn("error when get bean: {}", beanName, e);
return;
}
ThreadPoolProperties newProperties = threads.get(beanName);
if (newProperties == null) {
return;
}
// 三个参数可以变更,其余在创建后不能再被修改
threadPoolTaskExecutor.setCorePoolSize(newProperties.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(newProperties.getMaxPoolSize());
threadPoolTaskExecutor.setKeepAliveSeconds(newProperties.getKeepAliveSeconds());
log.info("reset pool property: {} {}", beanName, newProperties);
}
}
NacosDemo
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Map;
/**
* @author wangzhao
*/
@Slf4j
@RequiredArgsConstructor
@Controller
@RequestMapping("config")
public class NacosDemo {
private final Map<String, ThreadPoolTaskExecutor> map;
@RequestMapping(value = "/get")
@ResponseBody
public void get() {
map.forEach((k, v) -> {
int maximumPoolSize = v.getMaxPoolSize();
log.info("key: {}, value: {}", k, maximumPoolSize);
int corePoolSize = v.getCorePoolSize();
log.info("key: {}, value: {}", k, corePoolSize);
});
}
}
结果
修改后,立即生效
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员佐伊
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果