RabbitMQ队列的选择及应用场景

 更新时间:2025年07月14日 09:48:41   作者:莱温·克里  
本文介绍RabbitMQ的Classic、Quorum、Stream及懒队列,分别适用于单机、集群高可用、消息持久化与分布式备份、内存优化等场景,帮助选择合适队列类型以解决不同需求,感兴趣的跟随小编一起看看吧

前言

  本篇是Rabbit MQ高级特性的学习笔记,记录RabbitMQ的Classic,Quorum,Stream队列,懒队列的特性和运用场景

一、Rabbit MQ队列的选择

  Rabbit MQ默认提供了三种队列:

  • Classic:经典队列,在单机模式下是最常用的。
  • Quorum:仲裁队列,通常用于集群环境下,保证集群的高可用性。
  • Stream:流式队列,是自3.9.0版本开始引入的新特性,这种队列类型的消息是持久化到磁盘并且具备分布式备份的。

1.1、Classic

经典队列是Rabbit MQ默认的队列,与Spring Boot整合时,使用new Queue,创建的队列就是普通队列:

经典队列在创建时除了指定队列的名称,还有额外的四个选项:

对应的是页面上的:

durable代表了队列是否进行持久化,如果开启持久化,则会将队列保存到磁盘上,将来Rabbit MQ重启后,可以从磁盘恢复,否则队列只存在于内存中,服务重启后会自动删除。这里的durable只是设置了队列的持久化。消息和交换机,同样可以设置持久化。如果消息设置了持久化,而队列未设置持久化,重启之后,消息依旧会丢失。所以如果要保证消息的零丢失,消息和队列都需要设置持久化

// 队列声明
channel.queueDeclare("safe_queue", true, false, false, null); // Durability=true
// 消息发布
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2) // 持久化消息
    .build();

  exclusive代表了队列是否排他。当属性为true时,表示仅允许声明了该队列的连接进行访问,其他连接无法访问该队列。通常用于单消费者模式,确保队列只能被特定消费者访问。
  autoDelete代表了队列是否自动删除,如果设置为true,表示该队列在最后一个消费者断开连接之后,进行删除操作。通常用于临时队列的场景。
  arguments可以指定更多的参数,比如指定死信交换机和路由键,消息过期时间,最大长度,是否为懒队列等

1.2、Quorum

Quorum是针对镜像队列的一种优化,目前已经取代了镜像队列,作为Rabbit MQ集群部署保证高可用性的解决方案。传统的镜像队列,是将消息副本存储在一组节点上,以提高可用性和可靠性。镜像队列将队列中的消息复制到一个或多个其他节点上,并使这些节点上的队列保持同步。当一个节点失败时,其他节点上的队列不受影响,因为它们上面都有消息的备份。
  镜像队列使用主从模式,所有消息写入和读取均通过主节点,并异步复制到镜像节点。主节点故障时需重新选举,期间队列不可用。而仲裁队列基于Raft分布式共识算法,所有节点组成仲裁组。消息需被多数节点持久化后才确认成功,Leader故障时自动触发选举。
  相比较于传统的主从模式,避免了发生网络分区时的脑裂问题(基于Raft分布式共识算法避免)。

和普通队列的区别:

相比较于普通队列,仲裁队列增加了一个对于有毒消息的处理。什么是有毒消息?首先,消费者从队列中获取到了元素,队列会将该元素删除,但是消费者消费失败了,会给队列nack,并且可以设置消息重新入队。这样可能存在因为业务代码的问题,某条消息一直处理不成功的问题。仲裁队列会记录消息的重新投递次数,判断是否超过了设置的阈值,如果超过了就直接丢弃,或者放入死信队列人工处理。
如果需要声明一个仲裁队列,只需要加入参数:

@Configuration
public class QuorumConfig {
    @Bean
    public Queue quorumQueue() {
        Map<String,Object> params = new HashMap<>();
        params.put("x-queue-type","quorum");
        return new Queue(MyConstants.QUEUE_QUORUM,true,false,false,params);
    }
}

  仲裁队列适用于集群环境下,队列长期存在,并且对于消息可靠性要求高,允许牺牲一部分性能(因为raft算法,消息需被多数节点持久化后才确认成功)的场景。

1.3、Stream

在传统的队列模型中,同一条消息只能被一个消费者消费(一个队列如果有多个消费者,是工作分发的机制。消息1->消费者1,消息2->消费者2,消息3->消费者1,不能两个消费者读同一条消息。),并且消息是阅后即焚的(消费者接收到消息后,队列中的该消息就删除,如果消费者拒绝签收并且设置了重新入队,再把消息重新放入队列中),无法重复从队列中获取相同的消息。并且在当队列中积累的消息过多时,性能下降会非常明显。
Stream队列正是解决了以上的这些问题。Stream队列的核心是用aof文件的形式存储队列,将消息以aof的方式追加到文件中。允许用户在日志的任何一个连接点开始重新读取数据。(需要用户自己记录偏移量)
声明一个stream队列:

@Configuration
public class StreamConfig {
    @Bean
    public Queue streamQueue() {
        Map<String,Object> params = new HashMap<>();
        params.put("x-queue-type","stream");
        params.put("x-max-length-bytes", 20_000_000_000L); // 指定队列的大小
        params.put("x-stream-max-segment-size-bytes", 100_000_000); // 文件分片存储,每一片的大小
			 //必须设置持久化为true,同时独占和自动删除模式为false
        return new Queue(MyConstants.QUEUE_STREAM,true,false,false,params);
    }
}

  声明消费者:

public void stremReceiver(Channel channel,String message){
		try {
			channel.basicQos(100);
			Consumer myconsumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope,
										   AMQP.BasicProperties properties, byte[] body)
						throws IOException {
					System.out.println("========================");
					String routingKey = envelope.getRoutingKey();
					System.out.println("routingKey >"+routingKey);
					String contentType = properties.getContentType();
					System.out.println("contentType >"+contentType);
					long deliveryTag = envelope.getDeliveryTag();
					System.out.println("deliveryTag >"+deliveryTag);
					System.out.println("content:"+new String(body,"UTF-8"));
					// (process the message components here ...)
					//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
					//没有答复过的消息,服务器会一直不停转发。
					channel.basicAck(deliveryTag, false);
				}
			};
			Map<String,Object> consumeParam = new HashMap<>();
			//first: 从日志队列中第一个可消费的消息开始消费
			//last: 消费消息日志中最后一个消息
			//next: 相当于不指定offset,消费不到消息。
			//Offset: 一个数字型的偏
			//Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。
			//例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
			consumeParam.put("x-stream-offset","last");
			channel.basicConsume(MyConstants.QUEUE_STREAM, false,consumeParam, myconsumer);
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("quorumReceiver received message : "+ message);
	}

1.4、懒队列

Rabbit MQ对于常规队列的处理是,将消息优先存在于内存中,在合适的时机再持久化到磁盘上,而懒队列则相反,懒队列会尽可能早的将消息内容保存到磁盘当中,并且只有在用户请求到时,才临时从磁盘加载到内存当中。懒队列的设计也是为了应对消息堆积问题的。
声明懒队列的方式,只需要加入参数,相应的,当一个队列被声明为懒队列,那即使队列被设定为不持久化,消息依然会写入到硬盘中。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");

懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。

到此这篇关于RabbitMQ队列的选择的文章就介绍到这了,更多相关RabbitMQ队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java/Android引用类型及其使用全面分析

    Java/Android引用类型及其使用全面分析

    下面小编就为大家带来一篇Java/Android引用类型及其使用全面分析。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-09-09
  • Java对接乐橙摄像头详细步骤(绑定设备/直播/控制)

    Java对接乐橙摄像头详细步骤(绑定设备/直播/控制)

    大华乐橙SDK(LechangeSDK)是一套由大华科技推出的智能安防领域专用软件开发工具包,下面这篇文章主要介绍了Java对接乐橙摄像头(绑定设备/直播/控制)的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-12-12
  • springboot集成mybatis官方生成器

    springboot集成mybatis官方生成器

    本文主要介绍了springboot集成mybatis官方生成器,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-11-11
  • Java设计模式之策略模式示例详解

    Java设计模式之策略模式示例详解

    策略模式属于Java 23种设计模式中行为模式之一,该模式定义了一系列算法,并将每个算法封装起来,使它们可以相互替换,且算法的变化不会影响使用算法的客户。本文将通过示例详细讲解这一模式,需要的可以参考一下
    2022-08-08
  • java实现解析二进制文件的方法(字符串、图片)

    java实现解析二进制文件的方法(字符串、图片)

    本篇文章主要介绍了java实现解析二进制文件的方法(字符串、图片),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-02-02
  • java微信企业号开发之开发模式的开启

    java微信企业号开发之开发模式的开启

    这篇文章主要为大家详细介绍了java微信企业号开发之开发模式的开启方法,感兴趣的小伙伴们可以参考一下
    2016-06-06
  • 基于Java的界面开发详细步骤(用户注册登录)

    基于Java的界面开发详细步骤(用户注册登录)

    通过一段时间Java Web的学习,写一个简单的注册登陆界面来做个总结,这篇文章主要给大家介绍了基于Java的界面开发(用户注册登录)的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-01-01
  • 解读String字符串导致的JVM内存泄漏问题

    解读String字符串导致的JVM内存泄漏问题

    这篇文章主要介绍了解读String字符串导致的JVM内存泄漏问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • 详解springboot设置cors跨域请求的两种方式

    详解springboot设置cors跨域请求的两种方式

    这篇文章主要介绍了详解springboot设置cors跨域请求的两种方式,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-11-11
  • 详解Spring DI依赖注入的方式和类型

    详解Spring DI依赖注入的方式和类型

    这篇文章主要介绍了详解Spring DI依赖注入的方式和类型,DI是由容器动态的将某个依赖关系注入到组件之中。依赖注入的目的并非为软件系统带来更多功能,而是为了提升组件重用的频率,并为系统搭建一个灵活、可扩展的平台,需要的朋友可以参考下
    2023-05-05

最新评论