RabbitMQ中的prefetch_count参数详解

 更新时间:2023年11月27日 14:21:19   作者:Throwable文摘  
这篇文章主要介绍了RabbitMQ中的prefetch_count参数用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

前提

在某一次用户标签服务中大量用到异步流程,使用了RabbitMQ进行解耦。

其中,为了提高消费者的处理效率针对了不同节点任务的消费者线程数和prefetch_count参数都做了调整和测试,得到一个相对合理的组合。

这里深入分析一下prefetch_count参数在RabbitMQ中的作用。

prefetch_count参数的含义

先从AMQPAdvanced Message Queuing Protocol,即高级消息队列协议,RabbitMQ实现了此协议的0-9-1版本的大部分内容)和RabbitMQ的具体实现去理解prefetch_count参数的含义,可以查阅对应的文档(见文末参考资料)。

AMQP 0-9-1定义了basic.qos方法去限制消费者基于某一个Channel或者Connection上未进行ack的最大消息数量上限。

basic.qos方法支持两个参数:

  • global:布尔值。
  • prefetch_count:整数。

这两个参数在AMQP 0-9-1定义中的含义和RabbitMQ具体实现时有所不同,见下表:

global参数值AMQP 0-9-1中prefetch_count参数的含义RabbitMQ中prefetch_count参数的含义
falseprefetch_count值在当前Channel的所有消费者共享prefetch_count对于基于当前Channel创建的消费者生效
trueprefetch_count值在当前Connection的所有消费者共享prefetch_count值在当前Channel的所有消费者共享

或者用简洁的英文表格理解:

globalprefetch_count in AMQP 0-9-1prefetch_count in RabbitMQ
falsePer channel limitPer customer limit
truePer connection limitPer channel limit

这里画一个图理解一下:

上图仅仅为了区分协议本身和RabbitMQ中实现的不同,接着说说prefetch_count对于消费者(线程)和待消费消息的作用。

假定一个前提:RabbitMQ客户端从RabbitMQ服务端获取到队列消息的速度比消费者线程消费速度快,目前有两个消费者线程共用一个Channel实例。

global参数为false时候,效果如下:

而当global参数为true时候,效果如下:

在消费者线程处理速度远低于RabbitMQ客户端从RabbitMQ服务端获取到队列消息的速度的场景下,prefetch_count条未进行ack的消息会暂时存放在一个队列(准确来说是阻塞队列,然后阻塞队列中的消息任务会流转到一个列表中遍历回调消费者句柄,见下一节的源码分析)中等待被消费者处理。

这部分消息会占据JVM的堆内存,所以在性能调优或者设定应用程序的初始化和最大堆内存的时候,如果刚好用到RabbitMQ的消费者,必须要考虑这些"预取消息"的内存占用量。

不过值得注意的是:prefetch_countRabbitMQ服务端的参数,它的设置值或者快照都不会存放在RabbitMQ客户端」

同时需要注意prefetch_count生效的条件和特性(从参数设置的一些demo和源码上感知):

  • prefetch_count参数仅仅在basic.consumeautoAck参数设置为false的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。
  • basic.consume如果在非自动确认模式下忘记了手动调用basic.ack,那么prefetch_count正是未ack消息数量的最大上限。
  • prefetch_count是由RabbitMQ服务端控制,一般情况下能保证各个消费者线程中的未ack消息分发是均衡的,这点笔者猜测是consumerTag起到了关键作用。

RabbitMQ客户端中prefetch_count源码跟踪

编写本文的时候引入的RabbitMQ客户端版本为:com.rabbitmq:amqp-client:5.9.0

上面说了这么多都只是根据官方的文档或者博客中的理论依据进行分析,其实更加根本的分析方法是直接阅读RabbitMQJava客户端源码,主要是针对basic.qosbasic.consume两个方法,对应的是com.rabbitmq.client.impl.ChannelN#basicQos()com.rabbitmq.client.impl.ChannelN#basicConsume()两个方法。

先看ChannelN#basicQos()

这里的basicQos()方法多了一个prefetchSize参数,用于限制分发内容的大小上限,默认值0代表无限制,而prefetchCount的取值范围是[0,65535],取值为0也是代表无限制。

这里的ChannelN#basicQos()实现中直接封装basic.qos方法参数进行一次RPC调用,意味着直接更变RabbitMQ服务端的配置,即时生效,同时参数值完全没有保存在客户端代码中,印证了前面一节的结论。

接着看ChannelN#basicConsume()方法:

上图已经把关键部分用红圈圈出,因为整个消息消费过程是异步的,涉及太多的类和方法,这里不全量贴出,整理了一个流程图:

整个消息消费过程,prefetch_count参数并未出现在客户端代码中,又再次印证了前面一节的结论,即prefetch_count参数的行为和作用完全由RabbitMQ服务端控制。

而最终Customer或者常用的DefaultCustomer句柄是在WorkPoolRunnable中回调的,这类任务的执行线程来自于ConsumerWorkService内部的线程池,而这个线程池又使用了Executors.newFixedThreadPool()去构建,使用了默认的线程工厂类,因此在Customer#handleDelivery()方法内部打印的线程名称的样子是pool-1-thread-*

