Java如何利用CompletableFuture描述任务之间的关系

 更新时间:2023年07月30日 08:30:43   作者:Shawn_Shawn  
Java如何根据线程的执行结果执行下一步动作呢,Future的另一个实现类CompletableFuture能够优雅的解决异步化问题,下面就跟随小编一起了解一下吧

java线程-如何获取到线程执行的结果一文中讲解了Future的用法和实现原理,Future主要用于获取线程执行的结果,那么如何根据线程的执行结果执行下一步动作呢?Future的另一个实现类CompletableFuture能够优雅的解决异步化问题。

CompletableFuture

CompletableFuture是java8新增的,这个类实现了两个接口,一个是Future接口,一个是CompletionStage接口。

CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

CompletionStage

CompletionStage是java8新增的接口,用于异步执行中的阶段处理。CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等等,一般来说要执行下一个阶段都需要上一个阶段正常完成,当然这个类也提供了对异常结果的处理接口。

任务与任务之间是有联系关系的,比如串行关系,并行关系,AND,OR等关系。

描述串行关系

CompletionStage接口里面描述串行关系,主要是thenApplythenAcceptthenRunthenCompose这四个系列的接口。

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, 
                                             Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, 
                                             Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action, 
                                          Executor executor);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
                                               Executor executor);
  • thenApply系列方法,入参类型是接口 Function<T, R>,这个接口里与CompletionStage相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>
  • thenAccept系列方法,入参类型是接口Consumer<T>,这个接口里与CompletionStage相关的方法是void accept(T t),这个方法虽然支持参数,但却不支持回值,所以thenAccept系列方法返回的是CompletionStage<Void>
  • thenRun系列方法,入参类型是接口RunnableRunnable既不能接收参数也不支持返回值,所以thenRun系列方法返回的是CompletionStage<Void>
  • thenCompose系列方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
  • 这些方法里面Async代表的是异步执行fnconsumer或者action
// 描述串行关系
// 期待输出
// hello world
// Hello CompletableFuture
private static void serial() throws Exception {
  CompletableFuture<Void> t1 =
    CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")
    .thenApply(String::toLowerCase)
    .thenAccept(System.out::println);

  CompletableFuture<Void> t2 =
    CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " CompuletableFuture"))
    .thenAccept(System.out::println);

  t1.thenRun(() -> t2.join());
  t1.join();
}

描述AND聚合关系

CompletionStage接口里面描述AND聚合关系,主要是thenCombinethenAcceptBothrunAfterBoth这三个系列的接口。

public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, 
                                             BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
                                                 BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
                                                 BiFunction<? super T,? super U,? extends V> fn,
                                                 Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, 
                                                BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
                                                     BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
                                                     BiConsumer<? super T, ? super U> action,
                                                     Executor executor);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
                                              Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                                                   Runnable action);
  • thenAcceptBoth系列方法,入参包括CompletionStage以及BiConsumer<T,U>,这一组函数是用来接受两个CompletableFuture的返回值,并将其组合到一起。
  • thenCombine系列方法,与thenAcceotBoth类似,与thenAcceptBoth不同的是,thenCombine将两个任务结果合并后会返回一个全新的值作为出参。
  • runAfterBoth系列方法,入参类型为CompletionStage<?>Runnable
/*
hello world
Hello CompuletableFuture
thenCombine end
thenAcceptBoth end
runAfterBoth end
*/
private static void and() throws Exception {
  CompletableFuture<Void> t1 =
      CompletableFuture.supplyAsync(() -> "Hello")
          .thenApply(s -> s + " World")
          .thenApply(String::toLowerCase)
          .thenAccept(System.out::println);

  CompletableFuture<Void> t2 =
      CompletableFuture.supplyAsync(() -> "Hello")
          .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " CompuletableFuture"))
          .thenAccept(System.out::println);

  CompletableFuture<String> t3 = t1.thenCombine(t2, (__, s) -> "end");
  t3.thenAccept(s -> System.out.println("thenCombine " + s));
  t3.join();

  CompletableFuture<Void> t4 =
      t1.thenAcceptBoth(t2, (__, s) -> System.out.println("thenAcceptBoth end"));
  t4.join();

  CompletableFuture<Void> t5 = t1.runAfterBoth(t2, () -> System.out.println("runAfterBoth end"));
  t5.join();
}

