JUC中Future及CompletableFuture的用法与说明
Future 异步回调(不推荐)
new Thread时传入FutureTask对象(构造时传入Callable任务对象),调用start启动
创建线程的方式
1.直接new Thread()对象,重写run方法,调用start启动
2.new Thread时传入Runnable任务对象(重写run方法),调用start启动
3.new Thread时传入FutureTask对象(构造时传入Callable任务对象),调用start启动
/**
* 创建线程的三种方式
*/
public class CreateThreadDemo {
public static void main(String[] args) throws Exception {
// 1.重写Thread的run方法
Thread t1 = new Thread(() -> {
System.out.println("第一种创建线程方式,重写Thread的run方法");
},"t1");
// 2.重写Runnable的run方法
RunTask runTask = new RunTask();
Thread t2 = new Thread(runTask, "t2");
// 3.传入FutureTask对象,重写Callable的call方法(异步带返回值)
CallTask callTask = new CallTask();
FutureTask<String> futureTask = new FutureTask<>(callTask);
Thread t3 = new Thread(futureTask, "t3");
t1.start();
t2.start();
t3.start();
System.out.println("异步执行结果" + futureTask.get() + " " + System.currentTimeMillis());
}
}
/**
* Runnable 任务
*/
class RunTask implements Runnable {
@Override
public void run() {
System.out.println("runnable running " + System.currentTimeMillis());
}
}
/**
* Callable 任务
*/
class CallTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("callable running " + System.currentTimeMillis());
return "hello";
}
}概述
Fucture接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完毕等。
Future接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务。
Future提供了一种异步并行计算的功能:如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
Future接口 --- 源码
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}FutureTask(Future实现类)

Thread类构造方法:

可以看出Thread构造方法接收的任务对象只有Runnable接口,为什么还能接收FutureTask任务对象呢?
因为FutureTask实现了RunableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask也是Runnable对象,而FutureTask可以接收Callable任务对象是因为构造方法中提供了接收Callable对象的构造方法。

Future编码优缺点
- 优点:future+线程池可以实现多线程任务配合,能显著提高程序的执行效率
- 缺点:get()阻塞、isDone()轮询导致CPU空转
- 阻塞情况: 调用FutureTask.get方法时线程会阻塞等待异步结果的获取
代码:
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println("开始执行......" + now().getSecond());
// 休眠
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束......");
return "FutureTask End";
});
Thread t = new Thread(futureTask);
t.start();
System.out.println("子线程执行结果:" + futureTask.get());
System.out.println("--------------------------------");
System.out.println("主线程 -------- 执行..... " + now().getSecond());
System.out.println("--------------------------------");运行结果:

可以通过设置:
System.out.println("子线程执行结果:" + futureTask.get(2, TimeUnit.SECONDS));
暴力终止程序运行或抛出异常
isDone()轮询导致CPU空转:
轮询的方式会消耗无谓的CPU资源,而且也不见得能及时获得到计算结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。
while (true) {
if (futureTask.isDone()) {
System.out.println("子线程执行结果:" + futureTask.get() + now().getSecond());
break;
}else {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("暂停2s");
}
}结论:Future对于结果的获取不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。
由此引入CompletableFuture,规避以上的缺点
需要说明的是对于简单的业务使用Future就可以了
CompletableFuture(异步任务编排)
CompletableFuture可以代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
CompletableFuture可以解决的问题?
多个任务前后依赖组合处理:想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值;将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个计算结果
对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名的处理结果。。。。
CompletableFuture对比Future的改进
Future问题:get()阻塞、isDone()轮询CPU空转
对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。
CompletableFuture提供一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
怎么创建CompletableFuture对象?
创建
一般不建议直接new CompletableFuture
可以采用以下的方式创建CompletableFuture对象:
1.CompletableFuture.runAsync无返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

2. CompletableFuture.supplyAsync有返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

