springboot CompletableFuture异步线程池
七大参数:1丶corePoolSize :[5] 核心线程数【一直存在除非(allowCoreThreadTimeOut)】;线程池,创建好后就准备好5个线程 Threadthread = new Thread() ; thread.start();2丶maximumPoolSize : [200] 最大线程数;控制资源3丶keepAliveTime :存活时间。如果当前的线程数量大于core数量
·
初始化异步的4种方法
1丶继承Thread
2丶实现Runnable
3丶实现Callable接口+Future (可以拿到返回结果,可以处理异常 jdk1.5之后)
4丶线程池【ExecutorService】(实际开发中使用)
方式1和2 :主线程无法获取线程的运算结果。
方式3:主线程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
方式4:线程池性能稳定,控制资源,也可以获取执行结果,并捕获异常。
线程池七大参数:
- corePoolSize :[5] 核心线程数【一直存在除非 (allowCoreThreadTimeOut会回收)】;线程池,创建好后就准备好5个线程 Thread thread = new Thread() ;并没有启动 。只有往线程池提交任务后才会执行 thread.start();
- maximumPoolSize : [200] 最大线程数;控制资源
- keepAliveTime :存活时间。如果当前的线程数量大于core(核心)数量。释放空闲的线程(maximumPoolSize-corePoolSize)。只要线程空闲大于指定的keepAliveTime 。
- unit :时间单位。
- BlockingQueue workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲,就回去队列里面取出新的任务继续执行。
new LinedBlockingDeque<>() :默认是Integer的最大值 会造成内存不足 - threadFactory:线程的创建工厂
- RejectedExecutionHandler handler:拒绝策列,如果队列满了执行相应的拒绝策略
6.1 DiscardOldestPolicy :新任务进来时丢弃掉没有执行的旧任务
6.2 CallerRunsPolicy:直接调用run方法同步执行
6.3 AbortPolicy:直接丢弃新任务并抛出异常
6.4 DiscardPolicy:直接丢弃不抛出异常
工作顺序:
- 线程池创建,准备好core数量的核心线程,准备接受任务
- core满了,就会将再进来的任务放在阻塞队列中,空闲的core就会自己去阻塞队列获取任务执行。
- 阻塞队列满了,就会直接开新线程执行,最大只能开到max指定的数量。
- max满了就用RejectedExecutionHandler 策略拒绝任务。
- max都执行完成,有很多空闲,在指定的时间keepAliveTime以后,释放max-core这些线程
Executors
- newCachedThreadPool() core是0,所有都可以回收
- newFixedThreadPool() 固定大小 core= max ;都不可回收
- newScheduledThreadPllo() 定时任务的线程池
- newSingleThreadExecutor() 单线程的线程池,后台获取到任务去挨个执行
1 丶创建异步对象
CompletableFuture 提供了四个静态方法
//可以获取到返回值,可传入自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
//没有返回值,可传入自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
测试
public static ExecutorService service = Executors.newFixedThreadPool(10);
//没有返回值
CompletableFuture.runAsync(()->{
System.out.println("异步任务成功完成了");
},service);
//空入参只有返回值
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "返回值";
}, service);
2丶 计算完成时回调方法
//上一个任务完成和上一个任务用同一个线程
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
//交给线程池重新启动一个线程
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
//任务执行完成后(只能感知)
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
//感知异常同时修改返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
测试代码
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
}, service).whenComplete((res,excption)->{
//虽然能得到异常消息但是不能修改返回结果
System.out.println("异步任务成功完成了"+res+"或者异常:"+excption);
}).exceptionally(throwable ->{
//处理异常并可以数据修改返回值
return 0;
});
3丶handle 方法(异常时处理并返回)
//和上一个任务使用同一个线程执行
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
//默认线程池开启线程执行
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
//指定自己的线程池开启线程执行
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
4 丶handle 测试
//方法执行完成后的处理不论成功还是失败
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
}, service).handle((res,thr)->{
//未出现异常 当然这里可以不写 为了掩饰
if(res!=null){
return 0;
}
//出现异常
if(thr!=null){
return 1;
}
return 0;
});
5丶 线程串行化方法(B任务需要A任务的执行结果后才能执行)
//A-->B-->C 感知上一步结果并返回最后一次的结果
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)
//B可以感知到A的返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor)
//A->完成后(有Async开启新的线程没有就是和A一个线程)感知不到上一步的执行结果
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
测试
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
//任务A
return 10;
}, service).thenRunAsync(() -> {
//感知不到 任务A的结果
},service);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
//任务A
return 10;
}, service).thenAcceptAsync((res) -> {
//可以感知到任务A的结果,但是不能返回数据
int i = res / 2;
},service);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
//任务A
return 10;
}, service).thenApplyAsync((res) -> {
//返回值以最后一次返回为准
return 10 + res;
}, service);
6丶两个任务组合 - 都要完成
以下三种方式 两个任务都必须完成,才触发该任务
//组合两个future,获取两个future 任务的返回结果,并返回当前任务的返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
//组合两个future,获取两个future 任务的返回结果,然后处理任务,没有返回值。
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor)
//组合两个future ,不需要获取future的结果,只需要两个 future处理完成后处理该任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
测试
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
//任务A
System.out.println(Thread.currentThread().getId());
System.out.println("任务一结束");
return 10/2;
}, service);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
//任务A
System.out.println(Thread.currentThread().getId());
System.out.println("任务二结束");
return "future02";
}, service);
//不能感知到结果
CompletableFuture<Void> future03void = future01.runAfterBothAsync(future02, () -> {
System.out.println("任务三执行结束");
}, service);
//可以感知获取到前两个任务结果
CompletableFuture<Void> future03No = future01.thenAcceptBothAsync(future02, (res1, res2) -> {
System.out.println("任务三执行结束");
}, service);
CompletableFuture<String> future03 = future01.thenCombineAsync(future02, (res1, res2) -> {
System.out.println("任务三执行结束");
return res1 + "-" + res2;
}, service);
7 丶两个任务一个完成
//两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T>other,Function<? super T, U> fn,Executor executor)
//两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor)
//两个任务有一个执行完成,不需要获取future的结果,处理任务 ,也没有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
测试
/**
* 两个任务有一个完成就执行三
*
*/
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
//任务A
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId());
System.out.println("任务一结束");
return 10/2;
}, service);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
//任务A
System.out.println(Thread.currentThread().getId());
System.out.println("任务二结束");
return "future02";
}, service);
//不感知结果,自己也没有返回值
future01.runAfterEitherAsync(future02,()->{
System.out.println("任务三执行结束");
},service);
//感知到结果,自己没有返回值
future01.acceptEitherAsync(future02,(res)->{
//感知到线程已经处理完成的结果
System.out.println("任务三执行结束"+res);
},service);
//感知到结果并返回自己的返回值
CompletableFuture<String> stringCompletableFuture = future01.applyToEitherAsync(future02, res -> {
return "任务三结果" + res;
}, service);
8 丶多任务组合
//等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
测试
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
//任务A
System.out.println(Thread.currentThread().getId());
System.out.println("任务一结束");
return 10/2;
}, service);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
//任务A
System.out.println(Thread.currentThread().getId());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务二结束");
return "future02";
}, service);
//此方法是阻塞式等待不建议在这使用
//future01.get();
//future02.get();
//等待所有任务完成不会阻塞
CompletableFuture<Void> allOf= CompletableFuture.allOf(future01, future02);
//等待所有的都完成(两个任务同时执行)
allOf.get();
log.info("end");
System.out.println(future02.get()+""+future01.get());
//只有一个完成
CompletableFuture<Object> anyOf= CompletableFuture.anyOf(future01, future02);
anyOf.get();
System.out.println(anyOf.get());
更多推荐
已为社区贡献1条内容
所有评论(0)