Java中CompletableFuture四种调用模式的实现

 更新时间:2025年08月08日 09:26:01   作者:桦说编程  
CompletableFuture有就地执行、异步执行等三种模式,就地执行适合轻量级任务,但可能导致线程阻塞,异步执行有上下文切换开销,本文介绍了第四种调用模式,解决了以上问题

三种调用模式

CompletableFuture(以下简称CF)提供了三种调用模式,分别是就地执行、异步使用默认执行器执行、异步指定执行器执行。

就地执行指的是回调在当前线程中执行,调用thenApply、thenCompose等方法时,如果当前 CF 已经执行完成,会立即执行回调,称为当前执行,执行时刻为当前(now);如果未完成,则由完成 CF 的线程执行。

如下分别是立即执行和异步执行的例子:

var cf = CompletableFuture.completed(1);
cf.thenApply(x -> x + 1)
  .thenRun(x -> System.out.println(x))
  .join();

以上代码全程同步。

var cf = new CompletableFuture<Integer>();
cf.thenApply(x -> x + 1)
  .thenRun(x -> System.out.println(x));
new Thread(() -> cf.complete(1)).start();
Uninterruptible.sleep(1, TimeUnit.SECONDS);

thenApply、thenRun在调用时,cf未完成,无法立刻执行,其执行在完成cf的线程,也就是新创建的线程中。

异步执行指的是回调任务的执行必定在执行器中执行,默认执行器为Java提供的commonPool线程池,当然也可以通过重写defaultExecutor实现调用指定的线程池。

var cf = CompletableFuture.completed(1);
cf.thenApplyAsync(x -> x + 1)
  .thenRunAsync(x -> System.out.println(x))
  .join();
Uninterruptible.sleep(1, TimeUnit.SECONDS);

以上代码中打印操作在公共线程池中执行。

比较

就地执行性能最好,可以完全避免线程上下文切换,适合执行一些轻量级任务。缺点是使用不当时,会阻塞当前线程;可能会造成“线程泄露”,导致线程池中的线程没有及时归还。

异步执行反之。

第 4 种调用模式

线程池中任务执行有一条原则:尽最大努力交付。意思是如果任务提交时没有拒绝,没有抛出拒绝执行等异常,通常来说通过了信号量、限流器、执行时间、线程数等诸多限制,后续的执行应该不作额外限制,且努力完成;而不是等执行过程中再抛出类似拒绝服务等异常。反过来说,如果当前任务提交时,任务不能执行,就应该拒绝执行。这条简单的原则可以避免考虑复杂的问题,比如反压、取消机制等,也能够应对大多数的业务场景。

对于非轻量级任务,例如 A -> B,表示任务A执行完成后执行任务B,常规的线程池实现有一个问题,B任务的提交不一定立即执行,可能遇到排队(进入阻塞队列)甚至超时等情况,最终导致整个任务的滞后。此时如果能就地执行最好。

如果选择就地执行策略,解决了以上问题,但是可能会导致CF已完成后执行的当前线程阻塞。这时最好有执行器执行任务,而不是占用当前线程。

最近CFFU类库提供LLCF#relayAsync0,完美解决了以上痛点。LL表示low level,对于其的正确使用要求开发人员对CompletableFuture有着充分的理解。relay的含义是接力,这里指的是

  • relay Async 接力异步
  • Async 词尾,保证一定是异步(和CF命名表义 一样)

异步时(不阻塞调用逻辑),用前个computation的线程接力执行,不使用新线程,避免了上下文切换开销。

例子

relayAsync0 签名如下:

public static <T, F extends CompletionStage<?>> F relayAsync0(
        CompletionStage<? extends T> cfThis,
        Function<CompletableFuture<T>, F> relayComputations, Executor executor)

需要注意传入的回调任务不是普通的Function,而是入参CF,出参 CompletionStage,也就是说我们需要传入对CF的回调。比如:

cf -> cf.thenApply(...)
cf -> cf.thenCompose(...)
cf -> cf.thenRun(...)

该方法使用时和thenApplyAsync很像,只不过由实例方法调用改成了静态方法调用,回调参数为对CF的回调。

以下代码引用自CFFU作者 李鼎 | Jerry Lee,详细说明四种调用模式的用法:

public class RelayAsyncDescriptionAndExample {
    static void executeComputationsOfNewStage(CompletableFuture<String> cf) {

        // ================================================================================
        // Default execution
        // ================================================================================

        cf.thenApply(s -> {
            // a simulating long-running computation...
            sleep(1000);
            // if input cf is COMPLETED when computations execute,
            // executes the long time computation SYNCHRONOUSLY (aka. in the caller thread);
            // this SYNCHRONIZED execution leads to BLOCKing sequential codes of caller... ⚠️

            return s + s;
        });

        // ================================================================================
        // Asynchronous execution of CompletableFuture(default executor or custom executor)
        // ================================================================================

        cf.thenApplyAsync(s -> {
            // a simulating long-running computation...
            sleep(1000);
            // always executes via an executor(guarantees not to block sequential code of caller).
            // if input cf is INCOMPLETE when computations execute,
            // the execution via an executor leads to ONE MORE thread switching. ⚠️

            return s + s;
        });

        // ================================================================================
        // How about the fourth way to arrange execution of a new stage's computations?
        // ================================================================================
        //
        // - if input cf is COMPLETED when computations execute, use "asynchronous execution" (via supplied Executor),
        //   won't block sequential code of caller ✅
        // - otherwise, use "default execution", save one thread switching ✅
        //
        // Let's call this way as "relay async".

        LLCF.relayAsync0(cf, f -> f.thenApply(s -> {
            // a simulating long-running computation...
            sleep(1000);
            // if input cf is COMPLETED, executes via supplied executor
            // if input cf is INCOMPLETE, use "default execution"

            return s + s;
        }), ForkJoinPool.commonPool());
    }
}

