Java8 CompletableFuture使用及说明

 更新时间:2026年04月02日 17:07:26   作者:Jerry的技术博客  
文章介绍了Future和CompletableFuture接口及其方法,解释了它们的异同、使用场景与应用场景,示例了Future和CompletableFuture的使用方法和场景,强调了CompletableFuture的函数式编程特性

一、Future接口

1.1 Runnable与Callable

Runnable接口源自JDK1.1,它只有一个run()方法,该方法没有返回结果:

public interface Runnable {
    public abstract void run();
}

Callable接口是JDK1.5中添加,只有一个call()方法,该方法支持结果返回且可以抛出异常:

public interface Callable<V> {
    V call() throws Exception;
}

无论是Runnalble还是Callable任务实例,直接调用时不会新开启子线程,是在主线程中运行。

如果要在子线程中运行这些任务,需要将任务提交到Thread实例或线程池执行。

值得注意的是通过Thread实例执行Callable任务,Thread没有接受Callable入参的构造函数,因此,不能直接构造Callable任务的Thread实例。

可以通过构建FutureTask任务将Callable任务提供到Thread实例执行,因为FutureTask实现了Runnable接口,且其构造函数入参支持Callable类型。

	@Test
    public void testRunable() throws Exception {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000L);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                }
            }
        };

        Callable<Integer> c = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                try {
                    Thread.sleep(1000L);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                }
                return ThreadLocalRandom.current().nextInt(100);
            }
        };

        r.run();
        System.out.println(c.call());

        Thread t = new Thread(r);
        t.start();
        t.join(); // 不阻塞时,如果主线程比子线程先完成,则子线程中任务不会完整执行

        FutureTask<Integer> f = new FutureTask<>(c);
        Thread t2 = new Thread(f);
        t2.start();
        t2.join(); // 不阻塞时,如果主线程比子线程先完成,则子线程中任务不会完整执行
        System.out.println(f.get());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(r);
        Future<Integer> future = executorService.submit(c);
        System.out.println(future.get()); // 不阻塞时,如果主线程比子线程先完成,则子线程中任务不会完整执行
    }
---
main
main
62
Thread-0
Thread-1
6
pool-1-thread-1
pool-1-thread-1
36

注意:

  • 通过junit对以上代码进行测试时,如果主线程比子线程先执行完成,则主线程执行完成后会程序会退出,意味着没有完成的子线程任务不会再继续执行。
  • 因此,如果需要子线程完整执行,则需要阻塞主线程。

1.2 Future接口

Future接口也是在JDK1.5中添加的,用于描述一个异步Callable任务的计算结果,如下所示。

Future接口一共有5个方法:

  • 通过cancel()方法停止任务;
  • 通过isCanceld()方法判断任务是否被停止;
  • 通过isDone()方法检查任务是否完成;
  • 提供了两个重载版本的get方法,用于阻塞调用线程,直到任务完成或超时。
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future只能通过阻塞或轮训的方式获取任务结果,阻塞违背异步变成的初衷,轮训的方式又耗费了CPU资源,通过Future获取异步任务结果的示里如下:

    @Test
    public void testFuture() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<Integer>> futures = new ArrayList<>();
        List<Integer> results = new ArrayList<>();
        for(int i=0; i <= 3; i++) {
            final int j = i;
            Future<Integer> future = executorService.submit(() -> {
                try {
                    Thread.sleep(j * 100L);
                } catch (InterruptedException e) {
                }
                return ThreadLocalRandom.current().nextInt(100);
            });
            futures.add(future);
        }
        for(Future<Integer> future : futures) {
            while(true) {
                if(future.isDone() && !future.isCancelled()) {
                    results.add(future.get());
                    break;
                }
            }
        }
        System.out.println(results);
    }

二、CompletableFuture

JDK8中添加了ComletableFuture类,它实现了Future接口和CompletionStage接口,其中CompletionStage可以看做是一个异步任务执行过程的抽象,如下。

它简化了异步变成的复杂性,并提供了函数式变成的能力。

ComletableFuture提供了同步和异步的执行方式,其异步方式又支持外部声明的线程池,如果不提供外部线程池,则异步时默认使用ForkJoinPool.commonPool()。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
}

CompletableFuture实现了Future接口,因此,也可以通过以前的方式阻塞或轮训获取结果:

// 同Future
public T get() throws InterruptedException, ExecutionException {
...
}

// 同Future
public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {
...
}

// 返回执行结果或抛出一个unchecked异常
public T join() {
...
}

// 如果执行结束则返回执行结果,否则返回valueIfAbsent
public T getNow(T valueIfAbsent) {
...
}

CompetableFuture.completedFuture是一个静态辅助类,用于返回一个计算好的CompletableFuture,示例:

    @Test
    public void testComletedFuture() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.completedFuture("result");
        System.out.println(future.get());
    }
