JUC中Future及CompletableFuture的用法与说明

 更新时间:2026年04月21日 09:54:35   作者:kkkwang0o0  
本文介绍了Future接口及其实现类FutureTask的使用方法,指出了其阻塞和CPU空转问题,为了解决这些问题,介绍了CompletableFuture的异步任务编排,可以自动回调异步任务结束时的方法,解决了阻塞和轮询问题,同时提供了链式调用、函数式编程等等提高编程效率

Future 异步回调(不推荐)

new Thread时传入FutureTask对象(构造时传入Callable任务对象),调用start启动

创建线程的方式

1.直接new Thread()对象,重写run方法,调用start启动

2.new Thread时传入Runnable任务对象(重写run方法),调用start启动

3.new Thread时传入FutureTask对象(构造时传入Callable任务对象),调用start启动

/**
 * 创建线程的三种方式
 */
public class CreateThreadDemo {
    public static void main(String[] args) throws Exception {
        // 1.重写Thread的run方法
        Thread t1 = new Thread(() -> {
            System.out.println("第一种创建线程方式,重写Thread的run方法");
        },"t1");
        // 2.重写Runnable的run方法
        RunTask runTask = new RunTask();
        Thread t2 = new Thread(runTask, "t2");
        // 3.传入FutureTask对象,重写Callable的call方法(异步带返回值)
        CallTask callTask = new CallTask();
        FutureTask<String> futureTask = new FutureTask<>(callTask);
        Thread t3 = new Thread(futureTask, "t3");
        t1.start();
        t2.start();
        t3.start();
        System.out.println("异步执行结果" + futureTask.get() + " " + System.currentTimeMillis());
    }
}
/**
 * Runnable 任务
 */
class RunTask implements Runnable {
    @Override
    public void run() {
        System.out.println("runnable running " + System.currentTimeMillis());
    }
}
/**
 * Callable 任务
 */
class CallTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("callable running " + System.currentTimeMillis());
        return "hello";
    }
}

概述

Fucture接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完毕等。

Future接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务。

Future提供了一种异步并行计算的功能:如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果

Future接口 --- 源码

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask(Future实现类)

Thread类构造方法:

可以看出Thread构造方法接收的任务对象只有Runnable接口,为什么还能接收FutureTask任务对象呢?

因为FutureTask实现了RunableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask也是Runnable对象,而FutureTask可以接收Callable任务对象是因为构造方法中提供了接收Callable对象的构造方法。

Future编码优缺点

  • 优点:future+线程池可以实现多线程任务配合,能显著提高程序的执行效率
  • 缺点:get()阻塞、isDone()轮询导致CPU空转
  • 阻塞情况: 调用FutureTask.get方法时线程会阻塞等待异步结果的获取

代码:

FutureTask<String> futureTask = new FutureTask<String>(() -> {
    System.out.println("开始执行......" + now().getSecond());
    // 休眠
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("结束......");
    return "FutureTask End";
});
Thread t = new Thread(futureTask);
t.start();
System.out.println("子线程执行结果:" + futureTask.get());
System.out.println("--------------------------------");
System.out.println("主线程 -------- 执行..... " + now().getSecond());
System.out.println("--------------------------------");

运行结果:

可以通过设置:

System.out.println("子线程执行结果:" + futureTask.get(2, TimeUnit.SECONDS));

暴力终止程序运行或抛出异常

isDone()轮询导致CPU空转:

轮询的方式会消耗无谓的CPU资源,而且也不见得能及时获得到计算结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。

while (true) {
    if (futureTask.isDone()) {
        System.out.println("子线程执行结果:" + futureTask.get() + now().getSecond());
        break;
    }else {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("暂停2s");
    }
}

结论:Future对于结果的获取不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

由此引入CompletableFuture,规避以上的缺点

需要说明的是对于简单的业务使用Future就可以了

CompletableFuture(异步任务编排)

CompletableFuture可以代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。

CompletableFuture可以解决的问题?

多个任务前后依赖组合处理:想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值;将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个计算结果

对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名的处理结果。。。。

CompletableFuture对比Future的改进

Future问题:get()阻塞、isDone()轮询CPU空转

