AQS同步组件CyclicBarrier循环屏障用例剖析

 更新时间:2022年08月07日 14:35:22   作者:共饮一杯无  
这篇文章主要为大家介绍了AQS同步组件CyclicBarrier循环屏障用例剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

CyclicBarrier原理

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

当某个线程调用了await方法之后,就会进入等待状态,并将计数器+1,直到所有线程调用await方法使计数器为CyclicBarrier设置的值,才可以继续执行,由于计数器可以重复使用,所以我们又叫它循环屏障。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

源码分析

    /**
     * 创建一个新的CyclicBarrier当给定数量的参与方(线程)等待它时,它将触发,
     * 并且在障碍触发时不执行预定义的操作。
     *
     * @param  在barrier被触发之前必须调用await()的线程数
     * @throws IllegalArgumentException 如果parties小于1抛出异常
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

/**
     * 
     * 当前线程调用await方法的线程告知CyclicBarrier已经到达屏障,然后当前线程被阻塞
     *
     * @return 当前线程的到达索引,其中索引为- 1表示第一个到达的,0表示最后一个到达的
     * @throws InterruptedException 如果当前线程在等待时被中断
     * @throws BrokenBarrierException 如果另一个线程在当前线程等待时被中断或超时,
     * 或者屏障被重置,或者在调用await方法时屏障被破坏,或者屏障操作(如果存在)由于异常而失败
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

使用案例

await()

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    /**
     * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready {}", threadNum,barrier.getNumberWaiting());
        //每次调用await方法后计数器+1,当前线程被阻塞
        barrier.await();
        log.info("{} continue", threadNum);
    }

输出结果:

16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0
16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1
16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2
16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue
16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue
16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue
16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue
16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue
16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0
16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1
16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2
16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4
16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue
16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue
16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue
16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue
16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0
16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1
16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2
16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue
16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue
16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue
16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue
16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue

通过输出结果可以知道,每次屏障会阻塞5个线程,5个线程执行后计数器达到预设值,继续执行后续操作。

await(long timeout, TimeUnit unit)

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    /**
     * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready{}", threadNum,barrier.getNumberWaiting());
        //每次调用await方法后计数器+1,当前线程被阻塞
        //等待2s.为了使在发生异常的时候,不影响其他线程,一定要catch
        //由于设置了超时时间后阻塞的线程可能会被中断,抛出BarrierException异常,如果想继续往下执行,需要加上try-catch
        try {
            barrier.await(2, TimeUnit.SECONDS);
        }catch (Exception e){
            //查看执行异常的线程
            log.info("线程{} 执行异常,阻塞被中断?{}",threadNum,barrier.isBroken());
        }
        log.info("{} continue", threadNum);
    }

输出结果:

17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0
17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1
17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2
17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 线程0 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 线程2 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程1 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程3 执行异常,阻塞被中断?true
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程4 执行异常,阻塞被中断?true
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程5 执行异常,阻塞被中断?true
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程6 执行异常,阻塞被中断?true
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程7 执行异常,阻塞被中断?true
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程8 执行异常,阻塞被中断?true
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程9 执行异常,阻塞被中断?true
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue

CyclicBarrier(int parties, Runnable barrierAction)

 /**
     * 线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

输出结果:

17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready
17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready
17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready
17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue
17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue
17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue
17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue
17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue
17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready
17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready
17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready
17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue
17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue
17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue
17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue
17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

以上就是AQS同步组件CyclicBarrier循环屏障用例剖析的详细内容,更多关于AQS同步组件CyclicBarrier的资料请关注脚本之家其它相关文章!

相关文章

  • Java中的LinkedList底层源码分析

    Java中的LinkedList底层源码分析

    这篇文章主要介绍了Java中的LinkedList底层源码分析,底层基于双向链表,往LinkedList中间插入元素时,不需要移动大量的元素,只需要修改前后节点的指针,速度快,需要的朋友可以参考下
    2023-12-12
  • Spring Security安全框架之记住我功能

    Spring Security安全框架之记住我功能

    这篇文章主要介绍了Spring Security安全框架之记住我,本次就来探究如何实现这种自动登录、记住我的功能,通过实例代码图文相结合给大家介绍的非常详细,需要的朋友可以参考下
    2022-02-02
  • springboot实现拦截器的3种方式及异步执行的思考

    springboot实现拦截器的3种方式及异步执行的思考

    实际项目中,我们经常需要输出请求参数,响应结果,方法耗时,统一的权限校验等。本文首先为大家介绍 HTTP 请求中三种常见的拦截实现,并且比较一下其中的差异。感兴趣的可以了解一下
    2021-07-07
  • 配置Spring4.0注解Cache+Redis缓存的用法

    配置Spring4.0注解Cache+Redis缓存的用法

    本篇文章主要介绍了详解配置Spring4.0注解Cache+Redis缓存的用法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-04-04
  • 关于设置Mybatis打印调试sql的两种方式

    关于设置Mybatis打印调试sql的两种方式

    这篇文章主要介绍了关于设置Mybatis打印调试sql的两种方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-08-08
  • 轻松掌握Java状态模式

    轻松掌握Java状态模式

    这篇文章主要帮助大家轻松掌握Java状态模式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-09-09
  • java中Ajax与Axios的使用小结

    java中Ajax与Axios的使用小结

    在项目中我们经常会遇到需要向请求头中添加消息的场景,本文主要介绍了java中Ajax与Axios的使用小结,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧
    2024-02-02
  • 讲解Java设计模式编程中的建造者模式与原型模式

    讲解Java设计模式编程中的建造者模式与原型模式

    这篇文章主要介绍了Java设计模式编程中的建造者模式与原型模式,设计模式有利于团队开发过程中的代码维护,需要的朋友可以参考下
    2016-02-02
  • springboot集成junit编写单元测试实战

    springboot集成junit编写单元测试实战

    在做单元测试时,代码覆盖率常常被拿来作为衡量测试好坏的指标,本文主要介绍了springboot集成junit编写单元测试实战,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • java开发主流定时任务解决方案全横评详解

    java开发主流定时任务解决方案全横评详解

    这篇文章主要为大家介绍了java开发主流定时任务解决方案全横评详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-09-09

最新评论