Java多线程获取异步执行结果的多种姿势
前言 由于java虚拟机对于进程中线程映射为内核级线程即真多线程的映射方式,因此在各个优秀的开源框架、服务容器中多线程技术使用非常广泛,我们自己的业务项目中,也经常遇到需要异步执行的任务,在不方便引入消息中间件的情况下,直接另开一个线程执行即可,但是如果遇到一些需要获取执行结果的业务,就只能为了多线程异步方式再引入线程间通信逻辑,便显得捉襟见肘
前言
由于在大多数操作系统中java虚拟机的线程实现为内核级线程
即真多线程的映射方式,因此在各个优秀的开源框架、服务容器中多线程技术使用非常广泛。我们自己的业务项目中,也经常遇到需要异步执行的任务,在不方便引入消息中间件的情况下,直接另开一个线程执行即可,但是如果遇到一些需要获取执行结果的业务,就只能为了多线程异步方式再引入线程间通信逻辑,便显得捉襟见肘。
jdk 1.5以后新引入了Future和Callable,jdk1.8后又引入了CompletableFuture、stream工具类,可以方便的获取异步线程的执行结果,本文也从多种角度来获取异步线程执行结果,并且在线程池配置上提供了相关建议。
正文
无需获取异步结果的多线程执行
如下直接使用线程池+Runnable执行的多线程异步操作
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setAwaitTerminationSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
for (int i = 0;i<100;i++){
int finalI = i;
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("输出"+ finalI);
}
});
}
如果我们需要获取多个线程的执行结果,则可以使用Future和stream来辅助完成,下面就用几个例子来完成对于List中的字符串添加后缀的简单操作进行异步执行获取结果的demo。
parallel stream 获取异步结果
以下为使用java8 parallelStream来完成的异步分发任务demo,在parallelStream中使用线程安全的容器对结果进行保存。
值得注意的是,为什么parallelStream可以使用容器对结果进行保存,但上面的线程池ThreadPoolTaskExecutor却不能在异步线程中将结果存入容器后直接使用?因为parallelStream在进行异步线程任务分发的这行代码(parallelStream()),是阻塞执行的,相当于只有当list中全部数据被执行完后,才会执行到下一行,所以我们不用担心这些异步线程在何时才能执行完;而Executor则是直接提交任务后就不再阻塞,直接执行下一行,因此对于子线程的异步执行进度是无法感知的。
List<String> list = new ArrayList<>();
for (int i = 1;i<100;i++){
list.add(String.valueOf(i));
}
//需要线程安全的容器记录数据
Vector<String> result =new Vector<>();
//使用parallelStream分发异步任务
list.parallelStream().forEach(str->{
try {
Thread.sleep(1000);
result.add(str+"字符串被拼接");
System.out.println(str+"字符串被拼接");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//全部任务为阻塞执行,之后才执行到这一行
System.out.println(result.size());
result.forEach(str->{
System.out.println("获取结果"+str);
});
parallel stream实现
java8集合中使用ParallelStream是对集合中元素的并行map操作,而线程则会使用jvm原生的forkJoinPool线程池来提供worker,forkJoinPool中的线程个数与当前运行宿主机的核数相同,即8核16g机器为8个线程。
如果我们当前服务qps不高,那么使用Parallel对集合中数据进行map任务后能够明显降低业务耗时开销,但是如果qps一旦提升到一定的阈值,每个集合中的元素处理都要依赖于forkJoinPool中线程资源的释放,如果是io密集型的操作,线程长时间阻塞在等待io的状态,后续集合中的任务也无法处理,只能一直阻塞,性能耗时会适得其反,还不如这些事都由主线程来做更合适一点。如下图所示:
应对上述问题,io密集型的应用,我们一般会考虑增大线程池中线程数量,由于ParallelStream使用的是jvm原生的forkJoinPool,需要改动可以在启动时使用参数来控制,如下使用X来控制;或者显示设定系统参数来完成。
Djava.util.concurrent.ForkJoinPool.common.parallelism=20
或
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
future 获取异步结果
相对于ParallelStream针对集合类提供的简单直接的方式,future作为jdk1.5后引入的特性,支持了异步线程提交后跟踪收集对应结果的功能。如下所示,我们对于list中的String字符串进行拼接的操作。
ExecutorService executorService = Executors.newFixedThreadPool(20);
List<String> list = new ArrayList<>();
for (int i = 1; i < 100; i++) {
list.add(String.valueOf(i));
}
List<Future> futureList = new ArrayList<>();
for (String str : list) {
//使用executorService提交异步线程任务,future整合结果
Future<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.println(str + "字符串被拼接");
return str + "字符串被拼接";
}
});
futureList.add(future);
}
//future.get时才为阻塞获取线程执行结果的时候
for (Future future : futureList) {
System.out.println(future.get());
}
future.get方法执行时,才是阻塞获取对应线程中return执行结果的时候,因此相对于ParallelStream去执行集合中元素的异步迭代操作,future可以更灵活的控制在何时去执行阻塞的获取线程执行结果。
CompletableFuture获取异步执行结果
上述方式都是较为简单的迭代list元素的任务分配,但是如果我们有更复杂的需求,比如这些异步线程间的数据是相互依赖、有组合关系或者依据其他线程执行结果再做条件判断后的异步执行,那么就需要引入jdk1.8的新特性CompletableFuture。我这里项目中没有这样的场景,因此也只做一些常用api搬运简单介绍一下:
顺序的异步执行
线程b需要等到线程a执行结束后执行。
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "1";
}).thenRunAsync(() -> {
System.out.println("2线程执行");
});
线程间依赖数据执行结果
线程b需要等到线程a执行结束后,获取a的结果再执行。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "1";
}).thenApplyAsync(v -> v + "字符串拼接");
System.out.println(future.get());
依赖多个异步线程执行结果
线程c需要等到线程a和线程b执行结束后,获取两个线程执行结果后再执行,下述代码仅在future13.get()这一步是阻塞的。
CompletableFuture<Integer> future11 = CompletableFuture.supplyAsync(() -> {
return 1;
});
CompletableFuture<Integer> future12 = CompletableFuture.supplyAsync(() -> {
return 2;
});
CompletableFuture<Integer> future13 = future11.thenCombineAsync(future12,(v1,v2) -> {
return v1 + v2;
});
System.out.println(future13.get());
CompletableFuture还有applyToEither等方法。而在获取结果方面有如下,其中加入时间的方法为超时时间,如果超时则会发出一个TimeoutException异常到主线程。
public T get()
public T get(long timeout, TimeUnit unit)
多线程异步编程注意的点
多线程异步编程可以在一定程度上提高业务性能、解耦非主流程业务等优点,但也需要注意线程安全和线程池中的线程配比问题。
- 在多个异步线程中操作数据,需要使用线程安全的集合或对象;
- 线程池中线程个数配置是需要耐心分析和实测的活,跟业务并发度、异步线程执行逻辑(io密集或计算密集)都有关系。
假设服务器核心数为N。控制变量的来讲,如果业务并发度很低,线程池的配置仅跟io密集或者计算密集业务相关,计算密集型业务的设置核心数建议N+2;io密集型业务,就需要根据io等待时间灵活平衡线程池个数,一般可以设置为核心数的K(k>=2)倍,如果等待时间短,就设置两倍即可,等待时间长设置4到5倍;
如果业务并发度很高,那么针对于计算密集型的业务真就不建议使用多个线程去异步处理了,由于有限的计算资源,还不如主线程直接做了效率高;对于io密集型的业务,可以适当调高线程池线程个数配比。
\ | 业务并发qps低 | 业务并发qps高 |
---|---|---|
io密集型 | 2N——4N | 4N——8N |
计算密集型 | N+1 | 不建议使用线程池提交异步任务 |
如果业务高峰低谷期qps相差较多又要追求效率的极致,可以考虑动态调节线程池的参数,参照美团技术动态线程池的实现方案。
总结
本文提到了Java技术栈中异步线程获取执行结果的三种方式,各有优劣。
- parallel stream最简单直接,但是对于线程池的配置使用却相对隐晦,需要修改启动配置或者修改系统全局配置;另外由于stream语句的阻塞性,也无法灵活的处理集合中不同数据获取时机不一样的问题;同时收集结果时需要注意使用线程安全容器。
- future api相对简单,可以定制线程池,并且可以在提交异步任务后自行处理阻塞获取数据的时机。
- CompletableFuture 作为future的新实现,有更加灵活的多个异步线程配合的方式,如线程间顺序执行、结果聚合、数据依赖等。
除了一些基本api的介绍,还有对于线程池的把握,如何根据业务定义线程池的线程个数配比,使得压榨服务器资源的同时又不让频繁的线程上下文切换成为性能隐患,更多的需要大家根据自身业务进行判断,如在配置线程池时根据业务tps进行相关压测来寻找最合适的参数等。
参考
更多推荐
所有评论(0)