对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。

CompletableFuture提供一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

怎么创建CompletableFuture对象?

创建

一般不建议直接new CompletableFuture

可以采用以下的方式创建CompletableFuture对象:

1.CompletableFuture.runAsync无返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

2. CompletableFuture.supplyAsync有返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

怎么解决Future的阻塞和轮询问题?

从java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

默认线程池创建的是守护线程,自定义线程池是用户线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    // 有返回值
    supplyFuture(threadPool);
    System.out.println("主线程 running " + now());
    threadPool.shutdown();
}
private static void supplyFuture(ExecutorService threadPool) {
    CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("开始执行...." +
                (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程") +
                " " + now());
        int random = ThreadLocalRandom.current().nextInt();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return random;
    }, threadPool).whenComplete((v, e) -> {
        if (e == null) {
            // 没有异常
            System.out.println("随机数结果: " + v);
        }
    }).exceptionally((e) -> {
        e.printStackTrace();
        System.out.println("异常情况:" + e.getCause());
        return null;
    });
}

运行结果:

已经解决了Future编码出现的get阻塞问题。

上述代码块逻辑解析:

supplyAsync()中是任务(默认线程池创建的守护线程),如果这个任务成功会走到whenComplete代码块中,不成功会走到exceptionally代码块中

注意:默认线程池创建的是守护线程,自定义线程池是用户线程,守护线程会随着用户线程的结束而结束,所以会导致主线程执行完了然后还没打印出随机数线程池就关闭了,就是以下输出的情况

怎么获得异步结果?

通过get或者join获得结果(区别在于join在编译期间不会作检查性异常的处理,抛不抛异常都可以)

CompletableFuture的优点

  1. 异步任务结束时,会自动回调某个对象的方法;
  2. 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行;
  3. 异步任务出错时,会自动回调某个对象的方法:

函数式编程

函数式编程:Lambda表达式+Stream流式编程+Chain链式调用+Java8函数式编程

如Runnable、Function、Consumer --- BIConsumer、Supplier

常用函数式接口

函数式接口名称

方法名称

参数

返回值类型

Consumer<T>

accept

T t

void

Supplier<T>

get

T

Function<T, R>

apply

T t

R

Predicate<T>

test

T t

boolean

BiConsumer<T, U>

accept

T t, U u

void

BiFunction<T, U, R>

apply

T t, U u

R

BiPredicate<T, U>

test

T t, U u

boolean

UnaryOperator<T>

apply

T t

T

BinaryOperator<T>

apply

T t1, T t2

T

IntConsumer

accept

int value

void

IntSupplier

getAsInt

int

IntFunction<R>

apply

int value

R

IntPredicate

test

int value

boolean

链式语法

