1、Future

Future代表异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法。只有在计算完成后,才能使用方法get检索结果,如有必要,将其阻塞,直到准备就绪。取消是通过取消方法执行的。还提供了其他方法来确定任务是否正常完成或被取消。

	//等待异步任务完成,然后检索其结果
    V get() throws InterruptedException, ExecutionException;
	//最多等待给定的时间以完成计算,然后检索其结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
	//如果此任务已完成,则返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true
	boolean isDone();
	private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
        int i = 0;

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "test-" + i++);
        }
    });
	
    public static void demo01() {
        log.info("创建异步任务");
        Future<String> submit = executor.submit(new Callable<String>() {
            @Override
            public String call() {
                String result = "fail";
                try {
                    log.info("开始执行异步任务");
                    // 执行任务耗时
                    Thread.sleep(10000);
                    result = "success";
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return result;
            }
        });

        try {
            String result = submit.get();
            log.info("获取异步任务结果 " + result);
        } catch (InterruptedException e) {
            System.out.println("中断异常");
        } catch (ExecutionException e) {
            System.out.println("执行异常");
        }

        log.info("Future的get方法,会使当前线程阻塞");
    }

在这里插入图片描述

    public static void demo02() throws InterruptedException, ExecutionException {
        log.info("创建异步任务");
        Future<String> submit = executor.submit(new Callable<String>() {
            @Override
            public String call() {
                String result = "fail";
                try {
                    log.info("开始执行异步任务");
                    // 执行任务耗时
                    Thread.sleep(10000);
                    result = "success";
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return result;
            }
        });

        log.info("轮询调用isDone方法查询异步任务是否完成");
        while (true) {
            if (submit.isDone()) {
                String result = submit.get();
                log.info(result);
                break;
            } else {
                log.info("异步任务还未完成,先干点别的事");
                Thread.sleep(1000);
            }
        }

        log.info("Future的get方法,会使当前线程阻塞");
    }

在这里插入图片描述
使用Future,并不能实现真正的异步,要么需要阻塞的获取结果,要么不断的轮询

2、CompletableFuture

CompletableFuture实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

	//创建带返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定		
	public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    
	//创建无返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定	
	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

	//如果以任何方式完成,则返回true:正常、异常或通过取消
	public boolean isDone() {
        return result != null;
    }
	//等待此任务完成,然后返回其结果
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
	//最多等待给定的时间,以完成此任务,然后返回其结果
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
    }
    //如果任务完成则返回结果集,否则返回给定的valueIfAbsent
    public T getNow(T valueIfAbsent) {
        Object r;
        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
    }

thenApply / thenAccept / thenRun

在流式处理中,等待上层任务正常执行完成后,再执行回调方法;
thenApply:上层任务的结果值作为回调方法的入参值,该回调方法有返回值
thenAccept:上层任务的结果值作为回调方法的入参值,该回调方法没有返回值
thenRun:没有入参也没有返回值的回调方法

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

	public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }


	public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
    public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }
    public static void demo03() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("执行异步任务");
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "success";
            }
        }, fixedThreadPool).thenApplyAsync((result) -> {
            log.info("上层任务结果: " + result);
            try {
                Thread.sleep((long) (Math.random() * 5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "over";
        }, fixedThreadPool);

        log.info("最终结果 = " + finalResult.get());
    }

在这里插入图片描述
如果上层任务抛异常则不会进入回调方法中

    public static void demo03() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("执行异步任务");
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //有异常
                if (true) throw new RuntimeException("异常");
                return "success";
            }
        }, fixedThreadPool).thenApplyAsync((result) -> {
            log.info("上层任务结果: " + result);
            try {
                Thread.sleep((long) (Math.random() * 5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "over";
        }, fixedThreadPool);

        //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
        log.info("最终结果 = " + finalResult.get());
    }

在这里插入图片描述

exceptionally

上层任务执行中,若抛出异常可被该方法接收,异常即该方法的参数;
若无异常,不会进入该方法并将上层的结果值继续下传。

    public static void demo03() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("执行异步任务");
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //有异常
                if (true) throw new RuntimeException("异常");
                return "success";
            }
        }, fixedThreadPool).exceptionally((exception) -> {
            try {
                log.info("异常处理 " + exception);
                Thread.sleep((long) (Math.random() * 5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "exception";
        }).thenApplyAsync((result) -> {
            log.info("上层任务结果: " + result);
            try {
                Thread.sleep((long) (Math.random() * 5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "over";
        }, fixedThreadPool);

        //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
        log.info("最终结果 = " + finalResult.get());
    }

异常情况
在这里插入图片描述
正常情况
在这里插入图片描述

whenComplete

接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法无返回值

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }
    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }
    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }
    public static void demo04() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("执行异步任务");
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //有异常
                if (true) throw new RuntimeException("异常");
                return "success";
            }
        }, fixedThreadPool).whenCompleteAsync((result, exception) -> {
            if (exception == null) {
                log.info("上层任务无异常,获取到上层结果为:" + result);
            } else {
                log.info("上层任务有异常,获取到上层结果为:" + result);
            }
        }, fixedThreadPool);

        //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
        log.info("最终结果 = " + finalResult.get());
    }