怎么解决Future的阻塞和轮询问题?
从java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
默认线程池创建的是守护线程,自定义线程池是用户线程
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 有返回值
supplyFuture(threadPool);
System.out.println("主线程 running " + now());
threadPool.shutdown();
}
private static void supplyFuture(ExecutorService threadPool) {
CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行...." +
(Thread.currentThread().isDaemon() ? "守护线程" : "用户线程") +
" " + now());
int random = ThreadLocalRandom.current().nextInt();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return random;
}, threadPool).whenComplete((v, e) -> {
if (e == null) {
// 没有异常
System.out.println("随机数结果: " + v);
}
}).exceptionally((e) -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause());
return null;
});
}运行结果:

已经解决了Future编码出现的get阻塞问题。
上述代码块逻辑解析:
supplyAsync()中是任务(默认线程池创建的守护线程),如果这个任务成功会走到whenComplete代码块中,不成功会走到exceptionally代码块中
注意:默认线程池创建的是守护线程,自定义线程池是用户线程,守护线程会随着用户线程的结束而结束,所以会导致主线程执行完了然后还没打印出随机数线程池就关闭了,就是以下输出的情况

怎么获得异步结果?
通过get或者join获得结果(区别在于join在编译期间不会作检查性异常的处理,抛不抛异常都可以)
CompletableFuture的优点
- 异步任务结束时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行;
- 异步任务出错时,会自动回调某个对象的方法:
函数式编程
函数式编程:Lambda表达式+Stream流式编程+Chain链式调用+Java8函数式编程
如Runnable、Function、Consumer --- BIConsumer、Supplier
常用函数式接口
函数式接口名称 | 方法名称 | 参数 | 返回值类型 |
Consumer<T> | accept | T t | void |
Supplier<T> | get | 无 | T |
Function<T, R> | apply | T t | R |
Predicate<T> | test | T t | boolean |
BiConsumer<T, U> | accept | T t, U u | void |
BiFunction<T, U, R> | apply | T t, U u | R |
BiPredicate<T, U> | test | T t, U u | boolean |
UnaryOperator<T> | apply | T t | T |
BinaryOperator<T> | apply | T t1, T t2 | T |
IntConsumer | accept | int value | void |
IntSupplier | getAsInt | 无 | int |
IntFunction<R> | apply | int value | R |
IntPredicate | test | int value | boolean |
链式语法
public class ChainDemo {
public static void main(String[] args) {
Student student = new Student();
student.setId(1).setName("karry").setMajor("cs");
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
class Student{
private Integer id;
private String name;
private String major;
}案例精讲-电商网站的比价需求
从“功能”到“性能”

/**
* 电商网站的比价需求
*/
public class CompletableMallDemo {
private static final List<Mall> list = new ArrayList<>();
static {
for (int i = 0; i <= 1000; i ++) {
list.add(new Mall("book" + i));
}
}
/**
* 查询价格 同步处理
* @param list 平台列表
* @param bookName 书籍名称
* @return 查询所需时间
*/
public static Long getPrice(List<Mall> list, String bookName) {
long start = System.currentTimeMillis();
List<String> prices = list.stream()
.map((mall) -> String.format(bookName + " in %s price is %.2f",
mall.getName(), mall.calcPrice(bookName)))
.collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.println("用时: " + end);
prices.forEach(System.out::println);
return end - start;
}
/**
* 查询价格 异步处理
* @param list 平台
* @param bookName 书籍名称
* @return 查询所需
*/
public static Long getPriceByCompletableFuture(List<Mall> list, String bookName) {
long start = System.currentTimeMillis();
ExecutorService threadPool = Executors.newFixedThreadPool(10);
List<String> prices = list.stream()
// 把 mall 映射到 CompletableFuture对象
.map(mall -> CompletableFuture.supplyAsync(() ->
String.format(bookName + " in %s price is %.2f",
mall.getName(), mall.calcPrice(bookName)), threadPool)
).collect(Collectors.toList()).stream()
// CompletableFuture对象 映射到 CompletableFuture.join() [String对象]
.map(CompletableFuture::join).collect(Collectors.toList());
threadPool.shutdown();
long end = System.currentTimeMillis();
System.out.println("用时: " + end);
prices.forEach(System.out::println);
return end - start;
}
public static void main(String[] args) {
System.out.println("差值:" +
(getPrice(list, "mysql") - getPriceByCompletableFuture(list, "mysql")));
}
}
@AllArgsConstructor
@NoArgsConstructor
@Data
class Mall {
/**
* 电商网站名称
*/
private String name;
/**
* 模拟查询价格
* @param bookName 书籍名称
* @return double 价格
*/
public double calcPrice(String bookName) {
return ThreadLocalRandom.current().nextDouble(20, 100) + bookName.charAt(0);
}
}源码分析
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}CompletableFuture实现了Future接口,还拓展了Future不具备的CompletionStage接口
CompletionStage(完成阶段)
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。
一个阶段的计算执行可以时一个Fuction,Consumer或者Runnable。比如:stage.thenApply().thenAccept().thenRun()
CompletableFuture API
不带Async和带Async的API区别:
对于CompletionStage,每个任务阶段带Async的任务可以设定线程池,不设定就会使用默认线程池, 而不带Async的任务阶段会使用CompletableFuture设定的线程池。
thenRun和thenRunAsync的区别:
thenRun使用的supplyAsync或者runAsync传入的线程池(不传入则使用默认线程池---守护线程)thenRunAsync使用的是自己api传入的线程池,不传入则使用默认线程池(守护线程)
/**
* thenRunAsync方法
* @param threadPool
*/
public static void testRunAsyncMethod(ExecutorService threadPool) {
CompletableFuture.runAsync(() -> {
System.out.println("step 1 " + Thread.currentThread().getName());
}, threadPool).thenRunAsync(() -> {
System.out.println("step 2 " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
System.out.println("step 3 " + Thread.currentThread().getName());
}, threadPool).thenRunAsync(() -> {
System.out.println("step 4 " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
System.out.println("step 5 " + Thread.currentThread().getName());
}).join();
}
方法 | 触发时机 | 输入参数 | 返回值类型 | 核心作用 |
thenRun | 前序任务正常完成后 | 无(Runnable ) | CompletableFuture<Void> | 执行无输入、无输出的后续操作 |
thenAccept | 前序任务正常完成后 | 前序任务的结果(Consumer ) | CompletableFuture<Void> | 消费前序任务的结果,无新输出 |
thenApply | 前序任务正常完成后 | 前序任务的结果(Function ) | CompletableFuture<U> | 基于前序结果计算新结果,有新输出 |
handle | 前序任务完成(无论成败) | 前序结果 + 异常(BiFunction ) | CompletableFuture<U> | 处理前序任务的结果或异常,计算新结果 |
1. 获得结果和触发计算
获得结果:

- get() : 不见不散
- get(long, TimUnit): 过时不候
- join() : 功能和get类似,但是在编译期间不抛出受检查异常
- getNow(valueIfAbsent): 立即获得当前结果,为空则返回valueIfAbsent的值
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}触发计算:

complete(T value) :是否打断get方法立即返回括号值(返回括号值,为true;不返回为false)
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}eg:
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
testCompleteMethodByTrue(threadPool);
testCompleteMethodByFalse(threadPool);
threadPool.shutdown();
}
public static void testCompleteMethodByFalse(ExecutorService threadPool) throws InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "get/join返回的结果";
}, threadPool);
TimeUnit.SECONDS.sleep(4);
boolean flag = future.complete("设定的值");
System.out.println("complete方法返回值为 " + flag + ",4s左右获得的值为 " + future.join());
}
public static void testCompleteMethodByTrue(ExecutorService threadPool) throws InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "get/join返回的结果";
}, threadPool);
TimeUnit.SECONDS.sleep(2);
boolean flag = future.complete("设定的值");
System.out.println("complete方法返回值为 " + flag + ",2s左右获得的值为 " + future.join());
}运行结果:

2. 对计算结果进行处理
2.1 thenApply: 计算结果存在依赖关系,串行化

/**
* thenApply
* @param threadPool
*/
public static void testApplyMethod(ExecutorService threadPool) {
StringBuffer str = new StringBuffer();
CompletableFuture<StringBuffer> future = CompletableFuture.supplyAsync(() -> {
return str.append("a");
}, threadPool).thenApply(f -> {
return str.append("b");
}).thenApply(f -> {
return str.append("c");
}).whenComplete((v, e) -> {
if (v != null) {
System.out.println("apply处理后的结果为 " + v);
}
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
}2.2 handle: 计算结果存在依赖关系,串行化(可以带着)

/**
* handle方法
* @param threadPool
*/
public static void testHandleMethod(ExecutorService threadPool) {
StringBuffer str = new StringBuffer();
CompletableFuture<StringBuffer> future = CompletableFuture.supplyAsync(() -> {
return str.append("a");
}, threadPool).handle((f, e) -> {
if (e == null) {
return f.append("b");
}else {
e.printStackTrace();
return null;
}
}).handle((f, e) -> {
if (e == null) {
return f.append("c");
}else {
e.printStackTrace();
return null;
}
});
System.out.println("handle处理后的结果为 " + future.join());
}
3.对计算结果进行消费
接收任务的处理结果,消费处理,无返回结果
thenAccept:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}eg:
/**
* thenAccept方法(消费型)
* @param threadPool
*/
public static void testAcceptMethod(ExecutorService threadPool) {
StringBuffer str = new StringBuffer();
CompletableFuture.supplyAsync(() -> str.append("abc"), threadPool)
.thenAccept(r -> System.out.println("accept直接消费" + r)).join();
}输出:

thenRun:
任务A执行完执行B,并且B不需要A的结果

/**
* thenRun方法
* @param threadPool
*/
public static void testRunMethod(ExecutorService threadPool) {
CompletableFuture.runAsync(() -> {
System.out.println("step 1");
}, threadPool).thenRun(() -> {
System.out.println("step 2");
}).thenRun(() -> {
System.out.println("step 3");
});
}4.对计算速度进行选用
applyToEither:
对两个future对象选用速度较快的那一个结果

eg:
/**
* applyToEither 方法
* @param threadPool
*/
public static void testApplyToEitherMethod(ExecutorService threadPool) {
for (int i = 2; i <= 5; i ++) {
CompletableFuture<String> future = getPlayFuture(i - 1, threadPool).applyToEither(getPlayFuture(i, threadPool), f -> f + " is winner");
System.out.println(future.join());
}
}
private static CompletableFuture<String> getPlayFuture(int num, ExecutorService threadPool) {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(num * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "play" + num;
});
}
5.对计算结果进行合并
thenCombine:
两个CompletionStage任务都完成后,最终能把两个任务的结果一起提交给thenCombine来处理;先完成的先等着,等待其它分支任务。

