JAVA中的延迟队列DelayQueue应用解析

 更新时间:2023年12月14日 10:35:36   作者:戴陵FL  
这篇文章主要介绍了JAVA中的延迟队列DelayQueue应用解析,DelayQueue是一个根据元素的到期时间来排序的队列,而并非是一般的队列那样先进先出,最快过期的元素排在队首,越晚到期的元素排得越后,需要的朋友可以参考下

前言

最近在开发CRM管理系统时遇到一个需求:销售部门的人员在使用该系统时,可以从【线索公海】模块中 “领取” 潜在的客户线索到自己的【线索私海】模块中,成为自己私有的潜在客户线索,以便后期进行跟踪、开发,同时,也可以主动放弃该线索,将线索 “释放” 回【线索公海】中,若开发成功,则客户进入【客户私海】模块中,成为自己的潜在客户,若这时不想继续开发这个客户了,进行 “释放”,则该客户进入【客户公海】中以供所有销售进行 “领取”,谁领取到了,就进入相应销售的【客户私海】中

在这个基础上,我们希望实现这样一个功能: 用户在领取了线索后,若24小时内没有将线索成功开发为自己的潜在客户,则自动释放使之成为公海线索,并且48小时内冻结该线索(无法领取),同样,潜在客户60天内没有开发成正式客户,则自动释放该客户资源到公海中,同样是48小时内不能被重新认领

在这个场景下,我想到了DelayQueue

DelayQueue介绍

简单来说,DelayQueue是一个根据元素的到期时间来排序的队列,而并非是一般的队列那样先进先出,最快过期的元素排在队首,越晚到期的元素排得越后 使用时,元素必须实现Delayed接口,生产者线程往队列里添加元素时,会触发Delayed接口中的compareTo方法进行排序,消费者线索获取元素时,会调用Delayed接口中的getDelay方法来检查队首元素是否到期,getDelay方法返回的是离到期时间剩余的时间值,若getDelay返回的值小0或者等于0,则表示已到期,消费者线程取出进行消费,若getDelay方法返回的值大于0,则消费者线程会被阻塞,wait返回的时间值后,再从队列头部取出元素进行消费

数据结构

阅读DelayQueue的源码

在这里插入图片描述

可以看到它包含了: 一个PriorityQueue——PriorityQueue是一个优先级队列,它是一个没有阻塞功能的Queue,也就是说DelayQueue底层通过PriorityQueue来实现元素的存储

一个ReentrantLock锁

一个线程leader——DelayQueue使用类似Leader-Followr模式,即消费者线程要获取元素时,若元素还没过期,则消费者线程阻塞等待的时间即元素的剩余过期时间,即消费者线程等待的元素保证是最先过期的元素,这样消费者线程可以尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率

一个阻塞的条件Condition——实现出队时阻塞的功能

特性

DelayQueue是一个无界队列,因此入队时不会阻塞,与优先级队列入队相同 DelayQueue的特性主要在出队上 出队时: 1.若队列为空,则阻塞 2.若不为空,则检查堆顶的元素是否过期,剩余过期时间小于等于0则出队,若大于0,则:判断当前有无消费者线程作为leader正在等待获取元素,若leader不为null,则直接阻塞,若leader为null,则将当前消费者线程设为leader,并按照最早过期的时间进行阻塞

示意图:

在这里插入图片描述

过了2s后,元素5到期了,唤醒消费者线程1并获取元素5进行消费 同时把消费者线程2设为leader,此时元素4为堆顶元素,2s后到期,所以消费者线程2的阻塞时间设置为2s

在这里插入图片描述

又过了2s,元素4到期,唤醒消费者线程2并获取元素4进行消费 消费者线程1继续处理元素5

在这里插入图片描述

继续过2s后,若此时消费者线程1或者消费者线程2处理完任务,则继续获取元素进行消费,并且元素3刚刚好到期了 若此时两个线程都没有处理完任务,则会出现元素3到期了,但是没有消费者来取出消费,同时,队列中不断有新的元素入队,就会造成任务延期,队列会越来越大,元素延迟处理的时间会越来越长

假设此时又过了2s,还是没有消费者线程空下来:

在这里插入图片描述

因此,若任务处理时间较长,任务增长速度快,且到期时间较集中,则需要加快消费者线程处理任务的速度和增加消费者线程数量,否则就会造成任务延期越来越长,反之,也不能盲目增加消费者线程数量,数量太多导致资源浪费

实例

结合项目需求,使用DelayQueue来实现线索、客户的超时功能 (1)创建任务类:DelayTask.java,实现Delayed接口,作为延迟队列中的元素,然后只需将线索类、客户类继承该类