---
result

通过以下四个静态方法为异步执行代码创建CompletableFuture对象:

// 接受Runnable任务,返回结果为空
public static CompletableFuture<Void> runAsync(Runnable runnable) {
...
}

// 接受Runnable任务,返回结果为空,指定线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
...
}

// 接受Supplier对象,可指定返回类型
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
...
}

// 接受Supplier对象,可指定返回类型,指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
...
}

CompletableFuture任务执行示例如下,可指定执行线程池,如不指定则使用ForkJoinPool.commonPool()。

    @Test
    public void testCreateSyncTask() throws InterruptedException, ExecutionException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return "supplyAsync result";
        });
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
        }, executorService);
        System.out.println(f1.get());
    }
---
ForkJoinPool.commonPool-worker-1
supplyAsync result
pool-1-thread-1

Future和CompletableFuture提供的get()方法是阻塞的,为了获取任务结果同时不阻塞当前线程,可使用CompletionStage提供的方法实现任务异步处理,有以下4中处理方式:

// 上游任务完成后在当前主线程中同步执行处理,向下传递上游处理结果或异常
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
...
}

// 上游任务完成后在当前子线程中异步执行处理,向下传递上游处理结果或异常
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
...
}

// 上游任务完成后在当前主线程中同步执行处理,指定线程池,向下传递上游处理结果或异常
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
...
}

// 上游任务异常时处理,在主线程中完成
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
...
}

示例代码如下:

    @Test
    public void testWhenComplete() throws InterruptedException, ExecutionException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "test";
        }).whenCompleteAsync((result, exception) -> {
            if (result != null) {
                System.out.println(result);
            } else {
                exception.printStackTrace();
            }
        });
        System.out.println(f1.get());

        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return "test";
        }).exceptionally((e) -> {
            e.printStackTrace();
            return "error";
        });
        System.out.println(f2.get());
    }
---
test
error
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
...

whenComplete方法入参是BiConsumer,它是一个纯消费者函数,不会修改返回值及其类型。

下面一组接口除了whenComplete的功能外,同时具备转换结果的功能,通过参数BiFunction实现:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
...
}

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
...
}

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
...
}

示例代码如下:

    @Test
    public void testHandle() throws InterruptedException, ExecutionException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "100";
        });
        CompletableFuture<Integer> f2 = f1.handleAsync((result, exception) -> {
            if(exception != null) {
                exception.printStackTrace();
            }
            return Integer.parseInt(result) * 10;
        });
        System.out.println(f2.get());
    }
---
1000

handle方法依然保留了对异常请款的处理,在BiFunction中指定其第二个参数类型是Throwble。

接下来一组方法thenApply与handle类似,也可以对上游结果进行转换,同时忽略对异常情况的处理。

public <U> CompletableFuture<U> thenApplyFunction<? super T,? extends U> fn){
...
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
...
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
...
}

示例代码:

	@Test
    public void testThenApply() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //int i= 1/0;
            return "100";
        });
        CompletableFuture<Integer> f = future.thenApply(t -> Integer.parseInt(t) * 10).thenApply(t -> t * 10);
        System.out.println(f.get());
    }
---
10000

上述方法处理完成后,都会返回计算结果,CompletableFuture还提供了一组处理方法,只对上游处理结果进行消费,且没有返回,如果上游发生异常,则不执行该方法;同时,CompletableFuture还提供了一组加强版方法,提供两个CompletableFuture任务都完成或完成一个时执行的方法。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
...
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
...
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) {
...
}

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {
...
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {
...
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) {
...
}
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
...
}

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
...
}

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor) {
...
}

示例代码:

    @Test
    public void testThenAccept() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            int i = 1/0;
            return "1";
        });
        CompletableFuture<Void> f = future.thenAccept(t -> {
            System.out.println(t);
        });
    }
---

上述方法都依赖上游处理的结果,CompletableFuture还提供了一组方法,不依赖上游的结果,如下

public CompletableFuture<Void> thenRun(Runnable action) {
...
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
...
}

public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {
...
}

示例代码:

    @Test
    public void testThenRun() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "result";
        });
        CompletableFuture<Void> f = future.thenRunAsync(() -> {
            System.out.println("future then run");
        });
    }
---
future then run

thenCompose这组方法与thenApply类似,都可以将上游执行结果作为本stage入参继续计算,并转换返回类型,不同的是thenCompose在一个新CompletableFuture对象执行计算。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
...
}

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
...
}

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {
...
}

示例代码:

    @Test
    public void testThenCompose() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "10";
        });
        CompletableFuture<Integer> f = future.thenCompose(v -> {
            return CompletableFuture.supplyAsync(() -> {
               return Integer.parseInt(v) * 100;
            });
        });
        System.out.println(f.get());
    }