实现分析

public static <T, F extends CompletionStage<?>> F relayAsync0(
        CompletionStage<? extends T> cfThis,
        Function<CompletableFuture<T>, F> relayComputations, Executor executor) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    final F ret = relayComputations.apply(promise);

    final Thread callerThread = currentThread();
    final boolean[] returnedFromPeek0 = {false};

    LLCF.peek0(cfThis, (v, ex) -> {
        if (currentThread().equals(callerThread) && !returnedFromPeek0[0]) {
            // If the action is running in the caller thread(single same thread) and `peek0` invocation does not
            // return to caller(flag returnedFromPeek0 is false), the action is being executed synchronously.
            // To prevent blocking the caller's sequential code, use the supplied executor to complete the promise.
            executor.execute(() -> completeCf0(promise, v, ex));
        } else {
            // Otherwise, complete the promise directly, avoiding one thread switching.
            completeCf0(promise, v, ex);
        }
    }, "relayAsync0");

    returnedFromPeek0[0] = true;
    return ret;
}

说明:

  1. completeCf0方法可以将结果v或者异常ex设置到promise中
  2. peek0 近似等效于 whenComplete

分析:

  1. 可以通过引入新的CF,也就是 promise 实现线程传递,其他线程“完成”promise时,这个线程隐式传到了promise中,可以理解成隐式上下文。任何一个CF都带有一个隐式上下文。
  2. returnedFromPeek0 避免了异步调用但是恰好是同线程的问题,此时也应该实现relay语义,因为我们的目的是避免对当前线程的阻塞。returnedFromPeek0 天然线程安全,因为其访问总是在一个确定的线程内。
  3. else 代码块:就地执行,避免线程切换。

总结

到此这篇关于Java中CompletableFuture四种调用模式的实现的文章就介绍到这了,更多相关Java CompletableFuture 调用模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springCloud gateWay 统一鉴权的实现代码

    springCloud gateWay 统一鉴权的实现代码

    这篇文章主要介绍了springCloud gateWay 统一鉴权的实现代码,统一鉴权包括鉴权逻辑和代码实现,本文给大家介绍的非常详细,需要的朋友可以参考下
    2022-02-02
  • RabbitMQ基础概念之信道channel详解

    RabbitMQ基础概念之信道channel详解

    这篇文章主要介绍了RabbitMQ基础概念之信道channel详解,信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的,需要的朋友可以参考下
    2023-08-08
  • Java中的Native方法

    Java中的Native方法

    这篇文章主要介绍了Java中的Native方法,在本文中,我们将看到java中本机native方法的介绍。我们将看到它的基本语法及其工作原理。将有java代码示例展示native本机方法的使用,下面来看看文章的具体介绍
    2021-12-12
  • 关于Object中equals方法和hashCode方法判断的分析

    关于Object中equals方法和hashCode方法判断的分析

    今天小编就为大家分享一篇关于关于Object中equals方法和hashCode方法判断的分析,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Java日期时间类及计算详解

    Java日期时间类及计算详解

    这篇文章主要介绍了Java日期时间类及计算详解,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下,希望对你的学习有所帮助
    2022-07-07
  • Java数据类型超完整指南

    Java数据类型超完整指南

    Java数据类型可以分为基本数据类型和引用数据类型两大类,这篇文章主要介绍了Java数据类型的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2026-03-03
  • SpringBoot修改内置tomcat版本的操作步骤

    SpringBoot修改内置tomcat版本的操作步骤

    生产环境使用的外部部署Tomcat还是内置Tomcat由于版本安全漏洞,往往需要升级到指定的安全版本,本文演示一下SpringBoot升级内置的Tomcat版本,感兴趣的小伙伴跟着小编一起来看看吧
    2024-07-07
  • Java如何在PDF中添加ToolTip工具提示

    Java如何在PDF中添加ToolTip工具提示

    大家好,本篇文章主要讲的是Java如何在PDF中添加ToolTip工具提示,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-01-01
  • Mybatis many=@Many的传值问题解决

    Mybatis many=@Many的传值问题解决

    本文详细介绍了在MyBatis通过主查询将单个或多个值传递给子查询的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-11-11
  • Java如何判断一个字符串是否包含某个字符串

    Java如何判断一个字符串是否包含某个字符串

    这篇文章主要给大家介绍了关于Java如何判断一个字符串是否包含某个字符串的相关资料,在实际编程中,经常需要判断一个字符串中是否包含某个子串,需要的朋友可以参考下
    2023-07-07

最新评论