Java中的BlockingQueue阻塞队列原理以及实现详解

 更新时间:2023年12月22日 08:36:25   作者:huisheng_qaq  
这篇文章主要介绍了Java中的BlockingQueue阻塞队列原理以及实现详解,在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,需要的朋友可以参考下

一,BlockingQueue

在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。

用一句话描述这个阻塞队列就是:它是线程的一个通信工具,在任意时刻,不管并发有多高,在单jvm进程上,同一时间永远只有一个线程能够对队列进行入队和出队的操作,它的特性是在任意时刻只有一个线程可以进行take或者put操作。因此这个队列是一个线程安全的队列。

比较适用于生产者和消费者的场景,因此适用的应用场景如下 线程池,springCloud-Eureka的三级缓存,Nacos,Netty,RakectMq等

所有的阻塞队列都都实现了对这个BlockingQueue接口

public interface BlockingQueue<E> extends Queue<E>

1,主要常用的队列有如下

ArrayBlockingQueue: 由数组支持的有界队列

LinkedBlockingQueue: 由链接节点支持的可选有界队列

PriorityBlockingQueue: 由优先级堆支持的无界优先级队列

DelayQueue: 由优先级堆支持的、基于时间的调度队列

2,基本工作原理实现如下

1,以一个有界队列为例,首先消费者这边获取到锁,然后会生产商品,然后会往队列中填满数据,队列填满之后,生产者端会进行阻塞,同时会释放这把锁,并且会通知这个消费者赶紧去消费。当然内部也做了很多事情,不一定就是说一定要阻塞队列满了之后才会去唤醒生产者去消费,而是消费者那边也会有一个监听事件,只有队列不为空,就会有这个消费者来消费。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PwPGs9DF-1657455705341)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1657445347685.png)]

2,消费者在接收到生产者的通知之后呢,就会先去获取到这把锁,然后对里面的产品进行消费,当队列里面的产品都被消费完成之后,消费者这边又会释放这把锁,然后将自身阻塞,并同时去唤醒这个生产者继续生产产品。

在这里插入图片描述

3,生产者又获取到锁,然后重复执行第一步。

3,基本api使用如下

在这里插入图片描述

在这里插入图片描述

二,源码剖析

在了解过一定的工作原理之后,接下来可以对源码分析一波。

2.1,ArrayBlockingQueue

这里主要通过这个ArrayBlockingQueue为例,来描述一下这个阻塞队列的工作流程

BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);

这个构造方法里面有如下参数

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); //非公平锁
    notEmpty = lock.newCondition(); //条件对象,用于唤醒指定线程
    notFull =  lock.newCondition(); //条件对象
}

生产者会向队列中put产品,生产者后会持有锁,此时会向队列中存放产品,如果队列满了,则会阻塞自己,并且在最后会释放锁。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock; //生产者加锁
    lock.lockInterruptibly();
    try {
        while (count == items.length) //如果队列满了,则会阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock(); //释放锁
    }
}

既然涉及到ReentrantLock,那么就用从之前的AQS里面讲起了,这里面这要是一个CLH同步等待队列,由一个双向链表和一个同步阻塞器组成,同步阻塞器会有一个state和一个exclusiveOwnerThread状态组成,state=0表示当前没有对象获取到锁,可以来竞争锁。每个结点由一个前驱指针和一个后继指针,并且里面有一个waitStatus等待状态,该状态主要表示下一个结点的存活状态。

在这里插入图片描述

这里的话不会像之前一样使用这个CLH同步等待队列,而是加入了一种新的Condition条件等待队列,如下图。由firstWaiter和nextWaiter组成的单向链表队列,里面的waitStatus为CONDITION:-2 。也就是说如果当前生产者结点后面的结点又是一个生产者节点,因为期间可能存在多个生产者的线程,而为了唤醒接下来的消费者,就会创建一个条件等待队列,去存储后面的生产者结点。 就是说在CLH同步等待队列中,当前结点为生产者的话,在阻塞队列满了之后,如果CLH中的下一个节点还是生产者,则会将waitStatus的状态设置成-2,并将下一个节点移动到这个条件等待队列里面并进行排队,如果下一个结点还是,又会将下一个结点移动到这个条件等待队列里面并进行排队。知道下一个结点是消费者为止。

在这里插入图片描述

await()释放锁的流程如下