@Data
public class DelayTask implements Delayed {
    /**
     * 开始计时时间 不设置则默认为当前系统时间
     */
    private transient Date taskStartTime = new Date();
    /**
     * 过期时间 不设置则默认1分钟
     */
    private transient long taskExpiredTime = 60 * 1000;
    /**
     * 初始设置开始计时时间
     * taskStartTime 开始时间 [String] [yyyy-MM-dd HH:mm:ss]
     * taskExpiredTime 过期时间 [long] 单位:s
     * @param taskStartTime
     * @param taskExpiredTime
     */
    public void initTaskTime(String taskStartTime, long taskExpiredTime) {
        if(Assert.notEmpty(taskStartTime)) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                this.taskStartTime = sdf.parse(taskStartTime);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        this.taskExpiredTime = taskExpiredTime;
        this.taskExpiredTime += this.taskStartTime.getTime();
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(taskExpiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return (this.getDelay(TimeUnit.MILLISECONDS) - ((DelayTask) o).getDelay(TimeUnit.MILLISECONDS)) > 0 ? 1:0;
    }
}

(2)创建一个单例的延迟队列工具类:DelayQueueHelper 声明了一个延迟队列,并且对外提供一个统一、全局的操作延迟队列的入口(入队、删除元素操作)

public class DelayQueueHelper {
    private volatile static DelayQueueHelper delayQueueHelper = null;
    //私海线索过期时间:24h
    public static final long CLUE_EXPIRED_TIME = 24 * 60 * 60 * 1000;
    //私海客户过期时间:60天
    public static final long CUS_EXPIRED_TIME = 60L * 24 * 60 * 60 * 1000;
    //线索、客户释放后冷冻时间:48h
    public static final long BLOCK_TIME = 48 * 60 * 60 * 1000;
    private DelayQueue<DelayTask> queue = new DelayQueue<>();
    private DelayQueueHelper() {
    }
    public static DelayQueueHelper getInstance() {
        if(delayQueueHelper == null) {
            synchronized(DelayQueueHelper.class) {
                delayQueueHelper = new DelayQueueHelper();
            }
        }
        return delayQueueHelper;
    }
    public void addTask(DelayTask task) {
        queue.put(task);
    }
    public void removeTask(DelayTask task) {
        if(task == null){
            return;
        }
        for(Iterator<DelayTask> iterator = queue.iterator(); iterator.hasNext();) {
            if(task instanceof Clue) {
                Clue clue = (Clue) task;
                Clue queueObj = (Clue) iterator.next();
                if(clue.getId().equals(queueObj.getId())){
                    queue.remove(queueObj);
                }
            }
        }
    }
    public DelayQueue<DelayTask> getQueue() {
        return queue;
    }
}

(3)创建一个初始化类:DelayQueueRunner,实现ApplicationRunner接口

1.系统启动时,首先将所有任务入队 (DelayQueue的缺点:宕机、系统重启后数据会被清空,因此系统初始化时需将所有满足条件的元素入队)

2.开启一个消费者线程,循环从延迟队列中获取到期的线索、客户进行消费(将线索、客户状态修改为释放状态、解除冻结状态)

@Slf4j
@Component
public class DelayQueueRunner implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
        //1.将所有未到期的线程、客户入队
        //......
        //2.开启一个消费者线程
        run(queueHelper.getQueue());
    }
    public void run(DelayQueue queue) {
        new Thread() {
            @Override
            public void run() {
                try {
                    while (true) {
                        DelayTask task = (DelayTask) queue.take();
                        executeTask(task);
                    }
                } catch (InterruptedException e) {
                    log.error(e.getMessage());
                    e.printStackTrace();
                }
            }
        }.start();
    }
    private void executeTask(DelayTask task) {
        if(task instanceof Clue) {
            Clue clue = (Clue) task;
            //修改状态
          	clue.update();
        }
    }
}

(4)在添加、释放线索记录、客户记录时,通过DelayQueueHelper对队列中的元素进行相应的入队、出队操作

 /**
     * 将线索\客户加入超时自动更新状态队列
     * @param clue 线索\客户对象
     * @param type 0:私海线索 1:私海客户 3:释放后元素
     * @param startTime 开始计时时间
     */
    public void addToTimeoutAutoUpdateQueue(Clue clue, int type, Date startTime) {
        long expireTime = 0;
        if(type == CLUE) { //线索队列
            expireTime = DelayQueueHelper.CLUE_EXPIRED_TIME;
        }else if(type == CUS) { //客户队列
            expireTime = DelayQueueHelper.CUS_EXPIRED_TIME;
        }else if(type == LOCK) { //冻结队列
            expireTime = DelayQueueHelper.BLOCK_TIME;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
        clue.initTaskTime(sdf.format(startTime), expireTime);
        queueHelper.addTask(clue);
    }
	/**
     * 将线索从超时自动更新状态队列中删除
     * @param clue
     */
    public void removeFromTimeoutAutoUpdateQueue(Clue clue) {
        DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
        queueHelper.removeTask(clue);
    }

到此这篇关于JAVA中的延迟队列DelayQueue应用解析的文章就介绍到这了,更多相关JAVA的DelayQueue应用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

最新评论