rabbitmq消息队列原理分析

 更新时间:2025年12月02日 09:14:00   作者:软件开发随心记  
这篇文章主要介绍了rabbitmq消息队列原理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、rabbitmq架构

RabbitMQ是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现,由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优点。

rabbitmq简单架构如下:

上图简单展示了rabbitmq的架构,从图中看到几个关键字:vhost、exchange、route key、queue等,后面会介绍这些概念。

下面看下rabbitmq的进程模型:

看到这个图,相信大家应该很熟悉,没错就是事件驱动模型(或者说反应堆模型),这是一种高性能的非阻塞io线程模型,不过在Erlang中称为进程模型。

tcp_acceptor进程接收客户端连接,创建rabbit_reader、rabbit_writer、rabbit_channel进程。

  • rabbit_reader接收客户端连接,解析AMQP帧;rabbit_writer向客户端返回数据;
  • rabbit_channel解析AMQP方法,对消息进行路由,然后发给相应队列进程。
  • rabbit_amqqueue_process是队列进程,在RabbitMQ启动(恢复durable类型队列)或创建队列时创建。
  • rabbit_msg_store是负责消息持久化的进程。

在整个系统中,存在一个tcp_accepter进程,一个rabbit_msg_store进程,有多少个队列就有多少个rabbit_amqqueue_process进程,每个客户端连接对应一个rabbit_reader和rabbit_writer进程。

二、关于AMQP协议

1.AMQP帧组件

AMQP帧由五个不同的组件组成:

帧类型
信道编号
以字节为单位的帧大小
帧有效载荷payload
结束字节标志(ASCII值206)

2.帧类型

AMQP规范定义了五种类型的帧:协议头帧、方法帧、内容帧、消息体帧及心跳帧。每种帧类型都有明确的目的,有些帧的使用频率比其他的高很多:

协议头帧用于连接到rabbitmq,进使用一次。
方法帧携带发送给rabbitmq或者从rabbitmq接收到的rpc请求或者响应
内容头包含一条消息的大小和属性。
消息体帧包含消息的内容
心跳帧在客户端与rabbitmq直接进行传递,作为一种校验机制确保连接的两端都可用并且正常工作。

3.将消息编组成帧

我们使用方法帧、内容头帧和消息体帧组成一个完整的rabbitmq消息。方法头帧携带命令和执行它所需要的参数(如交换器和路由键)、内容帧包含消息的基本属性以及消息的大小,消息体帧也就是携带我们真正需要发送的消息内容。

4.方法帧结构

5.内容头帧结构

内容头包含的具体属性如下:

  • content-type:消息体的报文编码,如application/json
  • expiration:消息过期时间
  • reply-to:响应消息的队列名
  • content-encoding:报文压缩的编码,如gzip
  • message-id:消息的编号
  • correlation-id:链路id
  • deliver-mode:告诉rabbitmq将消息写入磁盘还是内存
  • user-id:投递消息的用户(发送消息时不要设置该值)
  • timestamp:投递消息的时间
  • headers:定义一些属性,可用于实现rabbitmq路由(比如exchange类型是headers的时候用到)

6.消息体帧结构

7.几个概念

  • Broker:简单来说就是消息队列服务器实体
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列,队列类型又分为临时队列,持久化队列,排他队列
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
  • producer:消息生产者,就是投递消息的程序
  • consumer:消息消费者,就是接受消息的程序
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

四、通讯过程

1.启动会话

2.声明交换器

3.声明队列

4.绑定队列到exchange

5.发送消息-使用事务机制

对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,假如使用no_ack模式,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。

如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。

当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。但是使用事务,会导致性能下降,它使得生产者发布消息后必须等到消息真正持久化后服务端响应了才结束本次连接,所以需要在实际应用中平衡性能与安全的问题。

6.发送消息-非事务方式

使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。

Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务(也就是异步监听服务端的ack即可)。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。

但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

7.消费消息

五、使用delivery-mode平衡速度和安全

delivery-mode有两个值:1表示非持久化,2表示持久化消息

1.发送消息到纯内存队列中

delivery-mode = 1

特点:非持久化的消息在服务宕机的时候会丢失数据,但是由于不需要磁盘io,尽可能地降低消息投递的延迟性,性能较高。

