springboot之线程池ThreadPoolTaskExecutor以及@Async异步注解
前言最近项目当中有需求,要进行异步的处理,需要使用到线程池,很久没有使用到线程池了,一来是做JAVAweb开发基本上很少用到异步处理,二来是发现有的老项目里面,线程和线程池的使用比较混乱,有好几个线程池,有的线程池是通过spring管理的,有的是自己创建的,然后有的地方是直接创建的线程。所以这里记录下自己在项目当中如何优雅的使用线程池!避免项目当中到处都是线程池!!!SpringBoot整合Thr
前言
最近项目当中有需求,要进行异步的处理,需要使用到线程池,很久没有使用到线程池了,一来是做JAVAweb开发基本上很少用到异步处理,二来是发现有的老项目里面,线程和线程池的使用比较混乱,有好几个线程池,有的线程池是通过spring管理的,有的是自己创建的,然后有的地方是直接创建的线程。所以这里记录下自己在项目当中如何优雅的使用线程池!避免项目当中到处都是线程池!!!
SpringBoot整合ThreadPoolTaskExecutor线程池
ThreadPoolExecutor:这个是JAVA自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建! ThreadPoolTaskExecutor :这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类。
In the absence of an Executor bean in the context, Spring Boot auto-configures a ThreadPoolTaskExecutor with sensible defaults that can be automatically associated to asynchronous task execution (@EnableAsync) and Spring MVC asynchronous request processing.
在springboot当中,根据 官方文档的说明,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中,我们只需要按照他的方式调用就可以了!!!
使用springboot默认的线程池
既然springboot有默认的线程池,说明我们可以很简单的进行调用
方式一:通过@Async注解调用
第一步:在Application启动类上面加上@EnableAsync
@SpringBootApplication @EnableAsync public class ThreadpoolApplication { public static void main(String[] args) { SpringApplication.run(ThreadpoolApplication.class, args); } }
第二步:在需要异步执行的方法上加上@Async注解
@Service public class AsyncTest { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async public void hello(String name){ //这里使用logger 方便查看执行的线程是什么 logger.info("异步线程启动 started."+name); } }
第三步:测试类进行测试验证
@Autowired AsyncTest asyncTest; @Test void contextLoads() throws InterruptedException { asyncTest.hello("afsasfasf"); //一定要休眠 不然主线程关闭了,子线程还没有启动 Thread.sleep(1000); }
查看打印的日志:
INFO 2276 --- [ main] c.h.s.t.t.ThreadpoolApplicationTests : Started ThreadpoolApplicationTests in 3.003 seconds (JVM running for 5.342) INFO 2276 --- [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.afsasfasf
可以清楚的看到新开了一个task-1的线程执行任务。验证成功!!!
方式二:直接调用ThreadPoolTaskExecutor
修改上面测试类,直接注入ThreadPoolTaskExecutor
@SpringBootTest class ThreadPoolApplicationTests { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired AsyncTest asyncTest; @Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() throws InterruptedException { asyncTest.hello("async注解创建"); threadPoolTaskExecutor.submit(new Thread(()->{ logger.info("threadPoolTaskExecutor 创建线程"); })); //一定要休眠 不然主线程关闭了,子线程还没有启动 Thread.sleep(1000); } }
查看打印的日志发现都成功创建线程!!!:
INFO 12360 --- [ task-2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 12360 --- [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建
备注1:如果只使用ThreadPoolTaskExecutor, 是可以不用在Application启动类上面加上@EnableAsync注解的哦!!!
备注2:多次测试发现ThreadPoolTaskExecutor执行比@Async要快!!!
线程池默认配置信息
以下是springboot默认的线程池配置,可以在application.properties文件当中进行相关的设置!!!
# 核心线程数 spring.task.execution.pool.core-size=8 # 最大线程数 spring.task.execution.pool.max-size=16 # 空闲线程存活时间 spring.task.execution.pool.keep-alive=60s # 是否允许核心线程超时 spring.task.execution.pool.allow-core-thread-timeout=true # 线程队列数量 spring.task.execution.pool.queue-capacity=100 # 线程关闭等待 spring.task.execution.shutdown.await-termination=false spring.task.execution.shutdown.await-termination-period= # 线程名称前缀 spring.task.execution.thread-name-prefix=task-
深入springboot默认的线程池
根据官方文档的说明,Spring Boot auto-configures a ThreadPoolTaskExecutor 。最终找到springboot的线程池自动装配类:TaskExecutionAutoConfiguration
@Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); Stream var10001 = taskExecutorCustomizers.orderedStream(); var10001.getClass(); builder = builder.customizers(var10001::iterator); builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique()); return builder; }
同时在ThreadPoolTaskExecutor源码当中可以看到线程池的初始化方式是直接调用的ThreadPoolExecutor进行的初始化。
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { public void execute(Runnable command) { Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command); if (decorated != command) { ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
同时会发现默认的线程池拒绝策略是: AbortPolicy 直接抛出异常!!!
private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();
使用自定义的线程池
在默认配置信息里面是没有线程池的拒绝策略设置的方法的,如果需要更换拒绝策略就需要自定义线程池,并且如果项目当中需要多个自定义的线程池,又要如何进行管理呢?
自定义Configuration
第一步:创建一个ThreadPoolConfig 先只配置一个线程池,并设置拒绝策略为CallerRunsPolicy
@Configuration public class ThreadPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
然后执行之前写的测试代码发现,使用的线程池已经变成自定义的线程池了。
INFO 12740 --- [ myExecutor--2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 12740 --- [ myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建
第二步:如果配置有多个线程池,该如何指定线程池呢?
@Configuration public class ThreadPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } @Bean("poolExecutor") public Executor poolExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } @Bean("taskPoolExecutor") public Executor taskPoolExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor3--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
执行测试类,直接报错说找到多个类,不知道加载哪个类:
No qualifying bean of type 'org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor' available: expected single matching bean but found 3: taskExecutor,taskPoolExecutor
由于测试类当中是这样自动注入的:
@Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor;
考虑到@Autowired 以及@Resource两个注入时的存在多个类如何匹配问题,然后发现只要我们在注入时指定具体的bean就会调用对应的线程池!!!
即修改测试类如下:
@Autowired AsyncTest asyncTest; @Autowired ThreadPoolTaskExecutor poolExecutor; //会去匹配 @Bean("poolExecutor") 这个线程池 @Test void contextLoads() throws InterruptedException { asyncTest.hello("async注解创建"); //一定要休眠 不然主线程关闭了,子线程还没有启动 poolExecutor.submit(new Thread(()->{ logger.info("threadPoolTaskExecutor 创建线程"); })); Thread.sleep(1000); }
最后得到如下信息:
INFO 13636 --- [ myExecutor2--1] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 13636 --- [ myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建
测发现还是报错,需要先定义Executor类
@Autowired private Executor executor;
转换后使用:
ThreadPoolTaskExecutor exe = (ThreadPoolTaskExecutor )executor;
exe.submit(....);
备注1:如果是使用的@Async注解,只需要在注解里面指定bean的名称就可以切换到对应的线程池去了。如下所示:
@Async("taskPoolExecutor") public void hello(String name){ logger.info("异步线程启动 started."+name); }
备注2:如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池
Runnable 例子:
import org.springframework.core.task.TaskExecutor;
public class TaskExecutorExample {
private class MessagePrinterTask implements Runnable {
private String message;
public MessagePrinterTask(String message) {
this.message = message;
}
public void run() {
System.out.println(message);
}
}
private TaskExecutor taskExecutor;
public TaskExecutorExample(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
public void printMessages() {
for(int i = 0; i < 25; i++) {
taskExecutor.execute(new MessagePrinterTask("Message" + i));
}
}
}
总结
线程池的四种拒绝策略:线程池ThreadPoolExecutor里面4种拒绝策略 - 诗出函谷 - 博客园 JAVA常用的四种线程池: Java 四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor - 星辰之力 - 博客园 线程池的使用是为了管理线程,但是对于线程池项目当中也是要管理起来的。有利于后续的维护!!!
转自:
springboot之线程池ThreadPoolTaskExecutor以及@Async异步注解 - 云+社区 - 腾讯云
更多推荐
所有评论(0)