public class ChainDemo {
    public static void main(String[] args) {
        Student student = new Student();
        student.setId(1).setName("karry").setMajor("cs");
    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
class Student{
    private Integer id;
    private String name;
    private String major;
}

案例精讲-电商网站的比价需求

从“功能”到“性能”

/**
 * 电商网站的比价需求
 */
public class CompletableMallDemo {
    private static final List<Mall> list = new ArrayList<>();
    static {
        for (int i = 0; i <= 1000; i ++) {
            list.add(new Mall("book" + i));
        }
    }
    /**
     * 查询价格 同步处理
     * @param list 平台列表
     * @param bookName 书籍名称
     * @return 查询所需时间
     */
    public static Long getPrice(List<Mall> list, String bookName) {
        long start = System.currentTimeMillis();
        List<String> prices = list.stream()
        .map((mall) -> String.format(bookName + " in %s price is %.2f",
                                     mall.getName(), mall.calcPrice(bookName)))
        .collect(Collectors.toList());
        long end = System.currentTimeMillis();
        System.out.println("用时:  " + end);
        prices.forEach(System.out::println);
        return end - start;
    }
    /**
     * 查询价格 异步处理
     * @param list 平台
     * @param bookName 书籍名称
     * @return 查询所需
     */
    public static Long getPriceByCompletableFuture(List<Mall> list, String bookName) {
        long start = System.currentTimeMillis();
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        List<String> prices = list.stream()
        // 把 mall 映射到 CompletableFuture对象
        .map(mall -> CompletableFuture.supplyAsync(() ->
                                                   String.format(bookName + " in %s price is %.2f",
                                                                 mall.getName(), mall.calcPrice(bookName)), threadPool)
            ).collect(Collectors.toList()).stream()
        // CompletableFuture对象 映射到 CompletableFuture.join() [String对象]
        .map(CompletableFuture::join).collect(Collectors.toList());
        threadPool.shutdown();
        long end = System.currentTimeMillis();
        System.out.println("用时:  " + end);
        prices.forEach(System.out::println);
        return end - start;
    }
    public static void main(String[] args) {
        System.out.println("差值:" +
                           (getPrice(list, "mysql") - getPriceByCompletableFuture(list, "mysql")));
    }
}
@AllArgsConstructor
@NoArgsConstructor
@Data
class Mall {
    /**
     * 电商网站名称
     */
    private String name;
    /**
     * 模拟查询价格
     * @param bookName 书籍名称
     * @return double 价格
     */
    public double calcPrice(String bookName) {
        return ThreadLocalRandom.current().nextDouble(20, 100) + bookName.charAt(0);
    }
}

源码分析

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>  {
}

CompletableFuture实现了Future接口,还拓展了Future不具备的CompletionStage接口

CompletionStage(完成阶段)

CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。

一个阶段的计算执行可以时一个Fuction,Consumer或者Runnable。比如:stage.thenApply().thenAccept().thenRun()

CompletableFuture API

不带Async和带Async的API区别:

对于CompletionStage,每个任务阶段带Async的任务可以设定线程池,不设定就会使用默认线程池, 而不带Async的任务阶段会使用CompletableFuture设定的线程池。

thenRun和thenRunAsync的区别:

  • thenRun使用的supplyAsync或者runAsync传入的线程池(不传入则使用默认线程池---守护线程)
  • thenRunAsync使用的是自己api传入的线程池,不传入则使用默认线程池(守护线程)
/**
 * thenRunAsync方法
 * @param threadPool
 */
public static void testRunAsyncMethod(ExecutorService threadPool) {
CompletableFuture.runAsync(() -> {
    System.out.println("step 1 " + Thread.currentThread().getName());
}, threadPool).thenRunAsync(() -> {
    System.out.println("step 2 " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
    System.out.println("step 3 " + Thread.currentThread().getName());
}, threadPool).thenRunAsync(() -> {
    System.out.println("step 4 " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
    System.out.println("step 5 " + Thread.currentThread().getName());
}).join();
}

方法

触发时机

输入参数

返回值类型

核心作用

thenRun

前序任务正常完成后

无(Runnable

CompletableFuture<Void>

执行无输入、无输出的后续操作

thenAccept

前序任务正常完成后

前序任务的结果(Consumer

CompletableFuture<Void>

消费前序任务的结果,无新输出

thenApply

前序任务正常完成后

前序任务的结果(Function

CompletableFuture<U>

基于前序结果计算新结果,有新输出

handle

前序任务完成(无论成败)

前序结果 + 异常(BiFunction

CompletableFuture<U>

处理前序任务的结果或异常,计算新结果

1. 获得结果和触发计算

获得结果:

  • get() : 不见不散
  • get(long, TimUnit): 过时不候
  • join() : 功能和get类似,但是在编译期间不抛出受检查异常
  • getNow(valueIfAbsent): 立即获得当前结果,为空则返回valueIfAbsent的值
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

触发计算:

complete(T value) :是否打断get方法立即返回括号值(返回括号值,为true;不返回为false)

public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}

eg:

public static void main(String[] args) throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    testCompleteMethodByTrue(threadPool);
    testCompleteMethodByFalse(threadPool);
    threadPool.shutdown();
}
public static void testCompleteMethodByFalse(ExecutorService threadPool) throws InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "get/join返回的结果";
    }, threadPool);
    TimeUnit.SECONDS.sleep(4);
    boolean flag = future.complete("设定的值");
    System.out.println("complete方法返回值为 " + flag + ",4s左右获得的值为 " + future.join());
}
public static void testCompleteMethodByTrue(ExecutorService threadPool) throws InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "get/join返回的结果";
    }, threadPool);
    TimeUnit.SECONDS.sleep(2);
    boolean flag = future.complete("设定的值");
    System.out.println("complete方法返回值为 " + flag + ",2s左右获得的值为 " + future.join());
}