描述OR聚合关系

CompletionStage接口里面描述OR聚合关系,主要是applyToEitheracceptEitherrunAfterEither系列的接口。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,
                                            Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
                                                 Function<? super T, U> fn);

public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
                                                 Function<? super T, U> fn,
                                                 Executor executor);

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,
                                          Consumer<? super T> action);

public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
                                               Consumer<? super T> action);

public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
                                               Consumer<? super T> action,
                                               Executor executor);

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                                            Runnable action);

public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);
  • applyToEither系列方法,入参类型为CompletionStageFunction
  • acceptEither系列方法,入参类型为CompletionStageConsumer
  • runAfterEither系列方法,入参类型为CompletionStageRunnable
private static void or() {
  CompletableFuture<Void> t1 =
      CompletableFuture.runAsync(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(10);
              System.out.println("t1");
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });

  CompletableFuture<Void> t2 =
      CompletableFuture.runAsync(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(200);
              System.out.println("t2");
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });

  CompletableFuture<String> t3 = t1.applyToEither(t2, s -> "applyToEither end");
  t3.thenAccept(System.out::println);
  t3.join();

  CompletableFuture<Void> t4 = t1.acceptEither(t2, s -> System.out.println("acceptEither end"));
  t4.join();

  CompletableFuture<Void> t5 =
      t1.runAfterEither(t2, () -> System.out.println("runAfterEither end"));
  t5.join();
}

描述异常关系

虽然fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,
                                            Executor executor);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
                                          Executor executor);
  • exceptionally系列方法,入参类型为Function<Throwable, T>
  • whenComplete系列方法,入参类型为BiConsumer<T, Throwable>
  • handle系列方法,入参类型为BiFunction<T, Throwable, U>
private static void exception() {
  int i = 0;
  CompletableFuture<String> t1 =
      CompletableFuture.supplyAsync(
          () -> {
            System.out.println("t1");
            if (i > 0) {
              return "t1";
            }
            throw new RuntimeException("error");
          });

  //    CompletableFuture<Void> t2 =
  //        t1.exceptionally(e -> e.getMessage())
  //            .thenAccept(System.out::println)
  //            .thenRun(() -> System.out.println("exceptionally end"));
  //    t2.join();

  //    CompletableFuture<String> t3 =
  //        t1.whenComplete(
  //                (s, e) -> {
  //                  if (e != null) {
  //                    System.out.println(e.getMessage());
  //                  } else {
  //                    System.out.println(s);
  //                  }
  //                });
  //    t3.join();

  CompletableFuture<Void> t4 =
      t1.handle(
              (s, e) -> {
                if (e != null) {
                  return e.getMessage();
                }
                return s;
              })
          .thenAccept(System.out::println);
  t4.join();
}

其余API

allOf() 与anyOf() 也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:

  • allOf() :给定一组任务,等待所有任务执行结束;
  • anyOf() :给定一组任务,等待其中任一任务执行结束。
private static void all() {
  long startTime = System.currentTimeMillis();
  CompletableFuture<Void> t1 =
      CompletableFuture.runAsync(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(5);
              System.out.println("t1");
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });

  CompletableFuture<Void> t2 =
      CompletableFuture.runAsync(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(15);
              System.out.println("t2");
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });
  CompletableFuture<Void> t3 =
      CompletableFuture.allOf(t1, t2)
          .thenRun(
              () ->
                  System.out.println(
                      "all end cost " + (System.currentTimeMillis() - startTime) + " ms."));
  t3.join();
}

private static void any() {
  long startTime = System.currentTimeMillis();
  CompletableFuture<Void> t1 =
      CompletableFuture.runAsync(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(5);
              System.out.println("t1");
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });

  CompletableFuture<Void> t2 =
      CompletableFuture.runAsync(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(15);
              System.out.println("t2");
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });

  CompletableFuture<Void> t3 =
      CompletableFuture.anyOf(t1, t2)
          .thenRun(
              () ->
                  System.out.println(
                      "all end cost " + (System.currentTimeMillis() - startTime) + " ms."));
  t3.join();
}

