SpringBoot 异步接口
代码】SpringBoot 异步接口。
·
一 什么是异步调用?
异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行。
二 如何实现异步调用?
多线程,这是很多人第一眼想到的关键词,没错,多线程就是一种实现异步调用的方式。
在非spring目项目中我们要实现异步调用的就是使用多线程方式,可以自己实现Runable接口或者集成Thread类,或者使用jdk1.5以上提供了的Executors线程池。
StrngBoot中则提供了很方便的方式执行异步调用。
三 异步接口的使用场景
耗时比较长,任务比较多的接口。比方说,文件下载,大文件下载比较耗时,这个时候就可以使用异步接口。
四 示例
4.1 maven依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
4.2 启动类:添加@EnableAsync注解
@SpringBootApplication
@EnableAsync
public class Application{
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
4.3 Controller
@Api(tags = "测试异步")
@RestController
@RequestMapping("/api/async/v1")
public class AsyncController {
@Autowired
private AsynService asynService;
@ApiOperation("异步接口测试")
@GetMapping("/test")
public Object findDetailById() throws InterruptedException, ExecutionException {
long currentTimeMillis = System.currentTimeMillis();
asynService.asyncTest();
long currentTimeMillis1 = System.currentTimeMillis();
String result = "task任务总耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms";
return result;
}
}
spring扫描时具有@Async注解方法的类时,是生成一个代理类,由代理类去开启关闭事务,而在同一个类中,方法调用是在类体内执行的,spring无法截获这个方法调用。将异步任务单独放到一个类中
public interface AsynService {
void asyncTest();
void asyncTest1() throws ExecutionException, InterruptedException;
void asyncTest3();
}
@Service
public class AsynServiceImpl implements AsynService {
@Autowired
private AsyncTask asyncTask;
@Autowired
AddressMapper addressMapper;
@Autowired
UserMapper userMapper;
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void asyncTest() {
Future<AddressDO> task1 = null;
try {
task1 = asyncTask.t1();
} catch (InterruptedException e) {
e.printStackTrace();
}
Future<UserDO> task2 = null;
try {
task2 = asyncTask.t2();
} catch (InterruptedException e) {
e.printStackTrace();
}
Future task3 = null;
try {
task3 = asyncTask.t3();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (;;) {
if(task1.isDone() && task2.isDone() && task3.isDone()) {
// 三个任务都调用完成,退出循环等待
break;
}
}
System.out.println("最后执行了");
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void asyncTest1() throws ExecutionException, InterruptedException {
CompletableFuture<Long> future1=CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
UserDO userDO = new UserDO();
userDO.setName("利益");
userDO.setPwd("123");
userDO.setMail("37396916374@qq.com");
userMapper.insert(userDO);
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return userDO.getId();
}
});
CompletableFuture<Long> nestedResult = future1.thenCompose(value->
CompletableFuture.supplyAsync(()->{
AddressDO addressDO = new AddressDO();
addressDO.setUserId(value);
addressDO.setCity("南京");
addressDO.setPhone("187210355738");
addressMapper.insert(addressDO);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return addressDO.getId();
}));
while (true){
if(future1.isDone()&& nestedResult.isDone()){
break;
}
}
System.out.println(nestedResult.get());
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void asyncTest3() {
CompletableFuture<Long> future1=CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
UserDO userDO = new UserDO();
userDO.setName("利益");
userDO.setPwd("123");
userDO.setMail("33294912374@qq.com");
userMapper.insert(userDO);
return userDO.getId();
}
});
CompletableFuture<Long> nestedResult = future1.thenCompose(value->
CompletableFuture.supplyAsync(()->{
AddressDO addressDO = new AddressDO();
addressDO.setUserId(value);
addressDO.setCity("南京");
addressDO.setPhone("18210535378");
addressMapper.insert(addressDO);
return addressDO.getId();
}));
while (true){
if(future1.isDone()&& nestedResult.isDone()){
break;
}
}
int i = 10/0;
}
}
@Service
public class AsyncTask {
@Autowired
UserMapper userMapper;
@Autowired
AddressMapper addressMapper;
public Future<AddressDO> t1() throws InterruptedException {
return task1();
}
public Future<UserDO> t2() throws InterruptedException {
return task2();
}
public Future t3() throws InterruptedException {
return task3();
}
@Async("asyncServiceExecutor")
public Future<AddressDO> task1() throws InterruptedException{
long currentTimeMillis = System.currentTimeMillis();
AddressDO addressDO = new AddressDO();
addressDO.setCity("南京");
addressDO.setPhone("18670555948");
addressMapper.insert(addressDO);
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("task1任务耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");
return new AsyncResult(addressDO);
}
@Async("asyncServiceExecutor")
public Future<UserDO> task2() throws InterruptedException{
long currentTimeMillis = System.currentTimeMillis();
UserDO userDO = new UserDO();
userDO.setName("利益");
userDO.setPwd("123");
userDO.setMail("3499283774@qq.com");
userMapper.insert(userDO);
test7();
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("task2任务耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");
return new AsyncResult(userDO);
}
public void test7() throws InterruptedException {
FixedThreadPoolUtil.doExecutor(() -> {
try {
task4();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
FixedThreadPoolUtil.doExecutor(() -> {
try {
task5();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
public void task4() throws InterruptedException {
long currentTimeMillis = System.currentTimeMillis();
AddressDO addressDO = new AddressDO();
addressDO.setCity("ti京");
addressDO.setPhone("14710555348");
addressMapper.insert(addressDO);
Thread.sleep(4000L);
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("task4任务耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");
}
public void task5() throws InterruptedException {
long currentTimeMillis = System.currentTimeMillis();
AddressDO addressDO = new AddressDO();
addressDO.setCity("东京");
addressDO.setPhone("18615055567");
addressMapper.insert(addressDO);
Thread.sleep(6000L);
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("task5任务耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");
}
@Async("asyncServiceExecutor")
public Future task3() throws InterruptedException{
long currentTimeMillis = System.currentTimeMillis();
Thread.sleep(3000L);
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("task3任务耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");
return new AsyncResult("");
}
}
自定义线程池:
@Configuration
@Slf4j
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService(){
return new ThreadPoolExecutor(3,10,60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
Executors.defaultThreadFactory(),
(r,e)->System.out.println("is full"));
}
}
/**
* 多线程的配置
*
* @author llh
* @version 1.0
* @date 2021/11/16 17:49
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("线程池启动了");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
async.executor.thread.core_pool_size=4
async.executor.thread.max_pool_size=8
async.executor.thread.queue_capacity=135
async.executor.thread.name.prefix= ticketManager-worker-
/**
* 固定数量线程池
*/
public final class FixedThreadPoolUtil {
private static ExecutorService executorService;
private static class SigleClass {
private static ExecutorService executor = new ThreadPoolExecutor(
12,
200,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1024),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
private static ExecutorService getInstance() {
if (null == executorService) {
executorService = SigleClass.executor;
}
return executorService;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Future doExecutor(Callable callable) {
getInstance();
return executorService.submit(callable);
}
public static void doExecutor(Runnable runnable) {
getInstance();
executorService.execute(runnable);
}
}
FixedThreadPoolUtil.doExecutor(() -> test(s));
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(getTaskScheduler());
}
@Bean
public TaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(7);
taskScheduler.setThreadNamePrefix("myworker-");
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
return taskScheduler;
}
}
更多推荐
已为社区贡献4条内容
所有评论(0)