2.发布消息到支持磁盘存储的队列

delivery-mode = 2

特点:持久化的消息安全性较高,尽管服务宕机,数据也不会丢失,但是在投递消息的过程中需要发生磁盘io,性能相对纯内存投递的方式低,但是尽管是产生了磁盘io,由于日志的记录方式是直接追加到消息日志文件的末尾,属于顺序io,没有随机io,所以性能还是可以接受的。

大概原理:

  • 所有队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。当某个消息被删除时,并不立即从文件中删除相关信息,而是做一些记录,当垃圾数据达到一定比例时,启动垃圾回收处理,将逻辑相邻的文件中的数据合并到一个文件中。

消息的读写及删除:

  • rabbitmq在启动时会创建msg_store_persistent,msg_store_transient两个进程,一个用于持久消息的存储,一个用于内存不够时,将存储在内存中的非持久化数据转存到磁盘中。所有队列的消息的写入和删除最终都由这两个进程负责处理,而消息的读取则可能是队列本身直接打开文件进行读取,也可能是发送请求由msg_store_persisteng/msg_store_transient进程进行处理。

在进行消息的存储时,rabbitmq会在ets表中记录消息在文件中的映射,以及文件的相关信息。消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。消息的删除只是从ets表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息(更新文件有效数据大小)。

六、消息路由模式

1.fanout模式

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

上图中,生产者发送到Exchange的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

2.direct模式

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。如图,生产者发送消息的routing key=key1的时候,只有绑定了key1的queue才能收到信息

3.topic模式

topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“image.new.profile”.
  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配下一个据点前的所有字符,“#”用于匹配所有字符,包括句点(可以是零个)

如图,生产者以routing key为image.new.profile发布消息,这key可以被image.*.profile以及image.#匹配到,所有这两个队列都可以收到消息。由此可见,topic的路由方式更加灵活。

3.headers模式

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

七、rabbitmq流量控制