经典案例

利用CompletableFuture来实现烧水泡茶程序,来自极客时间-java并发编程课程的案例

分工:

  • 任务1负责洗水壶,烧开水。
  • 任务2负责洗茶壶,洗茶杯,拿茶叶。
  • 任务1和任务2并行。
  • 任务1和任务3都完成后,启动任务三泡茶。

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壶...");
  sleep(1, TimeUnit.SECONDS);
​
  System.out.println("T1: 烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壶...");
  sleep(1, TimeUnit.SECONDS);
​
  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);
​
  System.out.println("T2: 拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶叶:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任务 3 执行结果
System.out.println(f3.join());
​
void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}

一次执行结果:

T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

到此这篇关于Java如何利用CompletableFuture描述任务之间的关系的文章就介绍到这了,更多相关Java CompletableFuture内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • RabbitMQ交换机使用场景和消息可靠性总结分析

    RabbitMQ交换机使用场景和消息可靠性总结分析

    这篇文章主要为大家介绍了RabbitMQ交换机使用场景和消息可靠性总结分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Nacos配置中心搭建及动态刷新配置及踩坑记录

    Nacos配置中心搭建及动态刷新配置及踩坑记录

    这篇文章主要介绍了Nacos配置中心搭建及动态刷新配置及踩坑记录,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • Java设计模式之java状态模式详解

    Java设计模式之java状态模式详解

    这篇文章主要介绍了Java设计模式之状态模式定义与用法,结合具体实例形式详细分析了Java状态模式的概念、原理、定义及相关操作技巧,需要的朋友可以参考下
    2021-09-09
  • 零基础全面解析SpringBoot配置文件(结合验证码案例)

    零基础全面解析SpringBoot配置文件(结合验证码案例)

    文章介绍配置文件的作用及SpringBoot中properties和yml格式的使用,比较其优缺点,并演示基于Hutool的验证码生成与校验实现,本文结合验证码案例给大家详细讲解,感兴趣的朋友跟随小编一起看看吧
    2025-09-09
  • @Valid和@Validated注解校验以及异常处理方式

    @Valid和@Validated注解校验以及异常处理方式

    在Javaweb开发中,防止数据库恶意攻击是至关重要的,尽管前端校验可以起到一定的筛选作用,但通过工具如postman直接对后端发起请求的情况仍然需要后端进行严格的数据校验,Java生态下,@Valid注解配合SpringBoot提供了一个便捷高效的后端数据校验方案
    2024-11-11
  • Mybatis-Plus 映射匹配兼容性的问题解决

    Mybatis-Plus 映射匹配兼容性的问题解决

    本文主要介绍了Mybatis-Plus 映射匹配兼容性的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • spring配置扫描多个包问题解析

    spring配置扫描多个包问题解析

    这篇文章主要介绍了spring配置扫描多个包问题解析,具有一定参考价值,需要的朋友可以了解下。
    2017-10-10
  • Java基础之关键字final详解

    Java基础之关键字final详解

    这篇文章主要介绍了Java基础之关键字final详解,文中有非常详细的代码示例,对正在学习java基础的小伙伴们有非常好的帮助,需要的朋友可以参考下
    2021-05-05
  • Java实现产生随机字符串主键的UUID工具类

    Java实现产生随机字符串主键的UUID工具类

    这篇文章主要介绍了Java实现产生随机字符串主键的UUID工具类,涉及java随机数与字符串遍历、转换等相关操作技巧,需要的朋友可以参考下
    2017-10-10
  • Spring中的@AliasFor标签的用法及说明

    Spring中的@AliasFor标签的用法及说明

    Spring框架中的@AliasFor标签用于定义注解属性的别名,使得在同一个注解中可以使用相同的属性值,从而提高代码的可读性和可维护性
    2026-03-03

最新评论