public final void await() throws InterruptedException {
    //线程是否被中断,如果被中断,直接抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //条件等待队列,会构建一个新的队列
    Node node = addConditionWaiter();
    //释放锁,并对对应的结点进行唤醒操作
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断当前结点是在条件队列里面还是在同步队列里面
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

构建条件等待队列如下

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

出队,消费者在获取产品时,产品就会出队,与此同时,在队列出队成功之后,队列中就会有一个空位,会调用notFull.signal()方法,通知生产者可以去生产产品了。并将这个条件等待队列放回这个CLH队列里面,只有在CLH队列里面才会获取锁。最后在CLH中才能进行unPark释放锁的操作。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //队列中有空位,通知生产者生产产品
    notFull.signal();
    return x;
}

消费者获取产品

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

三,总结

BlockingQueue也是基于这个AQS的方式实现的,主要是利用这个生产者和消费者这个模型来实现。

通过这个AQS中的CLH同步队列来对节点的锁的阻塞和释放,期间利用了这个条件等待队列来实现,如果存在多个生产者的线程的情况下,就会将这些线程加入到一个条件等待的队列里面。

并将这个节点的状态改为-2,condition状态。

在全部进入条件等待队列之后,这个锁还在并没有释放,因此最后又需要将这个条件等待队列里面的结点加回到CLH同步队列中,再进行排队的释放这个锁。结点出队的时候,然后生产者会通过一个singal监听这个消费者,每当这个阻塞队列里面出队,有一个位置的的时候,生产者就会生产这个产品。

消费者也会监听这个队列,队列中只要不为空,就回去消费队列中的产品。

获取锁的条件 只有在CLH队列里等待的Node结点并且前驱结点的 waitStatus 为sinal = -1的可被唤醒的结点。

条件队列里面的这些节点是不能获取到锁的。

到此这篇关于Java中的BlockingQueue阻塞队列原理以及实现详解的文章就介绍到这了,更多相关BlockingQueue阻塞队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java自带的四种线程池实例详解

    java自带的四种线程池实例详解

    java线程的创建非常昂贵,需要JVM和OS(操作系统)互相配合完成大量的工作,下面这篇文章主要给大家介绍了关于java自带的四种线程池的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2022-04-04
  • HttpServletRequestWrapper干预Request处理流程解析

    HttpServletRequestWrapper干预Request处理流程解析

    这篇文章主要分析在 Tomcat的处理 http 请求的流程中干预 Request对象, 通过基于HttpServletRequestWrapper和 Filter组合进行干预,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2023-09-09
  • java编程枚举类型那些事!枚举类型定义和重写枚举的方法

    java编程枚举类型那些事!枚举类型定义和重写枚举的方法

    本文主要介绍了枚举类型的有关内容,涉及简单的枚举类型定义,以及枚举类型的值在Java中的定义方法,具有一定参考价值,需要的朋友可以了解下。
    2017-10-10
  • java实现的计算器功能示例【基于swing组件】

    java实现的计算器功能示例【基于swing组件】

    这篇文章主要介绍了java实现的计算器功能,结合实例形式分析了java基于swing组件实现计算器功能相关运算操作技巧,需要的朋友可以参考下
    2017-12-12
  • JavaWeb实现文件上传功能详解

    JavaWeb实现文件上传功能详解

    这篇文章主要介绍了JavaWeb实现文件上传功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • springboot表单提交之validator校验

    springboot表单提交之validator校验

    在前台表单验证的时候,通常会校验一些数据的可行性,比如是否为空,长度,身份证,邮箱等等,这篇文章主要给大家介绍了关于springboot表单提交之validator校验的相关资料,需要的朋友可以参考下
    2021-05-05
  • HttpClient详细使用示例详解

    HttpClient详细使用示例详解

    这篇文章主要介绍了HttpClient详细使用示例详解,本文给大家介绍的非常想详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • SpringBoot 整合Security权限控制的初步配置

    SpringBoot 整合Security权限控制的初步配置

    这篇文章主要为大家介绍了SpringBoot 整合Security权限控制的初步配置实例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • java切分字符串的2种方法实例

    java切分字符串的2种方法实例

    在我们日常工作中经常遇到截取字符串的需求,下面这篇文章主要给大家介绍了关于java切分字符串的2种方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-06-06
  • 关于Mybatis的@param注解及多个传参

    关于Mybatis的@param注解及多个传参

    这篇文章主要介绍了关于Mybatis的@param注解及多个传参,@Param的作用就是给参数命名,比如在mapper里面某方法A(int id),当添加注解后A(@Param(“userId”) int id),也就是说外部想要取出传入的id值,只需要取它的参数名userId就可以了,需要的朋友可以参考下
    2023-05-05

最新评论