RabbitMQ可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,RabbitMQ在正常情况下还用流控(Flow Control)机制来确保稳定性。Erlang进程之间并不共享内存(binaries类型除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱。Erlang默认没有对进程邮箱大小设限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。

在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱大小达到内存阈值,阻塞生产者(得益于block机制,并不会崩溃)。然后RabbitMQ会进行page操作,将内存中的数据持久化到磁盘中。

为了解决该问题,RabbitMQ使用了一种基于信用证的流控机制。消息处理进程有一个信用组{InitialCredit,MoreCreditAfter},默认值为{200, 50}。消息发送者进程A向接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住;对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。

八、 RabbitMQ 多层消息队列

RabbitMQ完全实现了AMQP协议,类似于一个邮箱服务。Exchange负责根据ExchangeType和RoutingKey将消息投递到对应的消息队列中,消息队列负责在消费者获取消息前暂存消息。

在RabbitMQ中,MessageQueue主要由两部分组成,一个为AMQQueue,主要负责实现AMQP协议的逻辑功能。另外一个是用来存储消息的BackingQueue。

为了高效处理入队和出队的消息、避免不必要的磁盘IO,BackingQueue进程为消息设计了4种状态和5个内部队列。

(1) 4种状态包括:

alpha,消息的内容和索引都在内存中;
beta,消息的内容在磁盘,索引在内存;
gamma,消息的内容在磁盘,索引在磁盘和内存中都有;
delta,消息的内容和索引都在磁盘。

对于持久化消息,RabbitMQ先将消息的内容和索引保存在磁盘中,然后才处于上面的某种状态(即只可能处于alpha、gamma、delta三种状态之一)。

(2) 5个内部队列

包括:q1、q2、delta、q3、q4。q1和q4队列中只有alpha状态的消息;q2和q3包含beta和gamma状态的消息;delta队列是消息按序存盘后的一种逻辑队列,只有delta状态的消息。所以delta队列并不在内存中,其他4个队列则是由erlang queue模块实现。

消息从q1入队,q4出队,在内部队列中传递的过程一般是经q1顺序到q4。实际执行并非必然如此:开始时所有队列都为空,消息直接进入q4(没有消息堆积时);内存紧张时将q4队尾部分消息转入q3,进而再由q3转入delta,此时新来的消息将存入q1(有消息堆积时)。

当内存紧张时触发paging,paging将大量alpha状态的消息转换为beta和gamma;如果内存依然紧张,继续将beta和gamma状态转换为delta状态。Paging是一个持续过程,涉及到大量消息的多种状态转换,所以Paging的开销较大,严重影响系统性能。

九、高可用队列(HA)

在生产环境下,一般都不会允许rabbitmq这种消息中间件单点,以免单点故障导致服务不可用,那么rabbitmq同样可以集群部署来保证服务的可用性,在rabbitmq集群中,我们可以定义HA队列,可以在web管理平台设置,也可以通过AMQP接口设置,当我们定义某个HA队列的时候,会在集群的各个节点上都建立该队列,发布消息的时候,直接发送至master服务,当master服务受到消息后,把消息同步至各个从节点,假如开启事务的情况下,是需要在消息被同步到各个节点之后才算完成事务,所以会带来一定的性能损耗,所以还是回到之前说的,性能和安全直接,需要根据实际业务的需要找到平衡点。

当master服务宕机之后,其中一个slaver节点会升级为master,消息不会丢失(因为已经完成了事务的消息都会在各个节点有备份)

ha-队列可以跨越集群的每台服务,或者仅使用其中一批独立节点。如果是全部节点都为副本的时候,将x-ha-policy参数设置为all,否则设置为nodes,然后在设置另一个参数:x-ha-nodes,该参数指定ha队列所在的节点列表。

思考下,rabbitmq的集群节点是不是越多越好?

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 使用FileReader采用的默认编码

    使用FileReader采用的默认编码

    这篇文章主要介绍了使用FileReader采用的默认编码,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • spring security实现下次自动登录功能过程解析

    spring security实现下次自动登录功能过程解析

    这篇文章主要介绍了spring security实现记住我下次自动登录功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Java文件操作工具类fileUtil实例【文件增删改,复制等】

    Java文件操作工具类fileUtil实例【文件增删改,复制等】

    这篇文章主要介绍了Java文件操作工具类fileUtil,结合实例形式分析了java针对文件进行读取、增加、删除、修改、复制等操作的相关实现技巧,需要的朋友可以参考下
    2017-10-10
  • Java源码解析之HashMap的put、resize方法详解

    Java源码解析之HashMap的put、resize方法详解

    这篇文章主要介绍了Java源码解析之HashMap的put、resize方法详解,文中有非常详细的代码示例,对正在学习java的小伙伴们有很大的帮助,需要的朋友可以参考下
    2021-04-04
  • 如何设置springboot启动端口

    如何设置springboot启动端口

    spring boot是个好东西,可以不用容器直接在main方法中启动,而且无需配置文件,方便快速搭建环境。下面给大家介绍springboot启动端口的设置方法和spring boot创建应用端口冲突8080 问题,感兴趣的朋友一起看看吧
    2017-08-08
  • MyBatis生成UUID的实现

    MyBatis生成UUID的实现

    这篇文章主要介绍了MyBatis生成UUID的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • 创建一个Java的不可变对象

    创建一个Java的不可变对象

    这篇文章主要介绍了创建一个Java的不可变对象,一个类的对象在通过构造方法创建后如果状态不会再被改变,那么它就是一个不可变(immutable)类。它的所有成员变量的赋值仅在构造方法中完成,不会提供任何 setter 方法供外部类去修改,需要的朋友可以参考下
    2021-11-11
  • 深入理解Java8新特性之Stream API的终止操作步骤

    深入理解Java8新特性之Stream API的终止操作步骤

    Stream是Java8的一大亮点,是对容器对象功能的增强,它专注于对容器对象进行各种非常便利、高效的 聚合操作(aggregate operation)或者大批量数据操作。Stream API借助于同样新出现的Lambda表达式,极大的提高编程效率和程序可读性,感兴趣的朋友快来看看吧
    2021-11-11
  • Java异常处理中的各种细节汇总

    Java异常处理中的各种细节汇总

    这篇文章主要给大家介绍了关于Java异常处理中的各种细节的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-01-01
  • Maven默认中央仓库(settings.xml 配置详解)

    Maven默认中央仓库(settings.xml 配置详解)

    这篇文章主要介绍了Maven默认中央仓库(settings.xml 配置详解),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-12-12

最新评论