无异常
在这里插入图片描述
有异常
在这里插入图片描述

handle

接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法有返回值

    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }
    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }
    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return uniHandleStage(screenExecutor(executor), fn);
    }
    public static void demo04() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("执行异步任务");
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //有异常
                //if (true) throw new RuntimeException("异常");
                return "success";
            }
        }, fixedThreadPool).handleAsync((result, exception) -> {
            if (exception == null) {
                log.info("上层任务无异常,获取到上层结果为:" + result);
            } else {
                log.info("上层任务有异常,获取到上层结果为:" + result, exception);
            }
            return "handle " + result;
        }, fixedThreadPool);

        //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
        log.info("最终结果 = " + finalResult.get());
    }

无异常
在这里插入图片描述
有异常
在这里插入图片描述

thenCombine / thenAcceptBoth / runAfterBoth

将两个CompletableFuture组合起来,当这两个future都正常执行完了才会执行回调任务
thenCombine:2个future的返回值作为回调方法的入参值,该回调方法有返回值
thenAcceptBoth:2个future的返回值作为回调方法的入参值,该回调方法没有返回值
runAfterBoth:没有入参也没有返回值

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

    public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
        return biRunStage(null, other, action);
    }
    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
        return biRunStage(asyncPool, other, action);
    }
    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
        return biRunStage(screenExecutor(executor), other, action);
    }
    public static void demo05() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int i = 0;
                try {
                    log.info("开始执行异步任务");
                    Thread.sleep((long) (Math.random() * 5000));
                    i = 1;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i;
            }
        }, fixedThreadPool);

        CompletableFuture<Integer> supplyAsync2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int i = 0;
                try {
                    log.info("开始执行异步任务");
                    Thread.sleep((long) (Math.random() * 8000));
                    i = 2;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i;
            }
        }, fixedThreadPool);

        CompletableFuture<Integer> thenCombineAsync = supplyAsync.thenCombineAsync(supplyAsync2, (a, b) -> {
            log.info("a = " + a + ", b = " + b);
            return a + b;
        }, fixedThreadPool);
        log.info("thenCombineAsync = " + thenCombineAsync.get());

    }

在这里插入图片描述
其中任意一个有异常都会导致thenCombineAsync方法不执行

applyToEither / acceptEither / runAfterEither

将两个CompletableFuture组合起来,只要有一个future正常执行完了就可以执行回调任务
applyToEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法有返回值
acceptEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法没有返回值
runAfterEither:只要有任务执行完就调用回调方法

    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

    public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }
    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }
    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) {
        return orAcceptStage(screenExecutor(executor), other, action);
    }

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) {
        return orRunStage(null, other, action);
    }
    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) {
        return orRunStage(asyncPool, other, action);
    }
    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor) {
        return orRunStage(screenExecutor(executor), other, action);
    }
    public static void demo06() throws ExecutionException, InterruptedException {
        log.info("创建异步任务");
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int i = 0;
                try {
                    log.info("执行异步任务");
                    Thread.sleep((long) (Math.random() * 5000));
                    i = 1;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i;
            }
        }, fixedThreadPool);

        CompletableFuture<Integer> supplyAsync2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int i = 0;
                try {
                    log.info("执行异步任务");
                    Thread.sleep((long) (Math.random() * 5000));
                    i = 2;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i;
            }
        }, fixedThreadPool);

        CompletableFuture<Integer> thenCombineAsync = supplyAsync.applyToEitherAsync(supplyAsync2, (result) -> {
            log.info("result " + result);
            return 3;
        }, fixedThreadPool);

        log.info("final result = " + thenCombineAsync.get());
    }

在这里插入图片描述
在这里插入图片描述
任意一个任务有异常,都不会进入applyToEitherAsync方法

3、@Async

基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作

启用@Async注解