---
1000

thenCombine这组方法与thenAccepBoth类似,不同的是thenCombine有返回值:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {
...
}

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {
...
}

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
...
}

示例代码:

    @Test
    public void testThenCombine() throws InterruptedException, ExecutionException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "abc";
        });
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
            return 123;
        });
        CompletableFuture<String> f3 = f1.thenCombine(f2, (m,n) -> {
            return m + " is " + n;
        });
        System.out.println(f3.get());
    }
---
abc is 123

最后两个方法用于组合多个CompletableFuture,allOf方法是当所有CompletableFuture执行完成后执行计算,anyOf方法是任意一个CompletableFuture执行完成后执行计算。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
...
}

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
...
}

示例代码:

    @Test
    public void testAllof() throws InterruptedException, ExecutionException {
        List<CompletableFuture<Integer>> list = new ArrayList<>();
        Random random = new Random();
        for (int i=1; i < 10; i++) {
            list.add(CompletableFuture.supplyAsync(() -> {
                return random.nextInt(100);
            }));
        }
        CompletableFuture<Void> fn = CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]));
        CompletableFuture<List<Integer>> fr = fn.thenApply(v -> {
            return list.stream().map(CompletableFuture::join).collect(Collectors.toList()); 
        });
        System.out.println(fr.get());
    }
---
[3, 97, 11, 93, 22, 77, 68, 26, 89]

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java商城项目实战之购物车功能实现

    java商城项目实战之购物车功能实现

    这篇文章主要为大家详细介绍了java商城项目实战之购物车功能实现,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-04-04
  • SpringBoot实现数据转换的四种对象映射方案

    SpringBoot实现数据转换的四种对象映射方案

    在项目开发中,对象之间的相互转换是一个高频操作,尤其在分层架构的系统中,数据在实体对象(Entity)、数据传输对象(DTO)、值对象(VO)之间的转换尤为常见,选择一个高效、可靠的对象映射方案对提高系统性能和开发效率至关重要,本文将介绍4种对象映射方案
    2025-06-06
  • Spring Boot身份认证状态的存储与传递实现方案

    Spring Boot身份认证状态的存储与传递实现方案

    本文主要内容介绍了在SpringBoot中实现用户登录信息获取的三种主流方案:基于Session+Cookie、基于JWTToken和基于OAuth2.0/OIDC,并详细介绍了每种方案的核心逻辑、代码实现、优缺点及关键注意点,感兴趣的朋友跟随小编一起看看吧
    2025-11-11
  • SpringBoot中实现多线程六种方式大全

    SpringBoot中实现多线程六种方式大全

    Spring Boot 提供了非常丰富的多线程支持手段,从最简单的注解到虚拟线程,可以满足从简单异步任务到高并发 IO/CPU 密集型场景的各种需求,下面列出目前(2026 年视角)最常用、最推荐的 6 种方式,需要的朋友可以参考下
    2026-02-02
  • JSONObject按put顺序排放与输出方式

    JSONObject按put顺序排放与输出方式

    这篇文章主要介绍了JSONObject按put顺序排放与输出方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Flink实现往Kafka中多个topic发送消息

    Flink实现往Kafka中多个topic发送消息

    文章介绍了使用Flink 1.13.2 和 Kafka 2.6.2 从Kafka读取数据,并根据逻辑将数据分配到不同的topic,提到了需要重写FlinkKafka的Key序列化器,并加入自定义逻辑以发送消息到指定的topic,文中还包括了配置Kafka信息和如何连接FlinkKafka的步骤
    2025-10-10
  • java GUI界面初步入门示例【AWT包】

    java GUI界面初步入门示例【AWT包】

    这篇文章主要介绍了java GUI界面,结合简单实例形式分析了java使用AWT包进行图形界面操作相关使用技巧,需要的朋友可以参考下
    2020-01-01
  • SpringBoot模拟员工数据库并实现增删改查操作

    SpringBoot模拟员工数据库并实现增删改查操作

    这篇文章主要给大家介绍了关于SpringBoot模拟员工数据库并实现增删改查操作的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-09-09
  • java实现动态上传多个文件并解决文件重名问题

    java实现动态上传多个文件并解决文件重名问题

    这篇文章主要为大家详细介绍了java实现动态上传多个文件,并解决文件重名问题的方法,感兴趣的小伙伴们可以参考一下
    2016-03-03
  • Java实现截图小工具的完整代码

    Java实现截图小工具的完整代码

    这篇文章主要介绍了Java实现截图小工具的完整代码,用Java的图形用户界面GUI技术写了一个电脑截图小工具,本程序代码简单,涉及到异常处理,事件处理,图形用户界面等,需要的朋友可以参考下
    2022-05-05

最新评论