Java8 自定义CompletableFuture的原理解析

 更新时间:2021年11月04日 11:53:52   作者:小小工匠  
这篇文章主要介绍了Java8 自定义CompletableFuture的原理解析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

Java8 自定义CompletableFuture原理

Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好?

public class FutureInAction3 {
    public static void main(String[] args) {
        Future<String> future = invoke(() -> {
            try {
                Thread.sleep(10000L);
                return "I am Finished.";
            } catch (InterruptedException e) {
                return "I am Error";
            }
        });
        future.setCompletable(new Completable<String>() {
            @Override
            public void complete(String s) {
                System.out.println("complete called ---- " + s);
            }
            @Override
            public void exception(Throwable cause) {
                System.out.println("error");
                cause.printStackTrace();
            }
        });
        System.out.println("....do something else .....");
        System.out.println("try to get result ->" + future.get());
    }
    private static <T> Future<T> invoke(Callable<T> callable) {
        AtomicReference<T> result = new AtomicReference<>();
        AtomicBoolean finished = new AtomicBoolean(false);
        Future<T> future = new Future<T>() {
            private Completable<T> completable;
            @Override
            public T get() {
                return result.get();
            }
            @Override
            public boolean isDone() {
                return finished.get();
            }
            // 设置完成
            @Override
            public void setCompletable(Completable<T> completable) {
                this.completable = completable;
            }
            // 获取
            @Override
            public Completable<T> getCompletable() {
                return completable;
            }
        };
        Thread t = new Thread(() -> {
            try {
                T value = callable.action();
                result.set(value);
                finished.set(true);
                if (future.getCompletable() != null)
                    future.getCompletable().complete(value);
            } catch (Throwable cause) {
                if (future.getCompletable() != null)
                    future.getCompletable().exception(cause);
            }
        });
        t.start();
        return future;
    }
    private interface Future<T> {
        T get();
        boolean isDone();
        //  1
        void setCompletable(Completable<T> completable);
        //  2
        Completable<T> getCompletable();
    }
    private interface Callable<T> {
        T action();
    }
    // 回调接口
    private interface Completable<T> {
        void complete(T t);
        void exception(Throwable cause);
    }
}

在这里插入图片描述

CompleteFuture简单使用

Java8 中的 completeFuture 是对 Future 的扩展实现, 主要是为了弥补 Future 没有相应的回调机制的缺陷.

我们先看看 Java8 之前的 Future 的使用

package demos;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * @author djh on  2019/4/22 10:23
 * @E-Mail 1544579459@qq.com
 */
public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService cachePool = Executors.newCachedThreadPool();
        Future<String> future = cachePool.submit(() -> {
            Thread.sleep(3000);
            return "异步任务计算结果!";
        });
        // 提交完异步任务后, 主线程可以继续干一些其他的事情.
        doSomeThingElse();
        // 为了获取异步计算结果, 我们可以通过 future.get 和 轮询机制来获取.
        String result;
        // Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷.
        // result = future.get();
        // 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载.
        long start = System.currentTimeMillis();
        while (true) {
            if (future.isDone()) {
                break;
            }
        }
        System.out.println("轮询耗时:" + (System.currentTimeMillis() - start));        
        result = future.get();
        System.out.println("获取到异步计算结果啦: " + result);
        cachePool.shutdown();
    }
    private static void doSomeThingElse() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.");
    }
}

输出:

我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.
轮询耗时:2000
获取到异步计算结果啦: 异步任务计算结果!

Process finished with exit code 0

从上面的 Demo 中我们可以看出, future 在执行异步任务时, 对于结果的获取显的不那么优雅, 很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果, 如Google的: ListenableFuture, 而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API.

下面简单介绍用法

package demos;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author djh on  2019/5/1 20:26
 * @E-Mail 1544579459@qq.com
 */
public class CompleteFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(() -> {
            try {
                Thread.sleep(3000);
                completableFutureOne.complete("异步任务执行结果");
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果.
        CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {
            System.out.println("当异步任务执行完毕时打印异步任务的执行结果: " + s);
        });
        // ThenApply 方法返回的是一个新的 completeFuture.
        CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
            System.out.println("当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s.length();
        });
        System.out.println("阻塞方式获取执行结果:" + completableFutureThree.get());
        cachePool.shutdown();
    }
}

从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法, 这两个方法便是 CompleteFuture 中最具有意义的方法, 他们都会在 completeFuture 调用 complete 方法传入异步计算结果时回调, 从而获取到异步任务的结果.

相比之下 future 的阻塞和轮询方式获取异步任务的计算结果, CompleteFuture 获取结果的方式就显的优雅的多。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 基于javamelody监控springboot项目过程详解

    基于javamelody监控springboot项目过程详解

    这篇文章主要介绍了基于javamelody监控springboot项目过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Java执行JavaScript代码

    Java执行JavaScript代码

    这篇文章主要为大家详细介绍了Java执行JavaScript代码的具体操作方法,感兴趣的小伙伴们可以参考一下
    2016-03-03
  • java开发BeanUtils类解决实体对象间赋值

    java开发BeanUtils类解决实体对象间赋值

    这篇文章主要为大家介绍了java开发中使用BeanUtils类实现实体对象之间的赋值有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步学有所得
    2021-10-10
  • Java与Mysql锁相关知识总结

    Java与Mysql锁相关知识总结

    这篇文章主要介绍了Java与Mysql锁相关知识总结的相关资料,需要的朋友可以参考下
    2023-04-04
  • Java继承与多态的正确打开方式

    Java继承与多态的正确打开方式

    这篇文章主要为大家介绍了Java的继承与多态,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-01-01
  • Java网络编程之TCP通信完整代码示例

    Java网络编程之TCP通信完整代码示例

    这篇文章主要介绍了Java网络编程之TCP通信完整代码示例,具有一定借鉴价值,需要的朋友可以了解下。
    2017-12-12
  • Spring容器注入bean的五种方法逐个解析

    Spring容器注入bean的五种方法逐个解析

    依赖注入(Dependency Injection)和控制反转(Inversion of Control)是同一个概念。具体含义是:当某个角色(可能是一个Java实例,调用者)需要另一个角色(另一个Java实例,被调用者)的协助时,在传统的程序设计过程中,通常由调用者来创建被调用者的实例
    2023-02-02
  • 详解Springboot集成sentinel实现接口限流入门

    详解Springboot集成sentinel实现接口限流入门

    这篇文章主要介绍了详解Springboot集成sentinel实现接口限流入门,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • MyBatis中XML映射器的实现

    MyBatis中XML映射器的实现

    MyBatis的真正强大在于它的语句映射,映射器的XML文件就显得相对简单,本文主要介绍了MyBatis中XML映射器的实现,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01
  • mybatis-plus的添加与修改详解

    mybatis-plus的添加与修改详解

    这篇文章主要介绍了mybatis-plus的添加与修改方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06

最新评论