RabbitMQ消息的延迟队列详解

 更新时间:2024年02月11日 10:31:27   作者:云村小威  
这篇文章主要介绍了RabbitMQ消息的延迟队列,延迟队列也就是死信交换机,有些队列的消息成为死信后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列,感兴趣的同学可以参考下文

Dead Letter Exchange(死信交换机)

在MQ中,当消息成为死信(Dead message 死掉的信息)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况

  • 队列消息长度到达限制
  • 消费者拒签消息,并且不把消息重新放入原队列
  • 消息到达存活时间未被消费

有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理的。但是你可以配置某个交换机为此队列的死信交换机,该队列的消息成为死信后会被重新发送到此 DLX 。至于怎么处理这个DLX中的死信就是看具体的业务场景了,DLX 中的信息可以被路由到新的队列。

生产者

    /**
     * 普通交换机绑定普通交换机
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        //信息配置
        Map<String, Object> map = new HashMap<>();
        //message在该队列queue的存活时间最大为15秒
        map.put("x-message-ttl", 15000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        map.put("x-dead-letter-exchange", "exchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        map.put("x-dead-letter-routing-key", "queueB");
        return new Queue("queueA", true, false, false, map);
    }
    @Bean
    public DirectExchange exchangeA() {
        return new DirectExchange("exchangeA");
    }
    @Bean
    public Binding bindingA() {
        return BindingBuilder
                .bind(queueA())
                .to(exchangeA()).with("queueA");
    }
    /**
     * 死信交换机绑定死信交换机
     *
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue("queueB");
    }
    @Bean
    public DirectExchange exchangeB() {
        return new DirectExchange("exchangeB");
    }
    @Bean
    public Binding bindingB() {
        return BindingBuilder
                .bind(queueB())
                .to(exchangeB()).with("queueB");
    }

模拟发送请求

    @RequestMapping("/send6")
    public String sendSix() throws JsonProcessingException {
        rabbitTemplate.convertAndSend("exchangeA", "queueA", "检查订单是否过期");
        return "🫶";
    }

这时我发送请求到队列queueA,并设置了15秒的延迟,将超时的信息调用到死信交换机中。在这里我是没开启消费者所有没有消费者去处理该请求的,信息在queueA队列等待15秒后将会转到死信交换机queueB队列进行处理:

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。经典的应用场景是下单减库存。

根据以上结论,在rabbitmq中消费者只要接到信息就会自动确认进行处理。所以在上面并没有开启消费者,当请求時效后(如订单未支付,定时30分钟自动取消功能)我们不应该再让它正常处理,而把该请求放到死信交换机中安排对应的处理,所以我们需要打消费者自动处理请求改成手动。

如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者

如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

确认消息(局部方法处理消息)

默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

消费者添加手动确认消息配置配置 :

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manua

消费者接受消息:

package com.ycxw.consumer.demos;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DLXReceiver {
    @RabbitListener(queues = {"queueA"})
    @RabbitHandler
    public void handlerA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("已接受到队列queueA传递过来的消息:" + msg);
        channel.basicReject(tag, false);// 拒接消息,如果为true则拒绝后又从新回到队列被接受(循环),除非消息过期。
        //channel.basicAck(tag, true); 确认消息()一次性全接受,如果为false则接受一次
    }
    /**
     * 接受死信消息
     *
     * @param msg
     */
    @RabbitListener(queues = {"queueB"})
    @RabbitHandler
    public void handlerB(String msg) {
        /**
         * ...接受到信息,去数据库处理
         */
        System.out.println("已接受到队列queueB传递过来的消息:" + msg);
    }
}

第一次进入普通队列别拒绝后,转到死信队列中处理...

需要注意的 basicAck 方法需要传递两个参数

  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
  • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

到此这篇关于RabbitMQ消息的延迟队列详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot项目的logback日志配置(包括打印mybatis的sql语句)

    SpringBoot项目的logback日志配置(包括打印mybatis的sql语句)

    这篇文章主要介绍了SpringBoot项目的logback日志配置(包括打印mybatis的sql语句),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Java 实现将List平均分成若干个集合

    Java 实现将List平均分成若干个集合

    这篇文章主要介绍了Java 实现将List平均分成若干个集合,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-08-08
  • SpringCloud中的OpenFeign调用解读

    SpringCloud中的OpenFeign调用解读

    OpenFeign是一个显示声明式的WebService客户端,使用OpenFeign能让编写Web Service客户端更加简单OpenFeign的设计宗旨式简化Java Http客户端的开发,本文给大家介绍SpringCloud之OpenFeign调用解读,感兴趣的朋友一起看看吧
    2023-11-11
  • Java Spring Boot消息服务万字详解分析

    Java Spring Boot消息服务万字详解分析

    在实际项目开发中,有时需要与其他系统进行集成完成相关业务功能,这种情况最原始做法是程序内部相互调用,除此之外,还可以用消息服务中间件进行业务处理,用消息服务中间件处理业务能够提升系统的异步通信和扩展解耦能力。Spring Boot对消息服务管理提供了非常好的支持
    2021-10-10
  • IDEA如何修改配置文件的存放位置

    IDEA如何修改配置文件的存放位置

    这篇文章主要介绍了IDEA如何修改配置文件的存放位置,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 深入解析Jdk8中Stream流的使用让你脱离for循环

    深入解析Jdk8中Stream流的使用让你脱离for循环

    这篇文章主要介绍了Jdk8中Stream流的使用,让你脱离for循环,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-02-02
  • 详解Java线程池如何统计线程空闲时间

    详解Java线程池如何统计线程空闲时间

    这篇文章主要和大家分享一个面试题:Java线程池是怎么统计线程空闲时间?文中的示例代码讲解详细,对我们掌握Java有一定帮助,需要的可以参考一下
    2022-11-11
  • 解决grails服务端口冲突的办法(grails修改端口号)

    解决grails服务端口冲突的办法(grails修改端口号)

    grails中默认的服务端口为8080,当本机中需要同时启动两个不同的项目时,就会造成端口冲突,下面给出解决方法
    2013-12-12
  • JavaWeb三大组件之Filter过滤器详解

    JavaWeb三大组件之Filter过滤器详解

    这篇文章主要介绍了JavaWeb三大组件之Filter过滤器详解,过滤器Filter是Java Web应用中的一种组件,它在请求到达Servlet或JSP之前或者响应送回客户端之前,对请求和响应进行预处理和后处理操作,需要的朋友可以参考下
    2023-10-10
  • Java基础巩固小项目点菜系统的实现

    Java基础巩固小项目点菜系统的实现

    这篇文章主要介绍了一个Java小项目点菜系统的实现,主要是用的集合,适合正在学习Java的朋友拿来实战练手,感兴趣的朋友快来看看吧
    2022-03-03

最新评论