RabbitMQ的ACK确认机制保障消费端消息的可靠性详解

 更新时间:2023年12月12日 10:12:31   作者:warybee  
这篇文章主要介绍了RabbitMQ的ACK确认机制保障消费端消息的可靠性详解,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把,需要的朋友可以参考下

1. 概述

如果消费端在你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

生产端消息可靠性保证可以使用RabbitMQ的confirm机制。

2. ACK机制与消费端消息补偿机制

把channel.basicConsume(...)方法的autoAck参数改为false

channel.basicAck(long deliveryTag, boolean multiple);方法,消费成功签收

参数说明:

  • deliveryTag:消息标识
  • multiple:是否批量签收

basicNack(long deliveryTag, boolean multiple, boolean requeue) ,消息消费失败

参数说明:

  • deliveryTag:消息标识
  • multiple:是否批量签收
  • requeue:true 消息会重回队列,false 消息会进入到死信队列

3. 代码演示

生产端

 public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建一个链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_ack_exchange";
        String routeKey="ack.test";
        for (int i=0;i<5;i++){
            Map<String, Object> headers = new HashMap<String, Object>();
            //演示重回队列机制,使用num==0的消息签收失败重回队列
            headers.put("num", i);
            AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg="RabbitMQ send message ack test!"+i;
            channel.basicPublish(exchangeName,routeKey,properties,msg.getBytes());
        }
    }

消息端

public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_ack_exchange";
        String exchangeType="topic";
        //声明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_ack_queue";
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="ack.#";
        //绑定队列和交换机
        channel.queueBind(queueName,exchangeName,routeKey);
        /**
         * autoAck:false  设置为手工签收
         */
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                    try {
                        Thread.sleep(3000); //休眠5秒
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //演示重回队列机制,使用num==0的消息签收失败重回队列
                    if((Integer)properties.getHeaders().get("num") == 0) {
                        /**
                         * 参数说明:1、消息标识  2、是否批量签收  3、是否重回队列
                         */
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    } else {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
    }

运行代码以上后,由于在消费端,设置了第一条消息,签收失败重回队列,在RabbitMQ控制台中我们可以看到始终有一条消息未签收确认

在这里插入图片描述

到此这篇关于RabbitMQ的ACK确认机制保障消费端消息的可靠性详解的文章就介绍到这了,更多相关RabbitMQ的ACK确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot如何实现持久化登录状态获取

    SpringBoot如何实现持久化登录状态获取

    这篇文章主要介绍了SpringBoot 如何实现持久化登录状态获取,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java实现HttpGet请求传body参数

    Java实现HttpGet请求传body参数

    这篇文章主要为大家详细介绍了Java实现HttpGet请求传body参数的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-02-02
  • MyEclipse10安装Log4E插件

    MyEclipse10安装Log4E插件

    这篇文章主要介绍了MyEclipse10安装Log4E插件的相关资料,需要的朋友可以参考下
    2017-10-10
  • Mybatis-Plus通用枚举的使用详解

    Mybatis-Plus通用枚举的使用详解

    这篇文章主要介绍了Mybatis-Plus通用枚举的使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • Java中为何要使用ArrayList

    Java中为何要使用ArrayList

    这篇文章主要介绍了Java中为何要使用ArrayList,帮助大家更好的理解和学习Java,感兴趣的朋友可以了解下
    2020-09-09
  • Springboot中的@ComponentScan注解使用解析

    Springboot中的@ComponentScan注解使用解析

    这篇文章主要介绍了Springboot中的@ComponentScan注解使用解析,@ComponentScan用于类或接口上主要是指定扫描路径,spring会把指定路径下带有指定注解的类注册到IOC容器中,需要的朋友可以参考下
    2024-01-01
  • Java中的equsals和==

    Java中的equsals和==

    这篇文章主要介绍了Java中的equsals和==的相关内容,感兴趣的朋朋友可以参考下文
    2021-08-08
  • Java线程休眠之sleep方法详解

    Java线程休眠之sleep方法详解

    这篇文章主要介绍了Java线程休眠之sleep方法详解,Thread 类中有一个静态方法的sleep方法,当该线程调用sleep方法后,就会暂时让CPU的调度权,但是监视器资源比如锁并不会释放出去,需要的朋友可以参考下
    2024-01-01
  • Java实现将类数据逐行写入CSV文件的方法详解

    Java实现将类数据逐行写入CSV文件的方法详解

    这篇文章主要为大家详细介绍了Java如何实现将类数据逐行写入CSV文件,文中的示例代码讲解详细,具有一定的参考价值,需要的可以借鉴一下
    2022-11-11
  • 分析并发编程之LongAdder原理

    分析并发编程之LongAdder原理

    LongAdder类是JDK1.8新增的一个原子性操作类。AtomicLong通过CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器来说性能已经很好了,但是JDK开发组并不满足于此,因为非常搞并发的请求下AtomicLong的性能是不能让人接受的
    2021-06-06

最新评论