这里VariableLinkedBlockingQueue就是前一节中的message queue的原型

prefetch_count参数使用

设置prefetch_count参数比较简单,就是调用Channel#basicQos()方法:

public class RabbitQos {
 
    static String QUEUE = "qos.test";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.basicQos(2);
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("1------" + Thread.currentThread().getName());
                sleep();
            }
        });
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("2------" + Thread.currentThread().getName());
                sleep();
            }
        });
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
        }
        sleep();
    }
 
    private static void sleep() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception ignore) {
 
        }
    }
}

上面是原生的amqp-client的写法,如果使用了spring-amqpspring-boot-starter-amqp),可以通过配置文件中的spring.rabbitmq.listener.direct.prefetch属性指定所有消费者线程的prefetch_count,如果要针对部分消费者线程进行该属性的设置,则需要针对RabbitListenerContainerFactory进行改造。

prefetch_count参数最佳实践

关于prefetch_count参数的设置,RabbitMQ官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。

该文章分析了消息流控的整个流程,其中提到了prefetch_count参数的一些指标:

这里指出了,如果prefetch_count的值超过了30,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count的值就会变得收效甚微。

也就是说,「官方是建议把prefetch_count设置为30

这里再参看一下spring-boot-starter-amqp中对此参数定义的默认值,具体是AbstractMessageListenerContainer中的DEFAULT_PREFETCH_COUNT

如果没有通过spring.rabbitmq.listener.direct.prefetch进行覆盖,那么使用spring-boot-starter-amqp中的注解定义的消费者线程中设置的prefetch_count就是250

笔者认为,应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count的设置。

总结如下(个人经验仅供参考):

  • 当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把prefetch_count设置为1
  • 当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未ack总消息量的内存极限,从而设计一个合理的prefetch_count值。
  • 当消费者线程的处理速度十分快,远远大于RabbitMQ服务端的消息分发,在网络带宽充足的前提下,设置可以把prefetch_count值设置为0,不做任何的消息流控。
  • 一般场景下,建议使用RabbitMQ官方的建议值30或者spring-boot-starter-amqp中的默认值250

总结

  • prefetch_countRabbitMQ服务端的参数,设置后即时生效。
  • prefetch_count对于AMQP-0-9-1中的定义与RabbitMQ中的实现不完全相同。
  • prefetch_count值设置建议使用框架提供的默认值或者通过分组实验结合数据报大小进行计算和评估出一个合理值。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java实现打字游戏小程序

    java实现打字游戏小程序

    这篇文章主要为大家详细介绍了java实现打字游戏小程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-10-10
  • 使用SpringBoot 配置Oracle和H2双数据源及问题

    使用SpringBoot 配置Oracle和H2双数据源及问题

    这篇文章主要介绍了使用SpringBoot 配置Oracle和H2双数据源及问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • 使用Java编写控制JDBC连接、执行及关闭的工具类

    使用Java编写控制JDBC连接、执行及关闭的工具类

    这篇文章主要介绍了如何使用Java来编写控制JDBC连接、执行及关闭的程序,包括一个针对各种数据库通用的释放资源的工具类的写法,需要的朋友可以参考下
    2016-03-03
  • Java基于FFmpeg实现Mp4视频转GIF

    Java基于FFmpeg实现Mp4视频转GIF

    FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。本文主要介绍了在Java中如何基于FFmpeg进行Mp4视频到Gif动图的转换,感兴趣的小伙伴可以了解一下
    2022-11-11
  • Java中Collection、List、Set、Map之间的关系总结

    Java中Collection、List、Set、Map之间的关系总结

    今天小编就为大家分享一篇关于Java中Collection、List、Set、Map之间的关系总结,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • 基于Java的MathML转图片的方法(示例代码)

    基于Java的MathML转图片的方法(示例代码)

    最近接到一个新需求mathML转图片怎么实现呢?刚开始还真是蒙圈了,不知道怎么实现,今天小编记录一种基于Java的MathML转图片的方法,感兴趣的朋友一起看看吧
    2021-06-06
  • java数据结构与算法之简单选择排序详解

    java数据结构与算法之简单选择排序详解

    这篇文章主要介绍了java数据结构与算法之简单选择排序,结合实例形式分析了选择排序的原理、实现方法与相关操作技巧,需要的朋友可以参考下
    2017-05-05
  • Mybatis分解式查询使用方法

    Mybatis分解式查询使用方法

    这篇文章主要介绍了Mybatis分解式查询使用方法,分解式查询就是将一条Sql语句拆分成多条。在 MyBatis 多表查询中,使用连接查询时一个 Sql 语句就可以查询出所有的数据
    2023-04-04
  • SpringBoot事务异步调用引发的bug解决

    SpringBoot事务异步调用引发的bug解决

    本文主要介绍了SpringBoot事务异步调用引发的bug解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • Springboot通过run启动web应用的方法

    Springboot通过run启动web应用的方法

    这篇文章主要介绍了Springboot通过run启动web应用的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-03-03

最新评论