RabbitMQ高级应用之消费端限流策略basicQos详解

 更新时间:2023年08月28日 10:08:26   作者:疯狂的帆  
这篇文章主要介绍了RabbitMQ高级应用之消费端限流策略basicQos详解,高并发情况下,队列里面一瞬间就就积累了上万条数据,但是消费者无法同时处理这么多请求,这种场景下我们就需要对消费端进行限流,需要的朋友可以参考下

业务场景

高并发情况下,队列里面一瞬间就就积累了上万条数据,但是消费者无法同时处理这么多请求,这个时候当我们打开客户端,瞬间就有巨量的信息给推送过来

但是客户端是没有办法同时处理这么多数据的,结果就是消费者(客户端)挂掉了…

这种场景下我们就需要对消费端进行限流

限流策略实现

限流策略关键代码:

channel.basicQos(); 

编写生产者

// 生产者
public class Producer {
    private static final String QUEUE_NAME = "queue_limit_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("消费端限流策略—测试数据:" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

编写消费者1

// 消费者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_limit_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

编写消费者2

// 消费者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_limit_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**  设置限流机制
         *  param1: prefetchSize,消息本身的大小 如果设置为0  那么表示对消息本身的大小不限制
         *  param2: prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
         *  param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
         */
        channel.basicQos(0, 5, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

运行结果

在这里插入图片描述

在这里插入图片描述

小结

  1. 限流的核心代码就是 channel.basicQos();
  2. 限流情况 ack 不能设置自动签收,一定要手动签收 channel.basicQos()
/**
     * @param prefetchSize maximum amount of content (measured in
     * octets) that the server will deliver, 0 if unlimited
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @param global true if the settings should be applied to the
     * entire channel rather than each consumer
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

该方法的作用是:进行消费端的限流

  • param1:prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
  • param2:prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
  • param3:global,是否将上面的设置应用于整个通道
    • false:表示只应用于当前消费者
    • true:表示当前通道的所有消费者都应用这个限流策略

到此这篇关于RabbitMQ高级应用之消费端限流策略basicQos详解的文章就介绍到这了,更多相关RabbitMQ消费端限流策略basicQos内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot使用validation-api实现对枚举类参数校验的方法

    SpringBoot使用validation-api实现对枚举类参数校验的方法

    这篇文章主要介绍了SpringBoot使用validation-api实现对枚举类参数校验,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • springboot结合vue实现增删改查及分页查询

    springboot结合vue实现增删改查及分页查询

    本文主要介绍了springboot结合vue实现增删改查及分页查询,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • java8 多个list对象用lambda求差集操作

    java8 多个list对象用lambda求差集操作

    这篇文章主要介绍了java8 多个list对象用lambda求差集操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • tk.mybatis如何扩展自己的通用mapper

    tk.mybatis如何扩展自己的通用mapper

    这篇文章主要介绍了tk.mybatis如何扩展自己的通用mapper操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • 解读Java报错输出的信息究竟是什么

    解读Java报错输出的信息究竟是什么

    Java报错输出的信息主要包括异常的主要描述信息和当前线程的栈帧信息,栈帧是虚拟机栈的基本存储单元,主要由局部变量表、操作数栈和帧数据三部分组成,局部变量表用于存放方法的参数和局部变量,操作数栈用于保存计算过程中产生的中间结果
    2024-12-12
  • 使用Apache Ignite实现Java数据网格

    使用Apache Ignite实现Java数据网格

    今天我们来探讨如何使用Apache Ignite来实现Java数据网格,Apache Ignite是一个高性能的内存计算平台,它提供了分布式缓存、数据网格和计算功能,可以显著提高大规模应用的数据处理性能,感兴趣的小伙伴跟着小编一起来看看吧
    2024-08-08
  • IDEA 2020.1.1好用的plugins插件推荐

    IDEA 2020.1.1好用的plugins插件推荐

    这篇文章主要介绍了IDEA 2020.1.1好用的plugins插件推荐,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • SpringBoot2 task scheduler 定时任务调度器四种方式

    SpringBoot2 task scheduler 定时任务调度器四种方式

    这篇文章主要介绍了SpringBoot2 task scheduler 定时任务调度器四种方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • SpringBoot实现反向代理的示例代码

    SpringBoot实现反向代理的示例代码

    本文主要介绍了SpringBoot实现反向代理的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • JDK集合源码之解析TreeMap(二)

    JDK集合源码之解析TreeMap(二)

    下面小编就为大家带来一篇浅谈java中的TreeMap 排序与TreeSet 排序。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2021-07-07

最新评论