RabbitMQ延时队列实现方法

 更新时间:2026年05月18日 10:33:23   作者:星辰大海2024  
文章主要介绍了在Linux环境下使用CentOS/Rocky使用Docker部署RabbitMQ 3.8版本,并实现现现了延时队列的实现方式,总结了两种方法在灵活性、性能和管理上的优缺点,感兴趣的朋友跟随小编一起看看吧

Linux环境下(centOS,Rocky)docker部署,rabbitMQ3.8

RabbitMQ延时队列实现的目的:

延时队列主要应用于延时任务(订单的超时取消和获取支付服务的支付状态)

需要知道什么是死信

死信是什么

死信(Dead Letter Message) 就是 RabbitMQ 中无法正常投递或消费的消息。

不会被直接丢弃,而是被 RabbitMQ 自动“打上死信标签”,然后重新发布到你提前配置好的 死信交换机(Dead Letter Exchange,简称 DLX),再由 DLX 路由到一个专门的死信队列(Dead Letter Queue,简称 DLQ)

死信触发条件

序号触发条件具体说明常见场景
1消费者拒绝消息消费者调用 basic.rejectbasic.nack,并且设置 requeue=false业务处理失败,不想重试
2消息过期(TTL)消息设置了存活时间(x-message-ttl 或消息属性 expiration),到期后延迟消息最常用场景
3队列达到最大长度限制队列设置了 x-max-length(最大消息数),新消息进来时把最老的消息挤出去队列爆满
4队列达到最大字节数限制队列设置了 x-max-length-bytes(最大占用字节),挤出最老的消息大消息导致队列容量超限

死信交换机与RepulishMessageRecoverer区别

维度死信交换机(DLX)RepublishMessageRecoverer
所属层级RabbitMQ Broker(服务器端) 原生机制Spring AMQP(应用层) 提供的工具类
触发时机消息成为死信时(4种情况:拒绝+不重入队、TTL过期、队列长度超限、字节数超限)消费者本地重试次数耗尽后抛出异常时
触发者RabbitMQ 服务器自动触发Spring 的 ErrorHandler + MessageRecoverer 触发
消息处理方式Broker 直接把原消息重新发布到 DLXSpring 先 ACK 原消息(告诉 Broker 已消费),然后用 RabbitTemplate 重新发布一份新消息
是否走 DLX 机制直接走 DLX不走 DLX(因为已经 ACK 了)
能否携带额外信息只能带 x-death header(记录几次死信)可以自动添加 x-exception-stacktracex-exception-message 等丰富异常信息
灵活性中等(只能配置在队列上)很高(可以指定任意交换机 + RoutingKey)
适用场景1. TTL 过期(延迟消息) 2. 队列超长 3. 手动 reject + 不重入队消费业务异常后,需要把失败消息转到专门的错误队列
配置

需要在yaml文件中配置不可重新入队

default-requeue-rejected: false

需要在yaml文件中配置不可重新入队和最大重试

default-requeue-rejected: false

retry: enabled: true

max-attempts: 5

1.死信队列+TTL的实现

实现结构:

在consumer服务基于@RabbitListener注解来声明队列、交换机和绑定队列和交换机,并且设置交换机为死信交换机:

 @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="dlx.queue",durable =" true"),
            exchange = @Exchange(name="dlx.direct"),
            key = {"hi"}
    ))
    public void listenDlxQueue(String message)throws Exception{
        log.info("消费者监听到 dlx.queue的消息,{}",message);

在normalConfiguration 下通过@Bean声明任意队列和任意交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NormalConfiguration {
    @Bean
    public DirectExchange normalExchange(){
        return  new DirectExchange("normal.direct");
    }
    @Bean
    public Queue normalQueue(){
        return  QueueBuilder
                .durable("normal.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }
//推荐依赖注入的方式
//需要注意代码规范,方法名应为小驼峰,反之,会找不到bean
  @Bean
    public Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange){
        return  BindingBuilder
                .bind(normalQueue)
                .to(normalExchange)
                .with("hi");
    }
//只能使用当前类下的bean
   /* @Bean
    public Binding normalExchangeBinding(){
        return  BindingBuilder
                .bind(normalQueue())
                .to(normalExchange())
                .with("hi");
    }*/
}

如何发送消息:通过setExpiration方法指定延时时长

@Test
    public void testSendDelayMessage() throws Exception {
        rabbitTemplate.convertAndSend("normal.direct", "hi", "hello everyone__", message -> {
            message.getMessageProperties().setExpiration(  "10000");
            return message;
        });
    }

2.延时消息插件

使用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,RabbitMQ的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是设计了一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。这种设计大大简化了延迟消息的处理过程,提高了系统的效率和可靠性。

官方文档:

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

github下载地址:版本为3.8.17

rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

安装插件

由于之前是基于Docker安装的RabbitMQ,所以需要查看RabbitMQ插件目录对应的数据卷:

docker volume inspect mq-plugins

运行结果:

[
    {
        "CreatedAt": "2023-12-15T09:57:39+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

切换到该数据卷的路径下:

需要把下载的插件文件放在这个数据卷路径下

cd /var/lib/docker/volumes/mq-plugins/_data

安装插件docker命令:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

基于注解方式

在consumer服务基于@RabbitListener注解来声明队列、交换机和绑定队列和交换机,并且设置交换机为延迟交换机:

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "delay.queue", durable = "true"),
            exchange = @Exchange(value = "delay.direct", delayed = "true"),
            key = "hidelay"
    ))
    public void listenDelayQueue(String msg) {
        log.info("delay.queue:" + msg);
    }
//通过  delayed = "true" 声明为延迟交换机

基于@Bean方式

在consumer服务基于@Bean注解来声明交换机、队列和绑定队列和交换机,并且设置交换机为延迟交换机:

@Configuration
public class DirectConfiguration {
    @Bean
    public DirectExchange delayExchange() {
        return ExchangeBuilder
                .directExchange("delay.direct")
                .delayed()//这里声明为延迟交换机
                .durable(true)
                .build();
    }
    @Bean
    public Queue delayedQueue() {
        return new Queue("delay.queue");
    }
    @Bean
    public Binding delayQueueBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

如何发送消息:通过setDelay方法来设置延时时长

 @Test
    public void testSendDelayMessageByplugin() {
        rabbitTemplate.convertAndSend("delay.direct", "hidelay", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        });
        log.info("消息发送成功");
    }

总结:

维度死信队列 + TTL(DLX + TTL)延时消息插件(x-delayed-message)
是否需要插件❌ 不需要(原生功能)✅ 需要安装 rabbitmq_delayed_message_exchange
实现原理消息设置 TTL 过期 → 成为死信 → 路由到 DLX → 进入目标队列发布消息时带 x-delay 头 → 插件内部暂存 → 到期自动投递
延迟时间灵活性❌ 固定延迟(通常每种延迟时间建一个队列)支持任意/动态延迟(毫秒级 per-message)
消息存储位置存放在普通队列中(会占用队列资源)存放在插件内部(Mnesia 表)
延迟精确度一般(尤其是高并发时可能有误差)较高(插件定时器更精确)
消息量支持✅ 极高(千万级轻松支持)❌ 有上限(Mnesia 内存/磁盘限制)
性能影响队列会膨胀,内存/磁盘压力大插件专用存储,普通队列不膨胀
实现复杂度中等(需配置 DLX、TTL、多个队列)简单(一个特殊交换机 + x-delay 参数)
管理服务支持✅ 几乎所有云厂商都支持❌ 部分云厂商(阿里云、腾讯云等)不支持插件
能否取消延迟❌ 较难✅ 相对容易(删除未到期消息)
适用场景固定延迟、超高并发、大消息量(如订单 30 分钟超时)动态延迟、少量精确延时(如 7 秒后、3 小时 15 分后)

到此这篇关于RabbitMQ延时队列实现方法的文章就介绍到这了,更多相关RabbitMQ延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • EasyExcel读写、模板填充、Web上传下载入门到实战全指南

    EasyExcel读写、模板填充、Web上传下载入门到实战全指南

    EasyExcel类是一套基于Java的开源Excel解析工具类,相较于传统的框架如Apache poi、jxl等更加快速、简洁,还可以解决大文件内存溢出问题,这篇文章主要介绍了EasyExcel读写、模板填充、Web上传下载入门到实战的相关资料,需要的朋友可以参考下
    2026-01-01
  • java对接海康摄像头的完整步骤记录

    java对接海康摄像头的完整步骤记录

    在Java中调用海康威视摄像头通常需要使用海康威视提供的SDK,下面这篇文章主要给大家介绍了关于java对接海康摄像头的完整步骤,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-06-06
  • Java 手动解析不带引号的JSON字符串的操作

    Java 手动解析不带引号的JSON字符串的操作

    这篇文章主要介绍了Java 手动解析不带引号的JSON字符串的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10
  • java 通过 SmbFile 类操作共享文件夹的示例

    java 通过 SmbFile 类操作共享文件夹的示例

    这篇文章主要介绍了java 通过 SmbFile 类操作共享文件夹,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • Java可变参数的使用规范与限制指南

    Java可变参数的使用规范与限制指南

    本文详细解释了Java中的可变参数机制,包括其格式、底层原理、使用规范和限制,强调了可变参数只适用于参数个数不确定的场景,放置在参数列表最后,不能与数组重载,不能与固定参数重载,方法内部需判空,以及命名规范等,同时提供了常见易错案例分析,帮助开发者避免常见问题
    2026-05-05
  • Java使用FastExcel高效读取和写入Excel

    Java使用FastExcel高效读取和写入Excel

    FastExcel 是一个 Java 库,旨在高效地读取和写入 Excel 文件,它最初是 EasyExcel 的分叉版本,旨在提供增强的性能、持续维护和新功能,同时保持与原始 EasyExcel API 的兼容性,本文给大家介绍了Java如何使用FastExcel高效读取和写入Excel,需要的朋友可以参考下
    2025-11-11
  • 区块链java代码实现

    区块链java代码实现

    这篇文章主要为大家详细介绍了区块链java代码实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • IntelliJ IDEA 2020 安装和常用配置(推荐)

    IntelliJ IDEA 2020 安装和常用配置(推荐)

    这篇文章主要介绍了IntelliJ IDEA 2020 安装和常用配置(推荐),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • 浅谈java并发之计数器CountDownLatch

    浅谈java并发之计数器CountDownLatch

    CountDownLatch是通过一个计数器来实现的,当我们在new 一个CountDownLatch对象的时候需要带入该计数器值,该值就表示了线程的数量。下面我们来深入了解一下吧
    2019-06-06
  • SpringBoot事务失效的七种场景分析及解决方案

    SpringBoot事务失效的七种场景分析及解决方案

    Spring Boot 中的事务失效通常是由于代理机制、异常处理、传播行为配置不当等原因引起的,通过合理配置和排查,可以有效避免事务失效问题,所以本文给大家总结了SpringBoot事务失效的七种场景分析及解决方案,需要的朋友可以参考下
    2025-05-05

最新评论