1 What is an asynchronous call?

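An asynchronous call is defined in contrast to a synchronous call. In a synchronous call, the program executes step by step in a fixed order, and each step must wait for the previous one to finish before it can start. In an asynchronous call, the caller continues without waiting for the previous step to finish.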
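To make the difference concrete, here is a minimal plain-JDK sketch. The class name, the `slowStep` helper, and the 200 ms delay are all made up for the demo. It runs the same two slow steps first one after the other, then concurrently.

```java
import java.util.concurrent.CompletableFuture;

public class SyncVsAsyncDemo {

    // Simulates a slow step; the 200 ms delay is an arbitrary value chosen for the demo.
    static String slowStep(String name) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + " done";
    }

    // Synchronous: the second step starts only after the first has returned.
    static long runSync() {
        long start = System.currentTimeMillis();
        slowStep("A");
        slowStep("B");
        return System.currentTimeMillis() - start;
    }

    // Asynchronous: both steps are started right away and run on other threads.
    static long runAsync() {
        long start = System.currentTimeMillis();
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> slowStep("A"));
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> slowStep("B"));
        CompletableFuture.allOf(a, b).join(); // wait for both results
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + runSync() + " ms");
        System.out.println("async: " + runAsync() + " ms");
    }
}
```

The synchronous run should take roughly the sum of the two delays, while the asynchronous run should take roughly the longer of the two.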

2 How to implement asynchronous calls?

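Multithreading is the first keyword that comes to mind for many people, and rightly so: multithreading is one way to implement asynchronous calls.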

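In a non-Spring project, asynchronous calls are implemented with threads directly: you can implement the Runnable interface, extend the Thread class, or use the thread pools created by Executors, available since JDK 1.5.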
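The three plain-JDK options can be sketched roughly as follows. The class and method names are invented for illustration, and the pool size of 2 is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PlainThreadingDemo {

    // Option 1: implement Runnable and hand it to a Thread.
    static class PrintTask implements Runnable {
        @Override
        public void run() {
            System.out.println("Runnable on " + Thread.currentThread().getName());
        }
    }

    // Option 2: extend Thread and override run().
    static class PrintThread extends Thread {
        @Override
        public void run() {
            System.out.println("Thread subclass on " + getName());
        }
    }

    // Option 3: submit work to a pool created by Executors and read the result from a Future.
    static int sumOnPool(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> result = pool.submit(() -> a + b);
            return result.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new PrintTask());
        Thread t2 = new PrintThread();
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("pool result: " + sumOnPool(2, 3));
    }
}
```

Of the three, the pool is usually the easiest to manage because threads are reused and the result comes back through a Future.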

Spring Boot offers a very convenient way to make asynchronous calls.

3 When to use asynchronous interfaces

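Endpoints that take a long time or handle many tasks. For example, downloading a large file is time-consuming, so an asynchronous interface is a good fit.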

4 Example

4.1 Maven dependencies:

<parent>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-parent</artifactId>  
    <version>1.5.3.RELEASE</version>  
</parent>  
<dependencies>  
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-web</artifactId>  
    </dependency>  
</dependencies> 

4.2 Startup class: add the @EnableAsync annotation

@SpringBootApplication  
@EnableAsync  
public class Application{  
  
    public static void main(String[] args) {  
        SpringApplication.run(Application.class, args);  
    }  
} 

4.3 Controller

@Api(tags = "Async test")
@RestController
@RequestMapping("/api/async/v1")
public class AsyncController {

    @Autowired
    private AsynService asynService;

    @ApiOperation("Async endpoint test")
    @GetMapping("/test")
    public Object findDetailById() throws InterruptedException, ExecutionException {
        // Measure how long the service call blocks the request thread.
        long currentTimeMillis = System.currentTimeMillis();
        asynService.asyncTest();
        long currentTimeMillis1 = System.currentTimeMillis();
        String result = "Total task time: "+(currentTimeMillis1-currentTimeMillis)+"ms";
        return result;
    }
}

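When Spring scans a class that has @Async methods, it generates a proxy for that class, and the proxy is what hands the call to the thread pool (the same mechanism lets a proxy open and close transactions for @Transactional). A call made from another method of the same class runs inside the class body through this, so Spring cannot intercept it and the method executes synchronously. For that reason, put the asynchronous tasks in a separate class and call them through the injected bean.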
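The self-invocation problem can be reproduced without Spring. The sketch below uses a plain JDK dynamic proxy as a stand-in; Spring's real proxy may be a JDK proxy or a CGLIB subclass, and the `Task` interface and all other names here are hypothetical. The handler records every call it is able to intercept.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    interface Task {
        void outer();
        void inner();
    }

    static class TaskImpl implements Task {
        @Override
        public void outer() {
            // Same-class call: goes through `this`, not through the proxy.
            inner();
        }

        @Override
        public void inner() {
        }
    }

    // Returns the names of the calls that the proxy was able to intercept.
    static List<String> interceptedCalls(boolean callInnerDirectly) {
        List<String> intercepted = new ArrayList<>();
        Task target = new TaskImpl();
        Task proxy = (Task) Proxy.newProxyInstance(
                Task.class.getClassLoader(),
                new Class<?>[] {Task.class},
                (p, method, args) -> {
                    // This is the point where an async proxy would hand the call to a pool.
                    intercepted.add(method.getName());
                    return method.invoke(target, args);
                });
        if (callInnerDirectly) {
            proxy.inner();
        } else {
            proxy.outer();
        }
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls(false)); // prints [outer]
        System.out.println(interceptedCalls(true));  // prints [inner]
    }
}
```

When `outer()` is called through the proxy, the nested `inner()` call never reaches the handler. That is why an @Async method invoked from its own class runs on the caller's thread.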

public interface AsynService {
    void asyncTest();

    void asyncTest1() throws ExecutionException, InterruptedException;

    void asyncTest3();
}
@Service
public class AsynServiceImpl implements AsynService {

    @Autowired
    private AsyncTask asyncTask;

    @Autowired
    AddressMapper addressMapper;

    @Autowired
    UserMapper userMapper;

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void asyncTest() {
        try {
            // Call the @Async methods directly on the injected bean so the calls go
            // through the Spring proxy. The t1()/t2()/t3() wrappers invoke the tasks
            // from inside AsyncTask, which bypasses the proxy and runs them synchronously.
            Future<AddressDO> task1 = asyncTask.task1();
            Future<UserDO> task2 = asyncTask.task2();
            Future task3 = asyncTask.task3();
            // Block until all three tasks have finished instead of spinning on isDone().
            task1.get();
            task2.get();
            task3.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("Executed last");
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void asyncTest1() throws ExecutionException, InterruptedException {
        CompletableFuture<Long> future1=CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                UserDO userDO = new UserDO();
                userDO.setName("利益");
                userDO.setPwd("123");
                userDO.setMail("37396916374@qq.com");
                userMapper.insert(userDO);
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return userDO.getId();
            }
        });

        CompletableFuture<Long> nestedResult = future1.thenCompose(value->
                CompletableFuture.supplyAsync(()->{
                    AddressDO addressDO = new AddressDO();
                    addressDO.setUserId(value);
                    addressDO.setCity("南京");
                    addressDO.setPhone("187210355738");
                    addressMapper.insert(addressDO);
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return addressDO.getId();
                }));
        // get() blocks until both stages have completed, so no polling loop is needed.
        System.out.println(nestedResult.get());
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void asyncTest3() {
        CompletableFuture<Long> future1=CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                UserDO userDO = new UserDO();
                userDO.setName("利益");
                userDO.setPwd("123");
                userDO.setMail("33294912374@qq.com");
                userMapper.insert(userDO);
                return userDO.getId();
            }
        });

        CompletableFuture<Long> nestedResult = future1.thenCompose(value->
                CompletableFuture.supplyAsync(()->{
                    AddressDO addressDO = new AddressDO();
                    addressDO.setUserId(value);
                    addressDO.setCity("南京");
                    addressDO.setPhone("18210535378");
                    addressMapper.insert(addressDO);
                    return addressDO.getId();
                }));
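        // join() blocks until both stages have completed, so no polling loop is needed.
        nestedResult.join();
        // Deliberately throw to test rollback. The inserts above ran on other threads,
        // outside this method's transaction, so they are typically not rolled back.
        int i = 10/0;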
    }
}
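The `thenCompose` chaining used in `asyncTest1` and `asyncTest3` can be tried in isolation. In the sketch below, `insertUser` and `insertAddress` are hypothetical stand-ins for the mapper calls, and the id `42` is a made-up value. The second stage starts only after the first has produced the user id.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {

    // Hypothetical stand-in for userMapper.insert(...): pretends the database generated id 42.
    static long insertUser(String name) {
        return 42L;
    }

    // Hypothetical stand-in for addressMapper.insert(...): the address references the user id.
    static String insertAddress(long userId, String city) {
        return city + "#user" + userId;
    }

    static String createUserWithAddress(String name, String city) {
        CompletableFuture<Long> userId =
                CompletableFuture.supplyAsync(() -> insertUser(name));
        // thenCompose starts the second async stage with the result of the first.
        CompletableFuture<String> address = userId.thenCompose(id ->
                CompletableFuture.supplyAsync(() -> insertAddress(id, city)));
        // join() waits for the whole chain; no isDone() polling is required.
        return address.join();
    }

    public static void main(String[] args) {
        System.out.println(createUserWithAddress("demo", "Nanjing")); // prints Nanjing#user42
    }
}
```

Because `join()` on the last stage waits for every earlier stage, one blocking call at the end is enough.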
@Service
public class AsyncTask {

    @Autowired
    UserMapper userMapper;

    @Autowired
    AddressMapper addressMapper;

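    // Caution: these wrappers call the @Async methods through `this`, so the calls
    // bypass the Spring proxy and run synchronously on the caller's thread. Call
    // task1()/task2()/task3() on the injected bean to get asynchronous execution.
    public Future<AddressDO> t1() throws InterruptedException {
        return task1();
    }

    public Future<UserDO> t2() throws InterruptedException {
        return task2();
    }

    public Future t3() throws InterruptedException {
        return task3();
    }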


    @Async("asyncServiceExecutor")
    public Future<AddressDO> task1() throws InterruptedException{
        long currentTimeMillis = System.currentTimeMillis();
        AddressDO addressDO = new AddressDO();
        addressDO.setCity("南京");
        addressDO.setPhone("18670555948");
        addressMapper.insert(addressDO);
        long currentTimeMillis1 = System.currentTimeMillis();
        System.out.println("task1 took: "+(currentTimeMillis1-currentTimeMillis)+"ms");
        return new AsyncResult<>(addressDO);
    }

    @Async("asyncServiceExecutor")
    public Future<UserDO> task2() throws InterruptedException{
        long currentTimeMillis = System.currentTimeMillis();
        UserDO userDO = new UserDO();
        userDO.setName("利益");
        userDO.setPwd("123");
        userDO.setMail("3499283774@qq.com");
        userMapper.insert(userDO);
        test7();
        long currentTimeMillis1 = System.currentTimeMillis();
        System.out.println("task2 took: "+(currentTimeMillis1-currentTimeMillis)+"ms");
        return new AsyncResult<>(userDO);
    }

    public void test7() throws InterruptedException {
        FixedThreadPoolUtil.doExecutor(() -> {
            try {
                task4();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        FixedThreadPoolUtil.doExecutor(() -> {
            try {
                task5();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public void task4() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        AddressDO addressDO = new AddressDO();
        addressDO.setCity("ti京");
        addressDO.setPhone("14710555348");
        addressMapper.insert(addressDO);
        Thread.sleep(4000L);
        long currentTimeMillis1 = System.currentTimeMillis();
        System.out.println("task4 took: "+(currentTimeMillis1-currentTimeMillis)+"ms");
    }

    public void task5() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        AddressDO addressDO = new AddressDO();
        addressDO.setCity("东京");
        addressDO.setPhone("18615055567");
        addressMapper.insert(addressDO);
        Thread.sleep(6000L);
        long currentTimeMillis1 = System.currentTimeMillis();
        System.out.println("task5 took: "+(currentTimeMillis1-currentTimeMillis)+"ms");
    }

    @Async("asyncServiceExecutor")
    public Future task3() throws InterruptedException{
        long currentTimeMillis = System.currentTimeMillis();
        Thread.sleep(3000L);
        long currentTimeMillis1 = System.currentTimeMillis();
        System.out.println("task3 took: "+(currentTimeMillis1-currentTimeMillis)+"ms");
        return new AsyncResult("");
    }

}

Custom thread pools:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {

    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService(){
        return new ThreadPoolExecutor(3,10,60, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10000),
                Executors.defaultThreadFactory(),
                (r,e)->System.out.println("is full"));

    }
}
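To see when a custom rejection handler like the one above fires, here is a scaled-down sketch. The sizes (1 core thread, 2 maximum, queue capacity 1) and all names are chosen only for the demo. A `ThreadPoolExecutor` starts extra threads beyond the core size only once the queue is full, and rejects only when both the queue and the maximum are exhausted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionDemo {

    // Submits five blocking tasks to a tiny pool and counts how many are rejected.
    static int countRejected() {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(1),
                Executors.defaultThreadFactory(),
                (task, executor) -> rejected.incrementAndGet());
        try {
            // Task 1 takes the core thread, task 2 waits in the queue, task 3 gets
            // the extra thread, and tasks 4 and 5 have nowhere to go.
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected()); // prints rejected: 2
    }
}
```

With the 10000-slot queue configured above, the pool would stay at 3 threads until 10000 tasks are waiting, so the maximum of 10 is rarely reached in practice.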
/**
 * Multithreading configuration
 *
 * @author llh
 * @version 1.0
 * @date 2021/11/16 17:49
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("Thread pool started");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(corePoolSize);
        // Maximum pool size
        executor.setMaxPoolSize(maxPoolSize);
        // Queue capacity
        executor.setQueueCapacity(queueCapacity);
        // Name prefix for the threads in the pool
        executor.setThreadNamePrefix(namePrefix);
        // When the pool and queue are both full, run the task on the calling thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the executor
        executor.initialize();
        return executor;
    }
}

application.properties:

async.executor.thread.core_pool_size=4
async.executor.thread.max_pool_size=8
async.executor.thread.queue_capacity=135
async.executor.thread.name.prefix=ticketManager-worker-

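The `CallerRunsPolicy` set in the configuration above means that an overflowing task is not dropped but executed by the thread that submitted it. A minimal sketch with a plain `ThreadPoolExecutor` shows this; the sizes are deliberately tiny and the names are invented for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    // Returns true when the overflow task ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // waits in the queue (capacity 1)
            // The pool is saturated, so the policy runs this task on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + overflowRunsOnCaller()); // prints ran on caller: true
    }
}
```

In a web application this acts as natural back-pressure: under overload the request thread does the work itself, which slows down the caller.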
/**
 * Fixed-size thread pool (12 core threads; grows up to 200 once the queue is full)
 */
public final class FixedThreadPoolUtil {

    private static ExecutorService executorService;

    private static class SigleClass {
        private static ExecutorService executor = new ThreadPoolExecutor(
                12,
                200,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1024),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static ExecutorService getInstance() {
        if (null == executorService) {
            executorService = SigleClass.executor;
        }
        return executorService;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static Future doExecutor(Callable callable) {
        getInstance();
        return executorService.submit(callable);
    }

    public static void doExecutor(Runnable runnable) {
        getInstance();
        executorService.execute(runnable);
    }

}

Usage:

FixedThreadPoolUtil.doExecutor(() -> test(s));

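The nested static class in `FixedThreadPoolUtil` is an instance of the initialization-on-demand holder idiom. A stripped-down sketch follows, with hypothetical names and an arbitrary pool size of 2. It shows that the holder alone gives lazy, thread-safe initialization, so the extra null check and mutable static field are not strictly needed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class PoolHolderDemo {

    private PoolHolderDemo() {
    }

    // The JVM initializes Holder once, on first access, and does so thread-safely.
    private static class Holder {
        private static final ExecutorService POOL = Executors.newFixedThreadPool(2);
    }

    public static ExecutorService getInstance() {
        return Holder.POOL;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // prints true
        getInstance().shutdown();
    }
}
```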
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setTaskScheduler(getTaskScheduler());
    }

    @Bean
    public TaskScheduler getTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(7);
        taskScheduler.setThreadNamePrefix("myworker-");
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        return taskScheduler;
    }
}
