Java中的LinkedBlockingQueue源码解析

 更新时间:2023年12月26日 10:52:15   作者:茫然背影  
这篇文章主要介绍了Java中的LinkedBlockingQueue源码解析,LinkedBlockingQueue底层是一个链表(可以指定容量,默认是Integer.MAX_VALUE),维持了两把锁,一把锁用于入队,一把锁用于出队,并且使用一个AtomicInterger类型的变量保证线程安全,需要的朋友可以参考下

基本认识

LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。主要对象的定义如下:

    //容量,如果没有指定,该值为Integer.MAX_VALUE;
    private final int capacity;
    //当前队列中的元素
    private final AtomicInteger count = new AtomicInteger();
    //队列头节点,始终满足head.item==null
    transient Node<E> head;
    //队列的尾节点,始终满足last.next==null
    private transient Node<E> last;
    //用于出队的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //当队列为空时,保存执行出队的线程
    private final Condition notEmpty = takeLock.newCondition();
    //用于入队的锁
    private final ReentrantLock putLock = new ReentrantLock();
    //当队列满时,保存执行入队的线程
    private final Condition notFull = putLock.newCondition();

构造方法

LinkedBlockingQueue的构造方法有三个,分别如下:

从构造方法中可以得出3点结论: 

1. 当调用无参的构造方法时,容量是int的最大值 

2. 队列中至少包含一个节点,哪怕队列对外表现为空 

3. LinkedBlockingQueue不支持null元素

 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//last和head在队列为空时都存在,所以队列中至少有一个节点
    }
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put(E e)方法

首先获得入队的锁putLock,判断队列是否已满:count == capacity

  • 未满:将节点链入尾部,元素数量+1,此时如果发现队列还没满还可以生产,就唤醒其他生产线程notFull.signal也进行生产,生产一个后,如果此时队列是有空变为非空的(证明此时所有的消费者都在阻塞notEmpty.await(),此时必须由生产者进行唤醒,不然无法向下进行,也就是从空到非空这个时候必须由生产者唤醒消费者,之后的就是消费者唤醒自己的兄弟姐妹们),就唤醒消费者队列notEmpty的头结点notEmpty.signal,通知消费这消费这个消费者消费后发现还有可以消费的元素,就通知notEmpty队列里的头结点,就这样notEmpty队列一次被唤醒了,notEmpty只在第一次是被生产者唤醒的。
  • 已满:就调用notFull.await阻塞,释放锁,从AQS队列移除,将生产者加入到notFull条件队列尾部,等待着被唤醒后继续生产
