Java延迟队列原理与用法实例详解

 更新时间:2018年09月03日 11:45:35   作者:QH_JAVA  
这篇文章主要介绍了Java延迟队列原理与用法,结合实例形式详细分析了延迟队列的概念、原理、功能及具体使用方法,需要的朋友可以参考下

本文实例讲述了Java延迟队列原理与用法。分享给大家供大家参考,具体如下:

延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……

应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:

简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。

一、消息体

package com.delqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期……
 *
 * @author whd
 * @date 2017年9月24日 下午8:57:14
 */
public class Message implements Delayed {
    private int id;
    private String body; // 消息内容
    private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
    public int getId() {
        return id;
    }
    public String getBody() {
        return body;
    }
    public long getExcuteTime() {
        return excuteTime;
    }
    public Message(int id, String body, long delayTime) {
        this.id = id;
        this.body = body;
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }
    // 自定义实现比较方法返回 1 0 -1三个参数
    @Override
    public int compareTo(Delayed delayed) {
        Message msg = (Message) delayed;
        return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
                : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
    }
    // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}

二、消息消费者

package com.delqueue;
import java.util.concurrent.DelayQueue;
public class Consumer implements Runnable {
    // 延时队列 ,消费者从其中获取消息进行消费
    private DelayQueue<Message> queue;
    public Consumer(DelayQueue<Message> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Message take = queue.take();
                System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

三、延时队列

package com.delqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueTest {
     public static void main(String[] args) {
        // 创建延时队列
        DelayQueue<Message> queue = new DelayQueue<Message>();
        // 添加延时消息,m1 延时3s
        Message m1 = new Message(1, "world", 3000);
        // 添加延时消息,m2 延时10s
        Message m2 = new Message(2, "hello", 10000);
        //将延时消息放到延时队列中
        queue.offer(m2);
        queue.offer(m1);
        // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(new Consumer(queue));
        exec.shutdown();
      }
}

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

使用场景描述:

在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……

下面看看简单的流程图:

下面来看看具体代码实现:

在项目中有如下几个类:第一 、任务类   第二、按照任务类组装的消息体类  第三、延迟队列管理类

任务类即执行筛选司机、绑单、push消息的任务类

package com.test.delayqueue;
/**
 * 具体执行相关业务的业务类
 * @author whd
 * @date 2017年9月25日 上午12:49:32
 */
public class DelayOrderWorker implements Runnable {
    @Override
    public void run() {
        // TODO Auto-generated method stub
        //相关业务逻辑处理
        System.out.println(Thread.currentThread().getName()+" do something ……");
    }
}

消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的

这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体

package com.test.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 延时队列中的消息体将任务封装为消息体
 *
 * @author whd
 * @date 2017年9月25日 上午12:48:30
 * @param <T>
 */
public class DelayOrderTask<T extends Runnable> implements Delayed {
    private final long time;
    private final T task; // 任务类,也就是之前定义的任务类
    /**
     * @param timeout
     *      超时时间(秒)
     * @param task
     *      任务
     */
    public DelayOrderTask(long timeout, T task) {
        this.time = System.nanoTime() + timeout;
        this.task = task;
    }
    @Override
    public int compareTo(Delayed o) {
        // TODO Auto-generated method stub
        DelayOrderTask other = (DelayOrderTask) o;
        long diff = time - other.time;
        if (diff > 0) {
            return 1;
        } else if (diff < 0) {
            return -1;
        } else {
            return 0;
        }
    }
    @Override
    public long getDelay(TimeUnit unit) {
        // TODO Auto-generated method stub
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
    @Override
    public int hashCode() {
        return task.hashCode();
    }
    public T getTask() {
        return task;
    }
}

延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务

package com.test.delayqueue;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 延时队列管理类,用来添加任务、执行任务
 *
 * @author whd
 * @date 2017年9月25日 上午12:44:59
 */
public class DelayOrderQueueManager {
    private final static int DEFAULT_THREAD_NUM = 5;
    private static int thread_num = DEFAULT_THREAD_NUM;
    // 固定大小线程池
    private ExecutorService executor;
    // 守护线程
    private Thread daemonThread;
    // 延时队列
    private DelayQueue<DelayOrderTask<?>> delayQueue;
    private static final AtomicLong atomic = new AtomicLong(0);
    private final long n = 1;
    private static DelayOrderQueueManager instance = new DelayOrderQueueManager();
    private DelayOrderQueueManager() {
        executor = Executors.newFixedThreadPool(thread_num);
        delayQueue = new DelayQueue<>();
        init();
    }
    public static DelayOrderQueueManager getInstance() {
        return instance;
    }
    /**
     * 初始化
     */
    public void init() {
        daemonThread = new Thread(() -> {
            execute();
        });
        daemonThread.setName("DelayQueueMonitor");
        daemonThread.start();
    }
    private void execute() {
        while (true) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("当前存活线程数量:" + map.size());
            int taskNum = delayQueue.size();
            System.out.println("当前延时任务数量:" + taskNum);
            try {
                // 从延时队列中获取任务
                DelayOrderTask<?> delayOrderTask = delayQueue.take();
                if (delayOrderTask != null) {
                    Runnable task = delayOrderTask.getTask();
                    if (null == task) {
                        continue;
                    }
                    // 提交到线程池执行task
                    executor.execute(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 添加任务
     *
     * @param task
     * @param time
     *      延时时间
     * @param unit
     *      时间单位
     */
    public void put(Runnable task, long time, TimeUnit unit) {
        // 获取延时时间
        long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
        // 将任务封装成实现Delayed接口的消息体
        DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
        // 将消息体放到延时队列中
        delayQueue.put(delayOrder);
    }
    /**
     * 删除任务
     *
     * @param task
     * @return
     */
    public boolean removeTask(DelayOrderTask task) {
        return delayQueue.remove(task);
    }
}

测试类

package com.delqueue;
import java.util.concurrent.TimeUnit;
import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;
public class Test {
    public static void main(String[] args) {
        DelayOrderWorker work1 = new DelayOrderWorker();// 任务1
        DelayOrderWorker work2 = new DelayOrderWorker();// 任务2
        DelayOrderWorker work3 = new DelayOrderWorker();// 任务3
        // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行
        DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
        manager.put(work1, 3000, TimeUnit.MILLISECONDS);
        manager.put(work2, 6000, TimeUnit.MILLISECONDS);
        manager.put(work3, 9000, TimeUnit.MILLISECONDS);
    }
}

OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现……

更多关于java算法相关内容感兴趣的读者可查看本站专题:《Java数据结构与算法教程》、《Java操作DOM节点技巧总结》、《Java文件与目录操作技巧汇总》和《Java缓存操作技巧汇总

希望本文所述对大家java程序设计有所帮助。

相关文章

  • Java之多个线程顺序循环执行的几种实现

    Java之多个线程顺序循环执行的几种实现

    这篇文章主要介绍了Java之多个线程顺序循环执行的几种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • RabbitMQ消息丢失解决方案

    RabbitMQ消息丢失解决方案

    把这篇文章主要为大家介绍了如何保证RabbitMQ消息不丢失的解决方发,分从从丢失的三种情况给大家介绍不同的解决方案,感兴趣的小伙伴可以参考阅读本文
    2023-07-07
  • Java注解之Retention、Documented、Inherited介绍

    Java注解之Retention、Documented、Inherited介绍

    这篇文章主要介绍了Java注解之Retention、Documented、Inherited注解介绍,本文内容和相关文章是系列文章,需要的朋友可以参考下
    2014-09-09
  • springboot 多环境配置 yml文件版的实现方法

    springboot 多环境配置 yml文件版的实现方法

    这篇文章主要介绍了springboot 多环境配置 yml文件版的实现方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • SpringBoot在项目中访问静态资源步骤分析

    SpringBoot在项目中访问静态资源步骤分析

    今天在玩SpringBoot的demo的时候,放了张图片在resources目录下,启动区访问的时候,突然好奇是识别哪些文件夹来展示静态资源的, 为什么有时候放的文件夹不能显示,有的却可以
    2023-01-01
  • 关于Spring MVC同名参数绑定问题的解决方法

    关于Spring MVC同名参数绑定问题的解决方法

    Spring MVC中的参数绑定还是蛮重要的,最近在使用中遇到了同名参数绑定的问题,想着总结分享出来,下面这篇文章主要给大家介绍了关于Spring MVC同名参数绑定问题的解决方法,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-08-08
  • Java中的Graphics2D类基本使用教程

    Java中的Graphics2D类基本使用教程

    这篇文章主要介绍了Java中的Graphics2D类基本使用教程,Graphics2D类较之Graphics类中的功能更加专业,需要的朋友可以参考下
    2015-10-10
  • JavaWeb 入门:Hello Servlet

    JavaWeb 入门:Hello Servlet

    这篇文章主要介绍了Servlet开发JavaWeb工程示例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-07-07
  • Java数据结构之HashMap源码深入分析

    Java数据结构之HashMap源码深入分析

    Java HashMap是一种基于哈希表实现的键值对存储结构,可以实现快速的数据查找和存储。它是线程不安全的,但在单线程环境中运行效率高,被广泛应用于Java开发中
    2023-04-04
  • Java设计模式之简单工厂 工厂方法 抽象工厂深度总结

    Java设计模式之简单工厂 工厂方法 抽象工厂深度总结

    设计模式(Design Pattern)是前辈们对代码开发经验的总结,是解决特定问题的一系列套路。它不是语法规定,而是一套用来提高代码可复用性、可维护性、可读性、稳健性以及安全性的解决方案
    2021-09-09

最新评论