Java中的DelayQueue源码解析

 更新时间:2023年12月13日 08:32:59   作者:demon7552003  
这篇文章主要介绍了Java中的DelayQueue源码解析,一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素,只有延时期满后才能从队列中获取元素,需要的朋友可以参考下

介绍

一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

DelayQueue可以运用在以下应用场景:

1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

数据结构

public interface Delayed extends Comparable<Delayed> {
 
    /**
     * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
     */
    long getDelay(TimeUnit unit);
}

 getDelay方法一般用内部存储的事件,减去当前事件,即为剩余延迟事件

属性

  private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
 
    /**
    *用于优化内部阻塞通知的线程
     */
    private Thread leader = null;
    private final Condition available = lock.newCondition();

以支持优先级的PriorityQueue无界队列作为一个容器,因为元素都必须实现Delayed接口,可以根据元素的过期时间来对元素进行排列,因此,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素。

leader是一个Thread元素,它在offer和take中都有使用,它代表当前获取到锁的消费者线程,

DelayQueue实现Leader-Folloer pattern

1、当存在多个take线程时,同时只生效一个,即,leader线程

2、当leader存在时,其它的take线程均为follower,其等待是通过condition实现的

3、当leader不存在时,当前线程即成为leader,在delay之后,将leader角色释放还原

4、最后如果队列还有内容,且leader空缺,则调用一次condition的signal,唤醒挂起的take线程,其中之一将成为新的leader

5、最后在finally中释放锁

方法实现

offer,poll,peek

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            //如果插入元素是第一个元素
            if (q.peek() == e) {
                //leader设置为null
                leader = null;
                //唤醒
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            //如果未到期,则返回null,否则删除
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    //到期,则poll
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)//nanos<delay,表示超时剩余时间小于到期时间,
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        //设置当前线程为leader
                        leader = thisThread;
                        try {
                            //等待条件
                            long timeLeft = available.awaitNanos(delay);
                            //剩余超时时间
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

put,take

/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获取可中断锁。
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 从优先级队列中获取队列头元素
                E first = q.peek();
                if (first == null)
                    // 无元素,当前线程加入等待队列,并阻塞
                    available.await();
                else {
                    // 通过getDelay 方法获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // 延迟时间到期,获取并删除头部元素。
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 线程节点进入等待队列 x 纳秒。
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader == null且还存在元素的话,唤醒一个消费线程。
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
   public void put(E e) {
        offer(e);
    }

take()方法逻辑:

1.获取锁

2.取出优先级队列q的首元素

3.如果元素q的队首/队列为空,阻塞

4.如果元素q的队首(first)不为空,获得这个元素的delay时间值,如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法

5.如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露

6.循环以上操作,直到return

leader作用

如果leader不为null,说明已经有消费者线程拿到锁,直接阻塞当前线程,如果leader为null,把当前线程赋值给leader,并等待剩余的到期时间,最后释放leader,这里我们想象着我们有个多个消费者线程用take方法去取,如果没有leader!=null的判断,这些线程都会无限循环,直到返回第一个元素,很显然很浪费资源。所以leader的作用是设置一个标记,来避免消费者的无脑竞争。

到此这篇关于Java中的DelayQueue源码解析的文章就介绍到这了,更多相关DelayQueue源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C++排序算法之桶排序原理及实现详解

    C++排序算法之桶排序原理及实现详解

    这篇文章主要介绍了C++排序算法之桶排序原理及实现详解, C++ 桶排序是一种线性时间复杂度的排序算法,它通过将待排序元素分配到不同的桶中,然后对每个桶中的元素进行排序,最后将所有桶中的元素按顺序合并得到有序序列,需要的朋友可以参考下
    2023-10-10
  • Spring为何需要三级缓存解决循环依赖详解

    Spring为何需要三级缓存解决循环依赖详解

    这篇文章主要给大家介绍了关于Spring为何需要三级缓存解决循环依赖,而不是二级缓存的相关资料,这个也是一个Spring的高频面试题,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2022-02-02
  • Spring整合quartz做定时任务的示例代码

    Spring整合quartz做定时任务的示例代码

    这篇文章主要介绍了在spring项目使用quartz做定时任务,首先我这里的项目已经是一个可以跑起来的完整项目,web.xml里面的配置我就不贴出来了,具体实例代码跟随小编一起看看吧
    2022-01-01
  • Maven中两个命令clean 和 install的使用

    Maven中两个命令clean 和 install的使用

    Maven是一个项目管理和自动构建工具,clean命令用于删除项目中由先前构建生成的target目录,install命令用于将打包好的jar包安装到本地仓库中,供其他项目依赖使用,下面就来详细的介绍一下这两个命令
    2024-09-09
  • 详解MyBatis配置typeAliases的方法

    详解MyBatis配置typeAliases的方法

    这篇文章主要介绍了详解MyBatis配置typeAliases的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • JAVA CountDownLatch(倒计时计数器)用法实例

    JAVA CountDownLatch(倒计时计数器)用法实例

    这篇文章主要介绍了JAVA CountDownLatch(倒计时计数器)用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • 基于JavaMail的Java邮件发送

    基于JavaMail的Java邮件发送

    电子邮件的应用非常广泛,例如在某网站注册了一个账户,自动发送一封欢迎邮件,通过邮件找回密码,自动批量发送活动信息等。本文将简单介绍如何通过 Java 代码来创建电子邮件,并连接邮件服务器发送邮件
    2021-10-10
  • Spring中WebDataBinder使用详解

    Spring中WebDataBinder使用详解

    这篇文章主要为大家详细介绍了Spring中WebDataBinder的使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-07-07
  • 详解Java语言中一个字符占几个字节?

    详解Java语言中一个字符占几个字节?

    这篇文章主要介绍了Java语言中一个字符占几个字节,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • Java中的枚举enum详细解读

    Java中的枚举enum详细解读

    这篇文章主要介绍了Java中的枚举enum详细解读,当我们使用enum关键字开发一个枚举类时,默认会继承Enum类,而且是一个final类,当有多个枚举对象时,使用逗号 ,隔开,最后一个用分号;结尾,需要的朋友可以参考下
    2024-01-01

最新评论