RabbitMq中channel接口的几种常用参数详解

 更新时间:2023年08月28日 09:38:31   作者:Alan0517  
这篇文章主要介绍了RabbitMq中channel接口的几种常用参数详解,RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,需要的朋友可以参考下

1. 背景概述

为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制( message acknowledgement), 消费者在订阅队列时,可以指定autoAck参数,

  • 当autoAck 等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除) 。
  • 当autoAck 等于true时,RabbitMQ 会自动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制后,只要设置autoAck 参数为false ,消费者就有足够的时间处理消息(任务) ,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ 会一直等待持有消息直到消费者显式调用Basic.Ack 命令为止。

当autoAck 参数置为false ,对于RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息:一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接, 则RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开, 这么设计的原因是RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbtiMQ 的Web 管理平台(15672端口)上可以看到当前队列中的" Ready" 状态和"Unacknowledged" 状态的消息数,分别对应上文中的等待投递给消费者的消息数和己经投递给消费者但是未收到确认信号的消息数

也可以通过相应的命令来查看上述信息:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?

RabbitMQ 在2 .0.0 版本开始引入了Basic.Reject 这个命令,消费者客户端可以调用与其对应的channel.basicReject 方法来告诉RabbitMQ 拒绝这个消息。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中deliveryTag 可以看作消息的编号,它是一个64 位的长整型值,最大值是9223372036854775807, 如果requeue 参数设置为true ,则RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者, 如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令, 消费者客户端可以调用channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中deliveryTag 和requeue 的含义可以参考basicReject 方法。

  • multiple 参数设置为false 则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack 和basicReject 方法一样;
  • multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。

注意要点:

将channel.basicReject 或者channel.basicNack 中的requeue 设置为false ,可以启用" 死信队列 "的功能。

死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。 对于requeue , AMQP 中还有一个命令Basic.Recover 具备可重入队列的特性。其对应的客户端方法为:

1.Basic.RecoverOk basicRecover() throws IOException;

2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个channel.basicRecover 方法用来请求RabbitMQ 重新发送还未被确认的消息。

如果requeue 参数设置为true, 则未被确认的消息会被重新加入到队列中, 这样对于同一条消息来说,可能会被分配给与之前不同的消费者。

如果requeue 参数设置为false ,那么同一条消息会被分配给与之前相同的消费者, 默认情况下,如果不设置requeue 这个参数,相当于channel.basicRecover(true) ,即requeue 默认为true

2. 通常参数解释

  • consumerTag :会话的标签,是固定的 ;
  • deliveryTag : 每次接收消息+1,可以做此消息处理通道的名字。

因此 deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。

3. Channel一些Api解释

3.1. basicNack 不确认消息

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

简单理解就是: 不确认deliveryTag对应的消息

  • 参数1: 消息
  • 参数2: 是否应用于多消息
  • 参数3: 是否重新放回队列,否则丢弃或者进入死信队列

第二个参数,怎么理解basic.nack多消息,比如现在有多条消息去调用这个nack方法,他是怎么执行的?

  • 有个先后顺序,就是调用nack时,之前所有没有ack的消息都会被标记为nack,多条消息同时调用,则调用的这个语句执行前,如果还有未执行回复确认的消息就会被回复nack,后续的消息回复nack可能只作用于当条消息。

注意: nack后的消息也会被自己消费;

3.2. basicReject 拒绝消息

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

简单理解就是:拒绝deliveryTag对应的消息

  • 参数1: 消息
  • 参数2: 是否重新放回队列,否则丢弃或者进入死信队列

区别在于:

  • basicReject一次只能拒绝接收一个消息
  • basicNack方法可以支持一次0个或多个消息的拒收

3.3. RecoverOk 是否恢复消息到队列

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。 false则消息会重新被投递给自己。

3.4. exchangeDeclare 声明交换机

有多个重载方法,这些方法都是由下面这个方法中的缺省参数构成的

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments) throws IOException;
  • exchange:交换机名称
  • type:交换机类型 有direct、fanout、topic三种
  • durable:设置是否持久化。durable设置true表示持久化 ,服务器重启会将Exchange(交换机)存盘。注意:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。
  • autoDelete: 设置是否自动删除 。.当已经没有消费者时,服务器是否可以删除该Exchange。自动删除的前提是至少有一个队列或者交换机与这个交换器绑定的队列或者交换器都与之解绑;
  • internal:设置是否内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  • argument:其他一些结构化参数,比如alternate-exchange

3.5. queueDeclare 声明队列

  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map&lt;String, Object&gt; arguments) throws IOException;
  • 队列的名字
  • 队列里面的消息是否支持持计化
  • 设置该队列,是否可以供对个消费者消费
  • 是否自动删除消息
  • 其他参数

3.6. queueBind 绑定队列

 Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;
  • queue: 队列名
  • exchange: 交换器名称
  • routingKey :路由key或者绑定key
  • arguments: 一些参数

3.7. queueUnbind 解绑队列

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;
  • queue: 队列名
  • exchange: 交换器名称
  • routingKey :路由key或者绑定key
  • arguments: 一些参数

3.8. exchangeBind 绑定交换机

  Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
  • destination :目标交换器
  • source :源交换器
  • routingKey 路由key
  • arguments: 一些相关参数

消息从source交换器转发到destination交换器存储在destination绑定的队列queue中

3.9. exchangeUnbind 解绑交换机

 Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;
  • destination :目标交换器
  • source :源交换器
  • routingKey 路由key
  • arguments: 一些相关参数

3.10. basicQos 消息流量

