CompletableFuture 异步编排示例详解

 更新时间:2022年09月03日 14:48:53   作者:fxtahe  
这篇文章主要为大家介绍了CompletableFuture 异步编排示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

从Future聊起

Future是java 1.5引入的异步编程api,它表示一个异步计算结果,提供了获取异步结果的能力,解决了多线程场景下Runnable线程任务无法获取结果的问题。

但是其获取异步结果的方式并不够优雅,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

public interface Future<V> {

    //任务是否完成
    boolean isDone();
    
    //阻塞调用线程获取异步结果
    V get() throws InterruptedException, ExecutionException;

   //在指定时间内阻塞线程获取异步结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

假如存在多个异步任务相互依赖,一个或多个异步线程任务需要依赖上一个异步线程任务结果,并且多个异步任务能够组合结果,显然这种阻塞线程的方式并不能优雅解决。

我们更希望能够提供一种异步回调的方式,组合各种异步任务,而无需开发者对多个异步任务结果的监听编排。

为了解决优化上述问题,java8 新增了CompletableFutureAPI ,其大大扩展了Future能力,并提供了异步任务编排能力。

CompletableFuture

CompletableFuture实现了新的接口CompletionStage,并扩展了Future接口。查看类图

创建异步任务

CompletableFuture 提供了四种方法去创建一个异步任务。

  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):创建一个有返回值的异步任务实例
  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor):创建一个有返回值的异步任务实例,可以指定线程池
  • static CompletableFuture<Void> runAsync(Runnable runnable):创建一个无返回值的任务实例
  • static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor):创建一个无返回值的任务实例,允许指定线程池

着几个方法本质上是有返回值和无返回值两种类型方法,supply方法可以获取异步结果,而run方法则无返回值,根据需要使用。

同时两种类型的方法均提供了指定线程池的重载,如果不指定线程池会默认使用ForkJoinPool.commonPool(),默认线程数为cpu核心数,建议使用自定义线程池的方式,避免线程资源竞争

一个简单样例

        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println("无返回值任务"); });
        runAsync.get();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "hello completableFuture");
        String result = supplyAsync.get();
        System.out.println(result);

我们依然可以通过get()方法阻塞获取异步结果任务,但是CompletableFuture主要还是用于异步回调及异步任务编排使用。

异步回调

在任务执行结束后我们希望能够自动触发回调方法,CompletableFuture提供了两种方法实现。

  • CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action):当上一阶段任务执行结束后,回调方法接受上一阶段结果或者异常,返回上一阶段任务结果
  • <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn):当上一阶段任务执行结束后,回调方法接受上一阶段结果或者异常,并最终返回回调方法处理结果
  • CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn):上一阶段任务出现异常后的回调,返回结果是回调函数的返回结果。

whenComplete 与 handle 区别:两者均接受上一阶段任务结果或异常,但是whenComplete 回调中没有返回值,所以其结果是上一阶段任务,而handle 最终返回的是其回调方法方法,其主要是BiConsumerBiFunction的区别。

异步编排

CompletionStage表示异步计算的一个阶段,当一个计算处理完成后会触发其他依赖的阶段。当然一个阶段的触发也可以是由多个阶段的完成触发或者多个中的任意一个完成触发。该接口定义了异步任务编排的各种场景,CompletableFuture则实现了这些场景。

可以把这些场景大致分为三类:串行、AND和OR。下面会逐个分析各个场景,接口中定义的以Async结尾的方法,指下一阶段任务会被单独提交到线程池中执行,后面不在赘述。

串行

当上一阶段任务执行完毕后,继续提交执行其他任务

  • <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn):接收上一阶段任务结果,并可获取返回值。
  • CompletableFuture<Void> thenAccept(Consumer<? super T> action):接收上一阶段任务结果,无返回值。
  • CompletableFuture<Void> thenRun(Runnable action):不接收上一阶段任务结果,并且无返回值。

T:上一个任务返回结果的类型 U:当前任务的返回值类型

AND

组合多个异步任务,当多个任务执行完毕继续执行其他任务

  • <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):上一阶段任务与other任务均执行结束,接收两个任务的结果,并可获取返回值
  • <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn): 使用上一阶段任务的结果,返回一个新的CompletableFuture实例
  • <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action):上一阶段任务与other任务均执行结束,接收两个任务的结果,无返回值
  • CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action):上一阶段任务与other任务均执行结束,不接收两个任务的结果,无返回值
  • static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs):等待所有异步任务执行结束

