简单聊聊RabbitMQ中的队头阻塞问题

 更新时间:2025年02月19日 08:14:28   作者:Java中文社群  
这篇文章主要为大家详细介绍了RabbitMQ中的队头阻塞问题以及相关的解决方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

RabbitMQ 延迟消息的队头阻塞问题是指,在使用死信队列(DLX)和 TTL(消息过期时间)实现延迟消息时,由于队列的先进先出(FIFO)特性,在队列头部消息未过期的情况下,即使后续消息已经过期也不能及时处理的情况

实现原理

RabbitMQ 延迟消息的实现方式有以下两种:

  • 死信队列+TTL
  • 使用 rabbitmq-delayed-message-exchange 插件

而我们本文要讨论的“RabbitMQ 延迟消息的队头阻塞问题”只会发生在死信队列+TTL 的实现方式中。

死信队列 + TTL 的实现流程如下:

  • 生产者先将设置了 TTL(过期时间)的消息发送到普通队列。
  • 普通队列没有消息者,所以一定会过期,消息过期之后就会发送到死信队列。
  • 消费者订阅死信队列获取消息,并执行延迟任务。

代码实现

死信队列 + TTL 在 Spring Boot 项目中的实现代码如下。

定义死信交换器(DLX)和死信队列

// Spring Boot 配置示例
@Configuration
public class RabbitConfig {
    // 定义死信交换器
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange");
    }

    // 定义死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue("dlx.queue");
    }

    // 绑定死信队列到 DLX
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");
    }

    // 定义普通队列,设置死信交换器和路由键
    @Bean
    public Queue mainQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dlx.exchange");
        args.put("x-dead-letter-routing-key", "dlx.routing.key");
        // 可选:设置队列级别的 TTL(所有消息统一过期时间)
        args.put("x-message-ttl", 10000); // 10秒
        return new Queue("main.queue", true, false, false, args);
    }

    // 主队列绑定到默认交换器(根据需要调整)
    @Bean
    public Binding mainBinding() {
        return BindingBuilder.bind(mainQueue()).to(new DirectExchange("default.exchange")).with("main.routing.key");
    }
}

发送消息时设置 TTL(消息级别)

// 发送延迟消息(消息级别 TTL)
public void sendDelayedMessage(String message, int delayMs) {
    rabbitTemplate.convertAndSend("default.exchange", "main.routing.key", message, msg -> {
        // 设置消息过期时间(覆盖队列级别的 TTL)
        msg.getMessageProperties().setExpiration(String.valueOf(delayMs));
        return msg;
    });
}

消费者监听死信队列

@RabbitListener(queues = "dlx.queue")
public void handleDelayedMessage(String message) {
System.out.println("处理延迟消息: " + message);
}

所以说消息的过期时间 TTL 的设置方式有以下两种:

队列级别:通过设置队列的 x-message-ttl 参数,设置队列统一的过期时间。

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置队列消息过期时间为 60 秒
channel.queueDeclare(queueName, true, false, false, args);

消息级别:通过给每个消息设置 expiration 属性,为每个消息设置过期时间。

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) // 消息持久化
        .expiration("60000") // 设置消息过期时间为 60 秒
        .build();
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());

如果同时设置了消息级 TTL 和队列级 TTL,消息的实际过期时间会取两者中的最小值。

造成队头阻塞的原因

造成队头阻塞的原因有以下两个:

  • 先进先出的队列特性:队列中的消息必须按顺序处理,即使后面的消息 TTL 较短且已过期,也必须等待队头的消息先被处理(或过期)。
  • TTL 检查机制:RabbitMQ 默认仅在处理队头消息时检查其 TTL,如果队头消息的 TTL 较长(例如 10 分钟),即使后续消息的 TTL 更短(例如 1 分钟),这些消息也会被阻塞,直到队头消息过期或被移除。

如下图所示:

解决方案

  • 为不同延迟时间创建独立队列:将相同 TTL 的消息放入同一队列,避免消息的过期时间不一致。
  • 使用延迟插件:使用 RabbitMQ 的延迟插件 rabbitmq_delayed_message_exchange,直接通过延迟交换机实现延迟消息,绕过死信队列的 FIFO 限制。延迟插件是通过将消息存储到内置数据库 Mnesia,再通过不断判断过期消息,实现延迟消息的投递和执行的,因此它不存在队列的先进先出和队头阻塞的问题。

小结

队头阻塞问题是发生在使用死信队列加 TTL 实现 RabbitMQ 延迟消息的场景中,造成的原因是队列先进先出的特性,加上延迟消息的检查机制导致的,我们可以使用 RabbitMQ 的延迟插件来避免此问题。

那么问题来了,使用延迟插件如何实现延迟任务?它和死信队列的实现方式有哪些具体的区别呢?

到此这篇关于简单聊聊RabbitMQ中的队头阻塞问题的文章就介绍到这了,更多相关RabbitMQ队头阻塞内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • mybatisplus的连表增强插件mybatis plus join

    mybatisplus的连表增强插件mybatis plus join

    本文主要介绍了mybatisplus的连表增强插件mybatis plus join,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • java之向linux文件夹下写文件无权限的问题

    java之向linux文件夹下写文件无权限的问题

    这篇文章主要介绍了java之向linux文件夹下写文件无权限的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Spring+MyBatis多数据源配置实现示例

    Spring+MyBatis多数据源配置实现示例

    本篇文章主要介绍了Spring+MyBatis多数据源配置实现示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01
  • 详解如何用Java去除HTML标签

    详解如何用Java去除HTML标签

    在平时工作中,偶尔会用 Java 做一些解析HTML的工作。有时需要删除所有的HTML标签,只保留纯文字内容。这个问题在做过一些爬虫工作的朋友来说很简单。下面来说说,我们平时使用到的集中解析的方法
    2022-12-12
  • java迭代器和for循环优劣详解

    java迭代器和for循环优劣详解

    在本篇文章里小编给大家整理的是一篇关于java迭代器和for循环优劣详解内容,对此有兴趣的朋友们可以学习参考下。
    2021-01-01
  • 基于XML的Spring声明事务控制

    基于XML的Spring声明事务控制

    这篇文章主要为大家详细介绍了基于XML的Spring声明事务控制,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-07-07
  • Java更新调度器(update scheduler)的使用详解

    Java更新调度器(update scheduler)的使用详解

    Java更新调度器是Java中的一个特性,可以自动化Java应用程序的更新过程,它提供了一种方便的方式来安排Java应用程序的更新,确保其与最新的功能、错误修复和安全补丁保持同步,本文将深入介绍如何使用Java更新调度器,并解释它对Java开发人员和用户的好处
    2023-11-11
  • Java算法练习题,每天进步一点点(2)

    Java算法练习题,每天进步一点点(2)

    方法下面小编就为大家带来一篇Java算法的一道练习题(分享)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧,希望可以帮到你
    2021-07-07
  • 几种JAVA细粒度锁的实现方式

    几种JAVA细粒度锁的实现方式

    这篇文章主要为大家详细介绍了几种JAVA细粒度锁的实现方式,感兴趣的小伙伴们可以参考一下
    2016-05-05
  • Java遍历Map四种方式讲解

    Java遍历Map四种方式讲解

    这篇文章主要介绍了Java遍历Map四种方式讲解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08

最新评论