Java中的FutureTask源码解析

 更新时间:2023年12月22日 10:04:41   作者:正经人z.  
这篇文章主要介绍了Java中的FutureTask源码解析,FutureTask是一个可取消的异步计算,这个类是Future的实现类,有开始和取消一个计算的方法,如果一个计算已经完成可以查看结果,需要的朋友可以参考下

一、简介

1、FutureTask是一个可取消的异步计算。这个类是Future的实现类,有开始和取消一个计算的方法,如果一个计算已经完成可以查看结果。如果在计算没有完成的情况下调用get获取计算结果会阻塞。且一旦任务完成后,计算不能重新开始或被取消,除非计算被runAndReset调用执行。

2、FutureTask被用来去封装一个Callable或者Runnable,一个FutureTask能够被submit作为一个Executor

3、FutureTask 的线程安全由CAS来保证。

二、源码分析

1、成员属性

 public class FutureTask<V> implements RunnableFuture<V> {
        //state表示的任务的状态
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
        //任务
        private Callable<V> callable;
        //存储任务完成以后的结果
        private Object outcome; 
        //执行当前任务的线程
        private volatile Thread runner;
        //执行当前任务被阻塞的线程
        private volatile WaitNode waiters;
    }

可能有的状态转换:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

注意:state用volatile修饰的,如果在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,保证了state字段的可见性。

2、构造函数

 public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW; 
        }
    public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW; 
        }

很好的诠释了FutureTask封装了Runnable或Callable,构造完成后将任务的状态变为NEW。同时注意,封装Runnable时用的Executors的静态方法callable

顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码如下:

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
       throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

所以,FutureTask封装Runnable使用了适配器模式的设计模式

3、核心方法

 //运行任务的方法
    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;  //得到当前任务
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();  //当前任务调用call方法,执行,同时,执行完后将结果返回
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)  //表示任务执行成功
                        set(result);  //CAS改变任务的状态从NEW->COMPLETING->NORMAL,同时将任务返回的结果保存到outcome属性中,再移除并唤醒所有等待线程
                }
            } finally {
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;  //将任务成功执行完后返回的结果保存到outcome中
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终的状态,表示任务结束
                finishCompletion();   //移除并唤醒所有等待线程
            }
        }
    //该方法用于移除并唤醒所有等待线程
    private void finishCompletion() {
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);  //唤醒
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
            done();
            callable = null;    
        }
    public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();   //打断
                    } finally { // 设置成为最终态INTERRUPTED
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();  //移除并唤醒所有等待线程
            }
            return true;
        }
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);  //如果任务没有完成或者其他的问题,将阻塞;创建一个新节点存入阻塞栈中
            return report(s);
        }
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }

三、示例

常用使用方式:

  • 第一种方式: Future + ExecutorService
  • 第二种方式: FutureTask + ExecutorService
  • 第三种方式: FutureTask + Thread

第一种方式:Future + ExecutorService

public class FutureDemo {
      public static void main(String[] args) {
          ExecutorService executorService = Executors.newCachedThreadPool();
          Future future = executorService.submit(new Callable<Object>() {
              @Override
              public Object call() throws Exception {
                  Long start = System.currentTimeMillis();
                  while (true) {
                      Long current = System.currentTimeMillis();
                     if ((current - start) > 1000) {
                         return 1;
                     }
                 }
             }
         });
         try {
             Integer result = (Integer)future.get();
             System.out.println(result);
         }catch (Exception e){
             e.printStackTrace();
         }
     }
}

第二种方式:FutureTask + ExecutorService

ExecutorService executor = Executors.newCachedThreadPool();
          Task task = new Task();
          FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
          executor.submit(futureTask);

第三种方式:FutureTask + Thread

FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
        Thread thread = new Thread(futureTask);
        thread.setName("Task thread");
        thread.start();

四、总结

1、FutureTask用来封装Runnable或者Callable接口,可以当成一个任务。

2、在Java并发程序中FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成get方法将会阻塞。一个FutureTask对象可以对调用了Callable和Runnable的对象进行包装,由于FutureTask也是调用了Runnable接口所以它可以提交给Executor来执行。

3、FutureTask可用于异步获取执行结果或取消执行任务的场景,通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

4、FutureTask间接继承了Runnable和Callable

5、FutureTask的线程安全由CAS操作来保证

6、FutureTask结果返回机制 :只有任务成功执行完成后,通过get方法能够得到任务返回的结果,其他情况都会导致阻塞。

到此这篇关于Java中的FutureTask源码解析的文章就介绍到这了,更多相关FutureTask源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现的迷宫游戏

    Java实现的迷宫游戏

    这篇文章主要介绍了如何用Java实现一个迷宫游戏,本仓库代码是经过eclipse编译运行过的,一般情况下将本仓库代码下载下来之后,使用eclipse编译直接可以运行。
    2021-04-04
  • activiti获取流程图实例

    activiti获取流程图实例

    这篇文章主要介绍了activiti获取流程图的方法,需要的朋友可以参考下
    2014-08-08
  • JAVA删除字符串固定下标字串的实现

    JAVA删除字符串固定下标字串的实现

    本文主要介绍了JAVA删除字符串固定下标字串的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • 剑指Offer之Java算法习题精讲链表专题篇

    剑指Offer之Java算法习题精讲链表专题篇

    跟着思路走,之后从简单题入手,反复去看,做过之后可能会忘记,之后再做一次,记不住就反复做,反复寻求思路和规律,慢慢积累就会发现质的变化
    2022-03-03
  • Java判断两个浮点数相等

    Java判断两个浮点数相等

    本文主要介绍了Java判断两个浮点数相等,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-05-05
  • mybatis中的if test判断入参的值问题

    mybatis中的if test判断入参的值问题

    这篇文章主要介绍了mybatis中的if test判断入参的值问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • 在CentOS上安装Java 17并实现多版本共存的详细教程

    在CentOS上安装Java 17并实现多版本共存的详细教程

    在现代软件开发中,Java 作为一种广泛使用的编程语言,其版本更新频繁,不同项目可能依赖不同版本的 Java 运行环境,CentOS 作为一款流行的 Linux 发行版,常被用于服务器部署和开发环境,本文将详细介绍如何在 CentOS 上安装 Java 17,并实现与现有 Java 8 的多版本共存
    2025-03-03
  • SpringKafka消息发布之KafkaTemplate与事务支持功能

    SpringKafka消息发布之KafkaTemplate与事务支持功能

    通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支持特性尤为重要,它确保了在分布式环境中的数据一致性,感兴趣的朋友一起看看吧
    2025-04-04
  • java后台如何接收get请求传过来的数组

    java后台如何接收get请求传过来的数组

    这篇文章主要介绍了java后台如何接收get请求传过来的数组问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-11-11
  • java 三元操作符用法说明

    java 三元操作符用法说明

    这篇文章主要介绍了java 三元操作符用法说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10

最新评论