Java中的Phaser并发阶段器详解

 更新时间:2023年12月22日 08:57:16   作者:Java面试365  
这篇文章主要介绍了Java中的Phaser并发阶段器详解,Phaser由JDK1.7提出,是一个复杂强大的同步辅助类,是对同步工具类CountDownLatch和CyclicBarrier的综合升级,能够支持分阶段实现等待的业务场景,需要的朋友可以参考下

Phaser并发阶段器

Phaser由JDK1.7提出,是一个复杂强大的同步辅助类,是对同步工具类CountDownLatch和CyclicBarrier的综合升级,能够支持分阶段实现等待的业务场景。

我们可以回忆下CountDownLatch讲的是先指定N个线程,在N个线程干完活之前,其它线程都需要等待(导游等待旅游团所有人上车才能开车),而CyclicBarrier讲的是先指定N个线程。等N个线程到齐了大家同时干活(多个驴友相约去旅游,先到的需要等待后来的),而Phaser是两者的结合,可以理解为先指定N个线程,等N个线程到齐后开始干第一阶段的活,等第一阶段所有的线程都干完活了,接着N个线程开始干第二阶段的活,直到所有的阶段完成工作,程序结束,当然需要注意的是每个阶段可以根据业务需要新增或者删除一些线程,并不是开始指定多少个线程每个阶段就必须有多少个线程。

入门体验

看了概念可能不容易理解,从一个小demo入手体验下

public class PhaserDemo1 {
    // 指定随机种子
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        // 将线程注册到phaser
        phaser.register();
        for (int i = 0; i <5 ; i++) {
            Task task = new Task(phaser);
            task.start();
        }
        phaser.arriveAndAwaitAdvance();
        System.out.println("all task execute close");
    }
    static class Task extends Thread{
        Phaser phaser;
        public Task(Phaser phaser){
            this.phaser = phaser;
            this.phaser.register();
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"开始执行");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"执行完毕");
                // 类似CountDownLatch中的 await
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

不知道有没有这样的疑惑,phaser.register是向phaser去注册这个线程,那么为什么主线程也需要注册呢?

其实很简单主线程需要等待所有子线程执行完毕才能继续往下面执行所以必须要phaser.arriveAndAwaitAdvance();阻塞等待,而这个语句是意思当前线程已经到达屏障,在此等待一段时间等条件满足后需要向下一个屏障继续执行,如果没有主线程的phaser.register,直接调用phaser.arriveAndAwaitAdvance,在源码中提到可能会有异常,所以必须在主程序中注册phaser.register();

/* <p>It is a usage error for an unregistered party to invoke this
* method.  However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*/
译:
未注册方调用此函数是一个使用错误方法。但是,这个错误可能会导致
{@codeIllegalStateException}仅在一些后续操作这个相位器,如果有的话。

Phaser解决分科考试问题

从体验的示例中其实没看出其优势在哪里,上诉场景完全可以采用CountDownLatch,所以现在换一种场景来说明Phaser的优势。

假设某校举行期末考试,有三门考试语文、数学、英语,每门课允许学生提前交卷,只有当所有学生完成考试后才能举行下一次的考试,这就是典型的分阶段任务处理,示例图如下。

图片

将上诉场景语义化如下

public class PhaserExam {
    public static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        // 一次初始化2个 相当于两次register
        Phaser phaser = new Phaser(2);
        for (int i = 0; i <2 ; i++) {
            Exam exam = new Exam(phaser,random.nextLong());
            exam.start();
        }
    }
    static class Exam extends Thread{
        Phaser phaser;
        Long id;
        public Exam(Phaser phaser,Long id){
            this.phaser = phaser;
            this.id = id;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"===开始语文考试");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===结束语文考试");
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName()+"===开始数学考试");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===结束数学考试");
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName()+"===开始英语考试");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===结束英语考试");
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

代码执行结果如下,可以看到三个阶段都是等待所有线程执行完毕后才往下执行,相当于多个栅栏。

图片

到这里请注意,通过Phaser类的构造方法构建的party数,也就是线程数需要和循环的次数对应,不然可能影响后续阶段器的正常运行。

两个重要状态

在Phaser内有2个重要状态,分别是phase和party,乍一看很难理解,他们的定义如下。

phase就是阶段,如上面提到的语文、数学、英语考试这每个考试对应一个阶段,不过phase是从0开始的,当所有任务执行完毕,准备进入下一个阶段时phase就会加一。

party对应注册到Phaser线程数,party初始值有两种形式

  • 方法一就是通过Phaser的有参构造初始化party值。
  • 方法二采用动态注册方法phaser.register()或phaser.bulkRegister(线程数)指定线程数,注销线程调用phaser.arriveAndDeregister()方法party值会减一。

Phaser常用API

Phaser常用API总结如下所示

// 获取Phaser阶段数,默认0
public final int getPhase();
// 向Phaser注册一个线程
public int register();    
// 向Phaser注册多个线程
public int bulkRegister(int parties);
// 获取已经注册的线程数,也就是重要状态party的值
public int getRegisteredParties();
// 到达并且等待其它线程到达
public int arriveAndAwaitAdvance();
// 到达后注销不等待其它线程,继续往下执行
public int arriveAndDeregister();
// 已到达线程数
public int getArrivedParties();
// 未到达线程数
public int getUnarrivedParties();
// Phaser是否结束 只有当party的数量是0或者调用方法forceTermination时才会结束
public boolean isTerminated();
// 结束Phaser
public void forceTermination();

代码演示如下