有多个重载方法,这些方法都是由下面这个方法中的缺省参数构成的,

void basicQos(int prefetchSize, int prefetchCount, boolean global)
  • param1:prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
  • param2:prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
  • param3:global,是否将上面的设置应用于整个通道
    • false:表示只应用于当前消费者
    • true:表示当前通道的所有消费者都应用这个限流策略

消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。

队列中没有被消费的消息不会被删除,还是存在于队列中。

一般和channel.basicAck配套使用

3.11. basicAck 消息确认

 void basicAck(long deliveryTag, boolean multiple) throws IOException
  • deliveryTag:该消息的index
  • multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

3.12. basicConsume 消息消费

该重载方法有点多,具体我就不列举了,参数解释一下:

  • queue:队列名
  • autoAck:是否自动确认消息
  • deliverCallback: 当一个消息发送过来后的回调接口
  • cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
  • callback: 消费者对象的回调接口
  • shutdownSignalCallback: 当channel/connection 关闭后回调
  • arguments: 消费的一组参数
  • consumerTag: 客户端生成的用于建立上线文的使用者标识
  • nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
  • exclusive: 如果是单个消费者,则为true

启动一个消费者,并返回服务端生成的消费者标识

3.13. basicPublish 发布消息

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  • exchange:要将消息发送到的Exchange(交换器)
  • routingKey:路由的 key 是哪个
  • 其他参数
    • mandatory:true 如果mandatory标记被设置
    • immediate: true 如果immediate标记被设置,注意:RabbitMQ服务端不支持此标记
    • props:其它的一些属性,如:{@linkMessageProperties.PERSISTENT_TEXT_PLAIN}
  • body:发送消息的消息体

3.14. basicGet 主动拉取队列中的一条消息

GetResponse basicGet(String queue, boolean autoAck)
  • 参数1: 队列名
  • 参数2: 是否自动确认

3.15. basicCancel 取消消费者对队列的订阅关系

void basicCancel(String consumerTag)

consumerTag:服务器端生成的消费者标识

4. 消息确认一些观点

  • 消息监听内必须使用channel对消息进行确认,不管是确认消费成功还是确认消费失败
  • 消息监听内的异常处理有两种方式:
    • 内部catch后直接处理,然后使用channel对消息进行确认
    • 配置RepublishMessageRecoverer将处理异常的消息发送到指定队列专门处理或记录
  • 监听的方法内抛出异常貌似没有太大用处。因为抛出异常就算是重试也非常有可能会继续出现异常,当重试次数完了之后消息就只有重启应用才能接收到了,很有可能导致消息消费不及时。当然可以配置RepublishMessageRecoverer来解决,但是万一RepublishMessageRecoverer发送失败了呢。。那就可能造成消息消费不及时了。所以即使需要将处理出现异常的消息统一放到另外队列去处理,个人建议两种方式:
    • catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
    • 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败

到此这篇关于RabbitMq中channel接口的几种常用参数详解的文章就介绍到这了,更多相关channel接口常用参数内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中bcrypt算法实现密码加密的方法步骤

    Java中bcrypt算法实现密码加密的方法步骤

    我们可以在Spring Boot和SSM中实现密码加密,使用bcrypt算法可以保障密码的安全性,并且减少了手动编写哈希函数的工作量,本文就来详细的介绍一下,感兴趣的可以了解一下
    2023-08-08
  • java获取时间的方法总结

    java获取时间的方法总结

    以下是对java中获取时间的几种方法进行了详细的分析介绍,需要的朋友可以参考下
    2013-07-07
  • Arrays.asList方法总结

    Arrays.asList方法总结

    本文主要对Arrays.asList方法进行总结。具有很好的参考价值,下面跟着小编一起来看下吧
    2017-02-02
  • JAVA String.valueOf()方法的用法说明

    JAVA String.valueOf()方法的用法说明

    这篇文章主要介绍了JAVA String.valueOf()方法的用法说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Java 运算符详情

    Java 运算符详情

    这篇文章主要介绍了Java 运算符,Java 中的运算符与 C 语言基本一致。下面文章就围绕Java 中的运算符的相关资料展开内容,需要的朋友可以参考一下
    2021-11-11
  • springBoot2.6.2自动装配之注解源码解析

    springBoot2.6.2自动装配之注解源码解析

    对于springboot个人认为它就是整合了各种组件,然后提供对应的自动装配和启动器(starter),基于这个流程去实现一个定义的装配组件,下面这篇文章主要给大家介绍了关于springBoot2.6.2自动装配之注解源码解析的相关资料,需要的朋友可以参考下
    2022-01-01
  • java多线程编程之线程的生命周期

    java多线程编程之线程的生命周期

    线程要经历开始(等待)、运行、挂起和停止四种不同的状态。这四种状态都可以通过Thread类中的方法进行控制。下面给出了Thread类中和这四种状态相关的方法
    2014-01-01
  • 浅谈synchronized加锁this和class的区别

    浅谈synchronized加锁this和class的区别

    synchronized 是 Java 语言中处理并发问题的一种常用手段,本文主要介绍了synchronized加锁this和class的区别,具有一定的参考价值,感兴趣的可以了解一下
    2021-11-11
  • SpringCloud Gateway读取Request Body方式

    SpringCloud Gateway读取Request Body方式

    这篇文章主要介绍了SpringCloud Gateway读取Request Body方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • java中的forkjoin框架的使用

    java中的forkjoin框架的使用

    这篇文章主要介绍了java中的fork join框架的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-03-03

最新评论