Java中的DelayQueue源码解析

 更新时间:2023年12月13日 08:32:59   作者:demon7552003  
这篇文章主要介绍了Java中的DelayQueue源码解析,一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素,只有延时期满后才能从队列中获取元素,需要的朋友可以参考下

介绍

一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

DelayQueue可以运用在以下应用场景:

1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

数据结构

public interface Delayed extends Comparable<Delayed> {
 
    /**
     * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
     */
    long getDelay(TimeUnit unit);
}

 getDelay方法一般用内部存储的事件,减去当前事件,即为剩余延迟事件

属性

  private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
 
    /**
    *用于优化内部阻塞通知的线程
     */
    private Thread leader = null;
    private final Condition available = lock.newCondition();

以支持优先级的PriorityQueue无界队列作为一个容器,因为元素都必须实现Delayed接口,可以根据元素的过期时间来对元素进行排列,因此,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素。

leader是一个Thread元素,它在offer和take中都有使用,它代表当前获取到锁的消费者线程,

DelayQueue实现Leader-Folloer pattern

1、当存在多个take线程时,同时只生效一个,即,leader线程

2、当leader存在时,其它的take线程均为follower,其等待是通过condition实现的

3、当leader不存在时,当前线程即成为leader,在delay之后,将leader角色释放还原

4、最后如果队列还有内容,且leader空缺,则调用一次condition的signal,唤醒挂起的take线程,其中之一将成为新的leader

5、最后在finally中释放锁

方法实现

offer,poll,peek

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            //如果插入元素是第一个元素
            if (q.peek() == e) {
                //leader设置为null
                leader = null;
                //唤醒
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            //如果未到期,则返回null,否则删除
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    //到期,则poll
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)//nanos<delay,表示超时剩余时间小于到期时间,
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        //设置当前线程为leader
                        leader = thisThread;
                        try {
                            //等待条件
                            long timeLeft = available.awaitNanos(delay);
                            //剩余超时时间
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

put,take

/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获取可中断锁。
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 从优先级队列中获取队列头元素
                E first = q.peek();
                if (first == null)
                    // 无元素,当前线程加入等待队列,并阻塞
                    available.await();
                else {
                    // 通过getDelay 方法获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // 延迟时间到期,获取并删除头部元素。
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 线程节点进入等待队列 x 纳秒。
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader == null且还存在元素的话,唤醒一个消费线程。
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
   public void put(E e) {
        offer(e);
    }

take()方法逻辑:

1.获取锁

2.取出优先级队列q的首元素

3.如果元素q的队首/队列为空,阻塞

4.如果元素q的队首(first)不为空,获得这个元素的delay时间值,如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法

5.如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露

6.循环以上操作,直到return

leader作用

如果leader不为null,说明已经有消费者线程拿到锁,直接阻塞当前线程,如果leader为null,把当前线程赋值给leader,并等待剩余的到期时间,最后释放leader,这里我们想象着我们有个多个消费者线程用take方法去取,如果没有leader!=null的判断,这些线程都会无限循环,直到返回第一个元素,很显然很浪费资源。所以leader的作用是设置一个标记,来避免消费者的无脑竞争。

到此这篇关于Java中的DelayQueue源码解析的文章就介绍到这了,更多相关DelayQueue源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring boot实现数据库读写分离的方法

    Spring boot实现数据库读写分离的方法

    本篇文章主要介绍了Spring boot实现数据库读写分离的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01
  • Springboot如何根据实体类生成数据库表

    Springboot如何根据实体类生成数据库表

    这篇文章主要介绍了Springboot如何根据实体类生成数据库表的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java文件处理之使用itextpdf实现excel转pdf

    Java文件处理之使用itextpdf实现excel转pdf

    在文件处理中,经常有文件类型转换的使用场景,本文主要介绍了如何使用poi以及itextpdf完成excel转pdf的操作,需要的小伙伴可以参考一下
    2024-02-02
  • Java正确使用访问修饰符的姿势

    Java正确使用访问修饰符的姿势

    访问修饰符是Java语法中很基础的一部分,但是能正确的使用Java访问修饰符的程序员只在少数,下面这篇文章主要给大家介绍了关于Java正确使用访问修饰符的姿势,需要的朋友可以参考下
    2021-11-11
  • Mybatis的插件运行原理及如何编写一个插件

    Mybatis的插件运行原理及如何编写一个插件

    这篇文章主要介绍了Mybatis的插件运行原理及如何编写一个插件 ,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-07-07
  • SpringCloud微服务中跨域配置的方法详解

    SpringCloud微服务中跨域配置的方法详解

    在使用SpringCloud实现微服务时,经常会碰到前端页面访问多个二级域名的情况,跨域是首先要解决的问题。解决这个问题,可以从两方面入手,一种方案是在微服务各自的业务模块中实现,即在SpringBoot层实现,另外一种方案就是在Gateway层实现
    2023-02-02
  • Java中Boolean与字符串或者数字1和0的转换实例

    Java中Boolean与字符串或者数字1和0的转换实例

    下面小编就为大家带来一篇Java中Boolean与字符串或者数字1和0的转换实例。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • java多线程编程之从线程返回数据的两种方法

    java多线程编程之从线程返回数据的两种方法

    从线程中返回数据和向线程传递数据类似。也可以通过类成员以及回调函数来返回数据。但类成员在返回数据和传递数据时有一些区别,下面让我们来看看它们区别在哪
    2014-01-01
  • JDBC 实现通用的增删改查基础类方法

    JDBC 实现通用的增删改查基础类方法

    下面小编就为大家分享一篇JDBC 实现通用的增删改查基础类方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-01-01
  • Java聊天室之使用Socket实现传递对象

    Java聊天室之使用Socket实现传递对象

    这篇文章主要为大家详细介绍了Java简易聊天室之使用Socket实现传递对象功能,文中的示例代码讲解详细,具有一定的借鉴价值,需要的可以了解一下
    2022-10-10

最新评论