public class PhaserApiTest {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(5);
        System.out.println("当前阶段"+phaser.getPhase());
        System.out.println("注册线程数==="+phaser.getRegisteredParties());
        // 向phaser注册一个线程
        phaser.register();
        System.out.println("注册线程数==="+phaser.getRegisteredParties());
        // 向phaser注册多个线程,批量注册
        phaser.bulkRegister(4);
        System.out.println("注册线程数==="+phaser.getRegisteredParties());
        new Thread(()->{
            // 到达且等待
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName()+"===执行1");
        }).start();
        new Thread(()->{
            // 到达不等待,从phaser中注销一个线程
            phaser.arriveAndDeregister();
            System.out.println(Thread.currentThread().getName()+"===执行2");
        }).start();
        TimeUnit.SECONDS.sleep(3);
        System.out.println("已到达线程数==="+phaser.getArrivedParties());
        System.out.println("未到达线程数==="+phaser.getUnarrivedParties());
        System.out.println("Phaser是否结束"+phaser.isTerminated());
        phaser.forceTermination();
        System.out.println("Phaser是否结束"+phaser.isTerminated());
    }
}

执行结果如下所示

图片

arriveAndAwaitAdvance解析

arriveAndAwaitAdvance是Phaser中一个重要实现阻塞的API,其实arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而来,两个方法的作用分别为

  • arrive:到达屏障但不阻塞,返回值为到达的阶段号。
  • awaitAdvance(int):接收一个 int 值的阶段号,在指定的屏障处阻塞。

测试代码如下

public class PhaserTestArrive {
    public static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        Phaser phaser = new Phaser(5);
        for (int i = 0; i <5 ; i++) {
            new Task(i,phaser).start();
        }
        phaser.register();
        // 主线程需要调用arrive的原因是主线程注册的第六个线程还未到达,需要手动到达,才能调用awaitAdvance阻塞屏障
        phaser.arrive();
        // 因为Phaser线程数为6,所以即使5个线程已经到达,但是还差主线程的一个,目前阶段数就是0
        phaser.awaitAdvance(0);
        System.out.println("all task is end");
    }
    static class Task extends Thread{
        Phaser phaser;
        public Task(int num,Phaser phaser){
            super("Thread--"+String.valueOf(num));
            this.phaser = phaser;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"===task1 is start");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(Thread.currentThread().getName()+"===task1 is end");
                // 到达且不等待
                phaser.arrive();
                System.out.println(Thread.currentThread().getName()+"===task2 is start");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(Thread.currentThread().getName()+"===task2 is end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

中断响应

我们需要特别注意的就是Phaser所有API中只有awaitAdvanceInterruptibly是响应中断的,其余全部不会响应中断所以不需要对其进行异常处理,演示如下

public static void main(String[] args) {
        Phaser phaser = new Phaser(3);
        Thread T1 = new Thread(()->{
            try {
                phaser.awaitAdvanceInterruptibly(phaser.getPhase());
            } catch (InterruptedException e) {
                System.out.println("中断异常");
                e.printStackTrace();
            }
            //phaser.arriveAndAwaitAdvance();
        });
        T1.start();
        T1.interrupt();
        phaser.arriveAndAwaitAdvance();
    }

图片

到此这篇关于Java中的Phaser并发阶段器详解的文章就介绍到这了,更多相关Phaser并发阶段器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 分布式医疗挂号系统SpringCache与Redis为数据字典添加缓存

    分布式医疗挂号系统SpringCache与Redis为数据字典添加缓存

    这篇文章主要为大家介绍了分布式医疗挂号系统SpringCache与Redis为数据字典添加缓存,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • 关于mybatisPlus yml配置方式

    关于mybatisPlus yml配置方式

    这篇文章主要介绍了mybatisPlus yml配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • Spring Boot实现自动发送邮件

    Spring Boot实现自动发送邮件

    这篇文章主要为大家详细介绍了Spring Boot实现自动发送邮件,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-02-02
  • 字节二面SpringBoot可以同时处理多少请求

    字节二面SpringBoot可以同时处理多少请求

    这篇文章主要为大家介绍了字节二面之SpringBoot可以同时处理多少请求面试分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • Java中EnumMap代替序数索引代码详解

    Java中EnumMap代替序数索引代码详解

    这篇文章主要介绍了Java中EnumMap代替序数索引代码详解,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-02-02
  • Java 深入浅出讲解代理模式

    Java 深入浅出讲解代理模式

    代理模式是Java常见的设计模式之一。所谓代理模式是指客户端并不直接调用实际的对象,而是通过调用代理,来间接的调用实际的对象
    2022-03-03
  • Mybatis配置错误:java.lang.ExceptionInInitializerError

    Mybatis配置错误:java.lang.ExceptionInInitializerError

    这篇文章主要介绍了Mybatis配置错误:java.lang.ExceptionInInitializerError的相关资料,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • Scala可变参数列表,命名参数和参数缺省详解

    Scala可变参数列表,命名参数和参数缺省详解

    这篇文章主要介绍了Scala可变参数列表,命名参数和参数缺省详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • HashMap底层数据结构详细解析

    HashMap底层数据结构详细解析

    这篇文章主要介绍了HashMap底层数据结构详细解析,HashMap作为开发中常用的数据结构,也是面试中经常被问的知识点,因此作为开发者应该尽可能多的理解其底层的数据结构,需要的朋友可以参考下
    2023-11-11
  • 一文带你学会Spring JDBC的使用

    一文带你学会Spring JDBC的使用

    JDBC 就是 数据库开发 操作的 代名词,因为只要是现代商业项目的开发那么一定是离不开 数据库 的,不管你搞的是什么,只要是想使用动态的开发结构,那么一定就是 JDBC ,那么下面来教教大家传统JDBC的使用
    2022-09-09

最新评论