关键字:SpringBoot 异步执行方法、Spring 异步执行有返回值的方法


Spring 中,我们可以找到 @EnableAsync 注解,通过月的该注解的作用,大致有这么几点信息:

① 该注解的作用是开启 SpringBoot 异步执行方法的能力

@EnableAsync 注解必须要和 @Configuration 注解一起使用

 @Configuration
 @EnableAsync
 public class AppConfig {
 }

③ 如果没有配置线程池而直接使用 @Async 注解,此时会使用默认策略

1. 先在 Context 中根据类型寻找 org.springframework.core.task.TaskExecutor 对象
2. 如果没有找到,则在 Context 中根据类型寻找 java.util.concurrent.Executor 对象且名称为 'taskExecutor'
3. 如果仍然没有找到,则创建 org.springframework.core.task.SimpleAsyncTaskExecutor 用于执行异步方法

④ 异步执行有两种方法

// 第一种
定义一个线程池,并 @EnableAsync 开启异步,在需要异步执行的方法上增加 @Async
// 第二种
实现 AsyncConfigurer 接口,在需要异步支持的方法上使用 @Async

第一种方法:配置一个线程池

① 配置线程池

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import lombok.Setter;

/**
 * 线程池配置类
 *
 * @author Kyle White
 * @date Oct 31, 2021
 */
@Setter
@EnableAsync
@Configuration
@ConfigurationProperties(prefix = "task.pool")
public class ThreadPoolConfig {

    /** 线程池中的核心线程数量,默认为1 */
    private int corePoolSize = 5;
    /** 线程池中的最大线程数量 */
    private int maxPoolSize = 10;
    /** 线程池中允许线程的空闲时间,默认为 60s */
    private int keepAliveTime = ((int) TimeUnit.SECONDS.toSeconds(30));
    /** 线程池中的队列最大数量 */
    private int queueCapacity = 1000;

    /** 线程的名称前缀 */
    private static final String THREAD_PREFIX = "thread-call-runner-%d";

    @Bean
    @Lazy
    public ThreadPoolTaskExecutor threadPool(){

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(THREAD_PREFIX);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        executor.initialize();

        return executor;
    }


}

② 在配置文件中进行配置

task:
  pool:
    corePoolSize: 5
    maxPoolSize: 20
    keepAliveSeconds: 300
    queueCapacity: 50

在这里插入图片描述
③ 测试:向线程池中提交任务

提交多个任务,测试多个线程的累加是否正确

import java.util.concurrent.atomic.LongAdder;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

/**
 * 原子自增任务
 *
 * @author Kyle White
 * @date Oct 31, 2021
 */
@NoArgsConstructor
@AllArgsConstructor
public class AutomaticIncTask implements Runnable {

    private LongAdder adder;

    @Override
    public void run() {

        for (int i = 0; i < 1000; i++) {
            adder.increment();
        }

    }

}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

import org.example.task.AutomaticIncTask;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * 线程池配置测试
 *
 * @author Kyle White
 * @date Oct 31, 2021
 */
@SpringBootTest
public class ThreadPoolConfigTest {

    @Autowired
    private ThreadPoolTaskExecutor executor;

    @Test
    public void testThreadPoolTaskExecutor() {

        LongAdder adder = new LongAdder();

        List<ListenableFuture<?>> futures = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            // 注意这里
            // 这里提交的是无返回值的任务,有返回值的任务需要实现 Callable 接口
            // futures 用于收集线程的监听结果,由于是异步的,主线程是不会等待线程池里面的任务的
            futures.add(executor.submitListenable(new AutomaticIncTask(adder)));
        }

        // 附带阻塞主线程效果,目的是为了让主线程等待线程池中的线程执行完毕
        futures.forEach(ListenableFuture::completable);

        System.out.println("执行完毕:" + adder.longValue());

    }

}

④ 在方法上使用 @Async 开启异步任务

@Service
public class AsyncServiceImpl implements AsyncService {

    @Async("asyncThreadPool")
    @Override
    public void asyncNoReturn() {
        new AutomaticIncTask(new LongAdder()).run();
    }

    @Async("asyncThreadPool")
    @Override
    public ListenableFuture<String> asyncReturn() throws Exception {
        // 这里可以改写成执行多个任务(增加任务的并行执行能力),对任务的返回值进行过滤
        // 1. 如果全部没有包含错误信息,则返回结果为 success
        // 2. 如果其中一个返回值包含错误信息,则返回结果为 fail
        return new AsyncResult<>(new CallAbleTask(new LongAdder()).call());
    }

}

第一种:执行无返回值的方法

@GetMapping("/hello")
public void hello() {
   service.asyncNoReturn();
}

第二种:执行有返回值的方法

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.LongAdder;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

/**
 * @author Kyle White
 * @date Oct 31, 2021
 */
@NoArgsConstructor
@AllArgsConstructor
public class CallAbleTask implements Callable<String> {

    private LongAdder adder;

    @Override
    public String call() {

        String result = "success";
        
        try {
            // 执行成功
            for (int i = 0; i < 1000; i++) {
                adder.increment();
            }
            
        } catch (Exception e) {
            // 执行错误
            result = "fail";
        }

        return result;
    }

}

注意:有返回值的需要通过 org.springframework.util.concurrent.ListenableFuture 包装,具体请查看 @Async 注解的介绍
在这里插入图片描述

@GetMapping("/hi")
public String hi() throws Exception {
    ListenableFuture<String> future = service.asyncReturn();
    return future.get();
}

第二种方法:实现 AsyncConfigurer 接口

这个方法没有测试,自行测试

① 实现接口

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

     @Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(5);
         executor.setMaxPoolSize(10);
         executor.setQueueCapacity(1000);
         executor.setThreadNamePrefix("thread-call-runner-%d");
         executor.initialize();
         return executor;
     }

     @Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncUncaughtExceptionHandler();
     }
}

② 在方法上使用 @Async

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