package com.yzm.thread.async;

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync // 开启异步调用功能,即使@Async注解生效
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

    @Bean(name = "default_async_pool", destroyMethod = "shutdown")
    public ThreadPoolTaskExecutor defaultAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置线程池前缀:方便排查
        executor.setThreadNamePrefix("default-async-");
        // 设置线程池的大小
        executor.setCorePoolSize(10);
        // 设置线程池的最大值
        executor.setMaxPoolSize(15);
        // 设置线程池的队列大小
        executor.setQueueCapacity(250);
        // 设置线程最大空闲时间,单位:秒
        executor.setKeepAliveSeconds(3000);
        // 饱和策略
        // AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
        // CallerRunsPolicy:若已达到待处理队列长度,将由主线程直接处理请求
        // DiscardOldestPolicy:抛弃旧的任务;会导致被丢弃的任务无法再次被执行
        // DiscardPolicy:抛弃当前任务;会导致被丢弃的任务无法再次被执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return executor;
    }

    @Bean(name = "another_async_pool", destroyMethod = "shutdown")
    public ThreadPoolTaskExecutor anotherAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("another-task-");
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(6);
        executor.setQueueCapacity(5);
        executor.setKeepAliveSeconds(10);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return executor;
    }

    /**
     * 自定义异步线程池,若不重写,则使用默认的
     */
    @Override
    public Executor getAsyncExecutor() {
        return defaultAsyncPool();
    }

    /**
     * 1.无参无返回值方法
     * 2.有参无返回值方法
     * 返回值为void的, 通过IllegalArgumentException异常, AsyncUncaughtExceptionHandler处理异常
     * 3.有参有返回值方法
     * 返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
     * 或者在调用方在调用Future.get时捕获异常进行处理
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("正在处理无返回值的@Async异步调用方法");
        return (throwable, method, objects) -> {
            log.info("Exception message - " + throwable.getMessage());
            log.info("Method name - " + method.getName());
            for (Object param : objects) {
                log.info("Parameter value - " + param);
            }
        };
    }

}
package com.yzm.thread.async;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;

@Slf4j
@Component
public class AsyncService {

    /**
     * 1.无参无返回值方法
     * 最简单的异步调用,返回值为void
     */
    @Async
    public void async() {
        log.info("无参无返回值方法,通过观察线程名称以便查看效果");
//        int a = 1 / 0;
    }

    /**
     * 2.有参无返回值方法
     * 指定线程池
     *
     * @param i 传入参数
     */
    @Async("another_async_pool")
    public void async(int i) {
        log.info("有参无返回值方法, 参数={}", i);
    }

    /**
     * 3.有参有返回值方法
     *
     * @param i 传入参数
     * @return Future
     */
    @Async
    public Future<String> asyncReturn(int i) throws InterruptedException {
        log.info("有参有返回值方法, 参数={}", i);
//        int a = 1 / 0;
        Thread.sleep(100);
        return new AsyncResult<String>("success:" + i);
    }

    /**
     * @Async  必须不同类间调用:
     */
    public void D() {
        log.info("在同类下调用 @Async 方法是同步执行的");
        async();
    }
}

调用无参无返回值的异步方法

@Component
public class AsyncDemo {

    private final AsyncService asyncService;

    public AsyncDemo(AsyncService asyncService) {
        this.asyncService = asyncService;
    }

    @PostConstruct
    public void demo() {
        asyncA();
    }

    public void asyncA() {
        asyncService.async();
    }
}

在这里插入图片描述

调用有参无返回值的异步方法并指定线程池

AsyncService类

    /**
     * 2.有参无返回值方法
     * 指定线程池
     *
     * @param i 传入参数
     */
    @Async("another_async_pool")
    public void async(int i) {
        log.info("有参无返回值方法, 参数={}", i);
    }

AsyncDemo类

    @PostConstruct
    public void demo() {
//        asyncA();
        asyncB(1);
    }

    public void asyncA() {
        asyncService.async();
    }

    public void asyncB(int i) {
        asyncService.async(i);
    }

在这里插入图片描述

调用有参有返回值的异步方法

    public void asyncC(int i) {
        try {
            Future<String> future = asyncService.asyncReturn(i);
            // 这里使用了循环判断,等待获取结果信息
            while (true) {
                // 判断是否执行完毕
                if (future.isDone()) {
                    System.out.println("执行完毕,结果为:" + future.get());
                    break;
                }
                System.out.println("还未执行完毕,请稍等。。。");
                Thread.sleep(1000);
            }
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("异步调用失败");
            e.printStackTrace();
        }
    }

在这里插入图片描述

调用方法内部调用一个异步方法是不行的,仍是同步调用

AsyncService类

    /**
     * @Async  必须不同类间调用:
     */
    public void D() {
        log.info("在同类下调用 @Async 方法是同步执行的");
        // 调用本类的异步方法
        async();
    }

AsyncDemo类

	public void asyncD() {
        asyncService.D();
    }

在这里插入图片描述

异常处理

AsyncConfig类

	// 可处理无返回值的异步方法异常
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("正在处理无返回值的@Async异步调用方法");
        return (throwable, method, objects) -> {
            log.info("Exception message - " + throwable.getMessage());
            log.info("Method name - " + method.getName());
            for (Object param : objects) {
                log.info("Parameter value - " + param);
            }
        };
    }

AsyncService类

    /**
     * 1.无参无返回值方法
     * 最简单的异步调用,返回值为void
     */
    @Async
    public void async() {
        log.info("无参无返回值方法,通过观察线程名称以便查看效果");
        int a = 1 / 0;
    }

AsyncDemo类

    @PostConstruct
    public void demo() {
        asyncA();
//        asyncB(1);
//        asyncC(11);
//        asyncD();
    }

    public void asyncA() {
        asyncService.async();
    }

在这里插入图片描述
有返回值的异步方法异常,需要手动try{}catch(){}处理

事务处理机制

在@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.
例如:
方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
方法B,使用了@Async来标注, B中调用了C,C使用@Transactional做了标注,则可实现事务控制的目的。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