public void put(E e) throws InterruptedException {
        //不允许元素为null
        if (e == null) throw new NullPointerException();
        int c = -1;
        //以当前元素新建一个节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获得入队的锁
        putLock.lockInterruptibly();
        try {
            //如果队列已满,那么将该线程加入到Condition的等待队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            //将节点入队
            enqueue(node);
            //得到插入之前队列的元素个数
            c = count.getAndIncrement();
            //如果还可以插入元素,那么释放等待的入队线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //解锁
            putLock.unlock();
        }
        //通知出队线程队列非空
        if (c == 0)
            signalNotEmpty();
    }
 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock
        takeLock.lock();
        try {
            //释放notEmpty条件队列中的第一个等待线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

E take()方法

首先获取takeLodck,判断队列是否可以消费:count.get() == 0

  • 可以消费:返回头节点的下一个节点(头结点不存数据,是永远存在的一个节点),并移除此节点,元素数量-1,如果此时发现队列中还有元素可以消费,就唤醒其他消费者notEmpty.signal,进行消费。如果此时队列是从满变为未满的(证明此时所有的生产者都在阻塞,此时必须有消费者唤醒生产者的第一个节点,之后就是生产者唤醒自己的兄弟姐妹了),
  • 不可以消费:调用notEmpty.awati进行阻塞,释放锁,从AQS队列移除,进入NotEmpty队列尾部,等待被唤醒
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock锁
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,那么加入到notEmpty条件的等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //得到队头元素
            x = dequeue();
            //得到取走一个元素之前队列的元素个数
            c = count.getAndDecrement();
            //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果队列中的元素从满到非满,通知put线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

remove()方法

remove(Object)操作会从队列的头遍历到尾,用到了队列的两端,所以需要对两端加锁,而对两端加锁就需要获取两把锁;

remove()是从头结点删除,所以这个方法只需要获取take锁。

 public boolean remove(Object o) {
        //因为队列不包含null元素,返回false
        if (o == null) return false;
        //获取两把锁
        fullyLock();
        try {
            //从头的下一个节点开始遍历
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                 //如果匹配,那么将节点从队列中移除,trail表示前驱节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //释放两把锁
            fullyUnlock();
        }
    }

size()方法

由于count是一个AtomicInteger的变量,所以该方法是一个原子性的操作,是线程安全的。

public int size() {
        return count.get();
    }

LinkedBlockingDeque

从上面的字段,可以得到LinkedBlockingDeque内部只有一把锁以及该锁上关联的两个条件,同一时刻只有一个线程可以在队头或者队尾执行入队或出队操作。可以发现这点和LinkedBlockingQueue不同,LinkedBlockingQueue可以同时有两个线程在两端执行操作。  

LinkedBlockingQueue实现总结

LinkedBlockingQueue底层是一个链表(可以指定容量,默认是Integer.MAX_VALUE),维持了两把锁,一把锁用于入队,一把锁用于出队,并且使用一个AtomicInterger类型的变量保证线程安全,AtomicInterger:表示当前队列中含有的元素个数:

  • 生产者不断进行生产会向链表尾部不断链入元素,直到达到容量后,此时所有生产者依次进入notFull条件队列进行阻塞,此时如果任意一个消费者消费了一个元素,就会通知notFull队列第一个节点进行生产,notFull第一个节点生产完毕后发现还有位置可以生产就会唤醒notFull的第二个节点,notFull第二个节点生产后发现还有位置则唤醒notFull第三个节点,就这样就可以唤醒notFull里的所有生产者
  • 消费从链表头部开始向后消费,只要还有元素就可以不断消费,消费完所有的元素后,此时所有消费者依次进入notEmpty条件队列进行阻塞, 这个时候一旦生产者生产了一个元素,就会唤醒notEmpty的第一个节点,而这个节点消费完后如果发现还有元素可以消费,就会唤醒自己的兄弟姐妹(notEmpty的第二个节点),notEmpty的第二个节点消费完后如果发现还有元素可以消费就会再唤醒notEmpty的第三个节点,就这样就唤醒了notEmpty里的所有的消费者
  • 消费者一直在砍头,生产者一直在添尾

到此这篇关于Java中的LinkedBlockingQueue源码解析的文章就介绍到这了,更多相关LinkedBlockingQueue源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring-boot的debug调试代码实例

    Spring-boot的debug调试代码实例

    这篇文章主要介绍了Spring-boot的debug调试代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • Java使用多线程批次查询大量数据(Callable返回数据)方式

    Java使用多线程批次查询大量数据(Callable返回数据)方式

    今天给大家分享Java使用多线程批次查询大量数据(Callable返回数据)方式,多线程有好几种方式,今天说的方式比较好,实现Callable<> 这种方式能返回查询的数据,加上Future异步获取方式,查询效率大大加快,感兴趣的朋友一起看看吧
    2023-11-11
  • SpringBoot AOP统一处理Web请求日志的示例代码

    SpringBoot AOP统一处理Web请求日志的示例代码

    springboot有很多方法处理日志,例如拦截器,aop切面,service中代码记录等,下面这篇文章主要给大家介绍了关于SpringBoot AOP统一处理Web请求日志的相关资料,需要的朋友可以参考下
    2023-02-02
  • 如何解决线程太多导致java socket连接池出现的问题

    如何解决线程太多导致java socket连接池出现的问题

    这篇文章主要介绍了如何解决线程太多导致socket连接池出现的问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • Java 定时任务技术趋势详情

    Java 定时任务技术趋势详情

    这篇文章主要介绍了Java 定时任务技术趋势详情,定时任务是每个业务常见的需求,比如每分钟扫描超时支付的订单,每小时清理一次数据库历史数据,每天统计前一天的数据并生成报表等,下文更多相关资料,需要的小伙伴可以参考一下
    2022-05-05
  • windows命令行中java和javac、javap使用详解(java编译命令)

    windows命令行中java和javac、javap使用详解(java编译命令)

    最近重新复习了一下java基础,这里便讲讲对于一个类文件如何编译、运行、反编译的。也让自己加深一下印象
    2014-03-03
  • java应用程序如何自定义log4j配置文件的位置

    java应用程序如何自定义log4j配置文件的位置

    这篇文章主要介绍了java应用程序如何自定义log4j配置文件的位置,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 如何利用jwt来保护你的接口服务

    如何利用jwt来保护你的接口服务

    项目软件要对外提供部分定制接口,为了保证软件数据的安全性,这篇文章主要给大家介绍了关于如何利用jwt来保护你的接口服务的相关资料,需要的朋友可以参考下
    2021-08-08
  • 详细介绍Java函数式接口

    详细介绍Java函数式接口

    函数式接口在Java中是指有且仅有一个抽象方法的接口。当然接口中可以包含其他的方法默认、静态、私有,具体内容请参考下面文章内容
    2021-09-09
  • Springboot中登录后关于cookie和session拦截问题的案例分析

    Springboot中登录后关于cookie和session拦截问题的案例分析

    这篇文章主要介绍了Springboot中登录后关于cookie和session拦截案例,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08

最新评论