T:上一个任务返回结果的类型 U:上一个other任务的返回值类型 V:当前任务返回值

OR

当多个任务中任意任务执行完成则继续执行其他任务。

  • <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn): 接收上一阶段任务与other任务最快执行完成的结果,并可获取返回值
  • CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action):接收上一阶段任务与other任务最快执行完成的结果,无返回值
  • CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action):上一阶段任务与other任务任意任务完成执行,不接收结果,无返回值
  • static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs):组合多个任务,返回最快执行结束的任务结果

Future 机制扩展

CompletableFuture不仅实现了Future接口,同时对其进行了扩展,提供了更加优雅的实现。

  • T join() :与get()方法用法一致,阻塞调用线程获取结果,但是不会抛出具体异常,简化了使用上下文
  • T getNow(T valueIfAbsent):当任务结束返回任务结果,否则返回给定的结果valueIfAbsent。
  • boolean complete(T value):当任务未结束时设置给定的结果value并结束任务,已结束的任务不会生效。
  • boolean completeExceptionally(Throwable ex):当任务未结束时设置异常结果并结束任务,已结束的任务不会生效

CompletableFuture 实践

我们通过CompletableFuture实现一个经典的烧水程序。

我们可以把这个流程分为三个异步任务。

任务1:洗水壶->烧水

任务2:洗水壶->洗茶杯->拿茶叶

任务3:泡茶,需要等待任务1与任务2结束。

通过代码模拟实现

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗水壶");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "水壶";
        }).thenApply(e->{
            System.out.println("烧水");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "热水";
        });
        //洗水壶->洗水杯->拿茶叶
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗茶壶");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "茶壶";
        }).thenApply(e->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("洗水杯");
            return "水杯";
        }).thenApply(e->{
            System.out.println("拿茶叶");
            return "茶叶";
        });
        //泡茶
        CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {
            System.out.println("泡茶");
            return "茶";
        });
        String tea = task3.join();
        System.out.println(tea);

以上就是CompletableFuture 异步编排示例详解的详细内容,更多关于CompletableFuture 异步编排的资料请关注脚本之家其它相关文章!

相关文章

  • SpringBoot多级缓存实现方案总结

    SpringBoot多级缓存实现方案总结

    所谓多级缓存,是指在整个系统架构的不同系统层面进行数据缓存,以提升访问速度,多级缓存就是为了解决项目服务中单一缓存使用不足的缺点,本文我们将给大家总结了SpringBoot多级缓存实现方案,需要的朋友可以参考下
    2023-08-08
  • 解决Eclipse打开.java文件异常,提示用系统工具打开的问题

    解决Eclipse打开.java文件异常,提示用系统工具打开的问题

    这篇文章主要介绍了解决Eclipse打开.java文件异常,提示用系统工具打开的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • 详解Java异常处理中throw与throws关键字的用法区别

    详解Java异常处理中throw与throws关键字的用法区别

    这篇文章主要介绍了详解Java异常处理中throw与throws关键字的用法区别,这也是Java面试题目中的常客,需要的朋友可以参考下
    2015-11-11
  • SpringBoot使用TraceId进行日志追踪的实现

    SpringBoot使用TraceId进行日志追踪的实现

    本文主要介绍了SpringBoot使用TraceId进行日志追踪的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-01-01
  • IntelliJ IDEA中折叠所有Java代码,再也不怕大段的代码了

    IntelliJ IDEA中折叠所有Java代码,再也不怕大段的代码了

    今天小编就为大家分享一篇关于IntelliJ IDEA中折叠所有Java代码,再也不怕大段的代码了,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-10-10
  • java 中JDBC连接数据库代码和步骤详解及实例代码

    java 中JDBC连接数据库代码和步骤详解及实例代码

    这篇文章主要介绍了java 中JDBC连接数据库代码和步骤详解及实例代码的相关资料,需要的朋友可以参考下
    2017-02-02
  • Spring Cloud Gateway重试机制原理解析

    Spring Cloud Gateway重试机制原理解析

    这篇文章主要介绍了Spring Cloud Gateway重试机制原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • Java如何处理数据成为树状结构

    Java如何处理数据成为树状结构

    这篇文章主要介绍了Java如何处理数据成为树状结构问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • Mybatis返回数组的两种实现方式

    Mybatis返回数组的两种实现方式

    这篇文章主要介绍了Mybatis返回数组的两种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-03-03
  • Java package编译乱码问题解决

    Java package编译乱码问题解决

    这篇文章主要介绍了Java package编译乱码问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05

最新评论