RabbitMQ中的延迟队列机制详解

 更新时间:2023年09月20日 11:10:39   作者:煎丶包  
这篇文章主要介绍了RabbitMQ中的延迟队列机制详解,延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望,在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列,需要的朋友可以参考下

一、延迟队列

延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、队列TTL

在这里插入图片描述

创建一个配置类,声明并配置交换机和队列

@Configuration
public class TtlQueueConfig {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE = "X";
    //死信交换机名称
    public static final String DEAD_EXCHANGE = "Y";
    //普通队列名称
    public static final String NORMAL_QUEUE_A = "QA";
    public static final String NORMAL_QUEUE_B = "QA";
    //死信队列名称
    public static final String DEAD_QUEUE = "QD";
    //声明普通交换机
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }
    //声明死信交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }
    //声明普通队列,TTL为10s
    @Bean("QA")
    public Queue qA() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置TTL
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build();
    }
    //声明普通队列,TTL为10s
    @Bean("QB")
    public Queue qB() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置TTL
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(NORMAL_QUEUE_B).withArguments(arguments).build();
    }
    //声明死信队列
    @Bean("QD")
    public Queue qD() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }
    //绑定对应的交换机和队列
    @Bean
    public Binding queueABindingX(@Qualifier("QA") Queue QA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(QA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("QB") Queue QB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(QB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("QD") Queue QD,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(QD).to(yExchange).with("YD");
    }
}

创建一个生产者

@Slf4j
@RestController
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("/ttl/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条信息给两个队列:{}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列QA:" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列QB:" + message);
    }
}

创建一个消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receivedQD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到死信队列的消息:{}", new Date().toString(), message);
    }
}

浏览器发送消息

在这里插入图片描述

消费者分别过了10s和40s接收到了消息

在这里插入图片描述

三、延迟队列的优化

不同的延迟时间需要设置不同的 TTL ,可以优化声明一个通用的 QC 队列,具体的延迟时间有生产者决定

在这里插入图片描述

在配置类 TtlQueueConfig 中配置通用队列 QC

    //通用队列名称
    public static final String Generic_QUEUE_C = "QC";
    //声明通用队列
    @Bean("QC")
    public Queue qC() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //因为是通用队列,所以不设置TTL,由生产者指定消息的TTL
        return QueueBuilder.durable(Generic_QUEUE_C).withArguments(arguments).build();
    }
    //绑定通用队列和普通交换机
    @Bean
    public Binding queueCBindingX(@Qualifier("QC") Queue QC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(QC).to(xExchange).with("XC");
    }
    //绑定通用队列和死信交换机
    @Bean
    public Binding queueCBindingY(@Qualifier("QC") Queue QC,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(QC).to(yExchange).with("YD");
    }

生产者发送消息,并指定 TTL 时长

    //发送消息,并指定消息的TTL
    @GetMapping("/ttl/sendExpirationMsg/{message}/{ttlTime}")
    public void sendExpirationMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) {
        log.info("当前时间:{}, 发送一条TTL为{}ms的消息给队列QC:{}", new Date().toString(), ttlTime, message);
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            //设置消息的TTL时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

发送两条消息

在这里插入图片描述

在这里插入图片描述

消费者接收消息

在这里插入图片描述

但是,如果连续发送两条消息,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。结果会导致第二条消息消费者收到时间有误。

在这里插入图片描述

四、基于 RabbitMQ 插件实现延迟队列

如果不能实现在消息粒度上的 TTL ,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。可以使用基于 RabbitMQ 插件来实现延迟队列,从而解决这个问题。

基于 RabbitMQ 插件实现延迟,是交换机实现延迟,而不再是队列实现延迟

在这里插入图片描述

在这里插入图片描述

创建一个基于插件的延迟队列配置类 DelayedQueueConfig

@Configuration
public class DelayedQueueConfig {
    //交换机名称
    public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
    //队列名称
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed_routingKey";
    //声明交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");  //设置延迟类型
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }
    //声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //绑定队列和交换机
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

创建生产者发送延迟消息

    //基于插件发送消息
    @GetMapping("/ttl/sendDelayedMsg/{message}/{delayedTime}")
    public void sendDelayedMsg(@PathVariable("message") String message, @PathVariable("delayedTime") Integer delayedTime) {
        log.info("当前时间:{}, 发送一条时长为{}ms的消息给延迟队列delayed_queue:{}", new Date().toString(), delayedTime, message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
            //设置消息的延迟时长
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        });
    }

创建消费者

@Slf4j
@Component
public class DelayedQueueConsumer {
    //监听消息
    @RabbitListener(queues = {DelayedQueueConfig.DELAYED_QUEUE_NAME})
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到延迟队列的消息:{}", new Date().toString(), msg);
    }
}

当连续发送两条不同延迟时长的消息时,消费者会先接收到延迟时长短的那条消息,再接收延迟时长长的那条消息。

在这里插入图片描述

实现延迟队列,一种是基于死信队列的方式,一种是基于RabbitMQ插件的方式。

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

到此这篇关于RabbitMQ中的延迟队列机制详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java SpringAOP技术之注解方式详解

    Java SpringAOP技术之注解方式详解

    这篇文章主要为大家详细介绍了Java SpringAOP技术之注解方式,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-02-02
  • SpringBoot深入分析讲解监听器模式上

    SpringBoot深入分析讲解监听器模式上

    监听器模式,大家应该并不陌生,主要的组成要素包括了事件、监听器以及广播器;当事件发生时,广播器负责将事件传递给所有已知的监听器,而监听器会对自己感兴趣的事件进行处理
    2022-07-07
  • Activiti开发环境的搭建过程详解

    Activiti开发环境的搭建过程详解

    这篇文章主要介绍了Activiti开发环境的搭建过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • Java中深拷贝,浅拷贝与引用拷贝的区别详解

    Java中深拷贝,浅拷贝与引用拷贝的区别详解

    这篇文章主要为大家详细介绍了Java面试中常遇见的问题:深拷贝、浅拷贝与引用拷贝的区别,文中通过示例进行了详细讲解,需要的可以参考一下
    2022-08-08
  • jdk7 中HashMap的知识点总结

    jdk7 中HashMap的知识点总结

    HashMap的原理是老生常谈了,不作仔细解说。一句话概括为HashMap是一个散列表,它存储的内容是键值对(key-value)映射。这篇文章主要总结了关于jdk7 中HashMap的知识点,需要的朋友可以参考借鉴,一起来看看吧。
    2017-01-01
  • SpringBoot如何访问html和js等静态资源配置

    SpringBoot如何访问html和js等静态资源配置

    这篇文章主要介绍了SpringBoot如何访问html和js等静态资源配置,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • 几个好用Maven镜像仓库地址(小结)

    几个好用Maven镜像仓库地址(小结)

    这篇文章主要介绍了几个好用Maven镜像仓库地址(小结),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • 基于@Table注解无法使用及报红的解决

    基于@Table注解无法使用及报红的解决

    这篇文章主要介绍了基于@Table注解无法使用及报红的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • Java 发送http请求上传文件功能实例

    Java 发送http请求上传文件功能实例

    本文通过实例代码给大家介绍了Java 发送http请求上传文件功能,需要的朋友参考下吧
    2017-06-06
  • SpringBoot整合Netty的流程步骤

    SpringBoot整合Netty的流程步骤

    Netty是一个基于Java的开源网络应用框架,它提供了高性能、异步事件驱动的网络编程能力,Netty旨在帮助开发者构建高性能、高可靠性的网络应用程序,本文给大家详细介绍了SpringBoot整合Netty的流程步骤,需要的朋友可以参考下
    2023-09-09

最新评论