运行结果:

2. 对计算结果进行处理

2.1 thenApply: 计算结果存在依赖关系,串行化

/**
 * thenApply
 * @param threadPool
 */
public static void testApplyMethod(ExecutorService threadPool) {
StringBuffer str = new StringBuffer();
CompletableFuture<StringBuffer> future = CompletableFuture.supplyAsync(() -> {
    return str.append("a");
}, threadPool).thenApply(f -> {
    return str.append("b");
}).thenApply(f -> {
    return str.append("c");
}).whenComplete((v, e) -> {
    if (v != null) {
        System.out.println("apply处理后的结果为 " + v);
    }
}).exceptionally(e -> {
    e.printStackTrace();
    return null;
});
}

2.2 handle: 计算结果存在依赖关系,串行化(可以带着)

/**
 * handle方法
 * @param threadPool
 */
public static void testHandleMethod(ExecutorService threadPool) {
    StringBuffer str = new StringBuffer();
    CompletableFuture<StringBuffer> future = CompletableFuture.supplyAsync(() -> {
        return str.append("a");
    }, threadPool).handle((f, e) -> {
        if (e == null) {
            return f.append("b");
        }else {
            e.printStackTrace();
            return null;
        }
    }).handle((f, e) -> {
        if (e == null) {
            return f.append("c");
        }else {
            e.printStackTrace();
            return null;
        }
    });
    System.out.println("handle处理后的结果为 " + future.join());
}

3.对计算结果进行消费

接收任务的处理结果,消费处理,无返回结果

thenAccept:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

eg:

/**
 * thenAccept方法(消费型)
 * @param threadPool
 */
public static void testAcceptMethod(ExecutorService threadPool) {
    StringBuffer str = new StringBuffer();
    CompletableFuture.supplyAsync(() -> str.append("abc"), threadPool)
    .thenAccept(r -> System.out.println("accept直接消费" + r)).join();
}

输出:

thenRun:

任务A执行完执行B,并且B不需要A的结果

/**
 * thenRun方法
 * @param threadPool
 */
public static void testRunMethod(ExecutorService threadPool) {
CompletableFuture.runAsync(() -> {
    System.out.println("step 1");
}, threadPool).thenRun(() -> {
    System.out.println("step 2");
}).thenRun(() -> {
    System.out.println("step 3");
});
}
4.对计算速度进行选用

applyToEither:

对两个future对象选用速度较快的那一个结果

eg:

/**
 * applyToEither 方法
 * @param threadPool
 */
public static void testApplyToEitherMethod(ExecutorService threadPool) {
for (int i = 2; i <= 5; i ++) {
    CompletableFuture<String> future = getPlayFuture(i - 1, threadPool).applyToEither(getPlayFuture(i, threadPool), f -> f + " is winner");
    System.out.println(future.join());
}
}
private static CompletableFuture<String> getPlayFuture(int num, ExecutorService threadPool) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(num * 100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "play" + num;
    });
}

5.对计算结果进行合并

thenCombine:

两个CompletionStage任务都完成后,最终能把两个任务的结果一起提交给thenCombine来处理;先完成的先等着,等待其它分支任务。

eg:

/**
 * thenCombine 方法
 * @param threadPool
 */
public static void testCombineMethod(ExecutorService threadPool) {
    CompletableFuture<Integer> future1 = getBranchFuture(1, threadPool);
    CompletableFuture<Integer> future2 = getBranchFuture(2, threadPool);
    CompletableFuture<Integer> combineFuture = future2.thenCombine(future1, Integer::sum);
    System.out.println("计算结果为 " + combineFuture.join());
}
private static CompletableFuture<Integer> getBranchFuture(int num, ExecutorService threadPool) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(num * 100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return num * 10;
    });
}

输出:计算结果为 30

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

最新评论