eg:
/**
* thenCombine 方法
* @param threadPool
*/
public static void testCombineMethod(ExecutorService threadPool) {
CompletableFuture<Integer> future1 = getBranchFuture(1, threadPool);
CompletableFuture<Integer> future2 = getBranchFuture(2, threadPool);
CompletableFuture<Integer> combineFuture = future2.thenCombine(future1, Integer::sum);
System.out.println("计算结果为 " + combineFuture.join());
}
private static CompletableFuture<Integer> getBranchFuture(int num, ExecutorService threadPool) {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(num * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return num * 10;
});
}输出:计算结果为 30
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Java classloader和namespace详细介绍
这篇文章主要介绍了Java classloader和namespace详细介绍的相关资料,需要的朋友可以参考下2017-03-03
解决IntelliJ IDEA创建spring boot无法连接http://start.spring.io/问题
这篇文章主要介绍了解决IntelliJ IDEA创建spring boot无法连接http://start.spring.io/问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-08-08
基于SpringAI+Qdrant+Ollama本地模型和向量数据库开发问答和RAG检索(完整代码)
文章介绍了使用Ollama和Qdrant构建AI对话系统的步骤,包括安装模型、启动向量数据库、环境配置等,并提供了完整的代码实现,支持基本对话、文件检索等功能,本文通过实例代码给大家讲解的非常详细,感兴趣的朋友跟随小编一起看看吧2026-04-04
mybatis-plus调用update方法时,自动填充字段不生效问题及解决
这篇文章主要介绍了mybatis-plus调用update方法时,自动填充字段不生效问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-06-06
SpringSecurity+JWT实现前后端分离的使用详解
这篇文章主要介绍了SpringSecurity+JWT实现前后端分离的使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-01-01
Spring框架原理之实例化bean和@Autowired实现原理方式
这篇文章主要介绍了Spring框架原理之实例化bean和@Autowired实现原理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2025-05-05


最新评论