RabbitMQ的ACK确认机制保障消费端消息的可靠性详解

 更新时间:2023年12月12日 10:12:31   作者:warybee  
这篇文章主要介绍了RabbitMQ的ACK确认机制保障消费端消息的可靠性详解,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把,需要的朋友可以参考下

1. 概述

如果消费端在你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

生产端消息可靠性保证可以使用RabbitMQ的confirm机制。

2. ACK机制与消费端消息补偿机制

把channel.basicConsume(...)方法的autoAck参数改为false

channel.basicAck(long deliveryTag, boolean multiple);方法,消费成功签收

参数说明:

  • deliveryTag:消息标识
  • multiple:是否批量签收

basicNack(long deliveryTag, boolean multiple, boolean requeue) ,消息消费失败

参数说明:

  • deliveryTag:消息标识
  • multiple:是否批量签收
  • requeue:true 消息会重回队列,false 消息会进入到死信队列

3. 代码演示

生产端

 public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建一个链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_ack_exchange";
        String routeKey="ack.test";
        for (int i=0;i<5;i++){
            Map<String, Object> headers = new HashMap<String, Object>();
            //演示重回队列机制,使用num==0的消息签收失败重回队列
            headers.put("num", i);
            AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg="RabbitMQ send message ack test!"+i;
            channel.basicPublish(exchangeName,routeKey,properties,msg.getBytes());
        }
    }

消息端

public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_ack_exchange";
        String exchangeType="topic";
        //声明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_ack_queue";
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="ack.#";
        //绑定队列和交换机
        channel.queueBind(queueName,exchangeName,routeKey);
        /**
         * autoAck:false  设置为手工签收
         */
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                    try {
                        Thread.sleep(3000); //休眠5秒
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //演示重回队列机制,使用num==0的消息签收失败重回队列
                    if((Integer)properties.getHeaders().get("num") == 0) {
                        /**
                         * 参数说明:1、消息标识  2、是否批量签收  3、是否重回队列
                         */
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    } else {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
    }

运行代码以上后,由于在消费端,设置了第一条消息,签收失败重回队列,在RabbitMQ控制台中我们可以看到始终有一条消息未签收确认

在这里插入图片描述

到此这篇关于RabbitMQ的ACK确认机制保障消费端消息的可靠性详解的文章就介绍到这了,更多相关RabbitMQ的ACK确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 深入了解Java核心类库--Arrays类

    深入了解Java核心类库--Arrays类

    这篇文章主要为大家详细介绍了java Arrays类定义与使用的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能给你带来帮助
    2021-07-07
  • Java多线程之FutureTask的介绍及使用

    Java多线程之FutureTask的介绍及使用

    之前介绍了线程池相关的对象,Runable Callable与Future,下面介绍FutureTask的作用,它的特性是怎样的呢,需要的朋友可以参考下
    2021-06-06
  • Spring Cloud Admin健康检查 邮件、钉钉群通知的实现

    Spring Cloud Admin健康检查 邮件、钉钉群通知的实现

    这篇文章主要介绍了Spring Cloud Admin健康检查 邮件、钉钉群通知的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • 如何使用 IntelliJ IDEA 编写 Spark 应用程序(Scala + Maven)

    如何使用 IntelliJ IDEA 编写 Spark 应用程序(Sc

    本教程展示了如何在IntelliJIDEA中使用Maven编写和运行一个简单的Spark应用程序(例如WordCount程序),本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-11-11
  • spring cloud如何修复zuul跨域配置异常的问题

    spring cloud如何修复zuul跨域配置异常的问题

    最近的开发过程中,使用spring集成了spring-cloud-zuul,在配置zuul跨域的时候遇到了问题,下面这篇文章主要给大家介绍了关于spring cloud如何修复zuul跨域配置异常的问题,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-09-09
  • SpringBoot通过注解注入Bean的几种方式解析

    SpringBoot通过注解注入Bean的几种方式解析

    这篇文章主要为大家介绍了SpringBoot注入Bean的几种方式示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪
    2022-03-03
  • Java调用Shell命令的方法

    Java调用Shell命令的方法

    这篇文章主要介绍了Java调用Shell命令的方法,实例分析了java调用shell命令的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • java读取用户登入退出日志信息上传服务端

    java读取用户登入退出日志信息上传服务端

    这篇文章主要介绍了java读取用户登入退出日志信息上传服务端的相关资料,需要的朋友可以参考下
    2016-05-05
  • Java IO流深入理解

    Java IO流深入理解

    这篇文章主要介绍了java IO流的深入理解,下面和小编来一起学习一下吧,希望能给你带来帮助,也希望您能够多多关注脚本之家的更多内容
    2021-07-07
  • Activiti如何启动流程并使流程前进

    Activiti如何启动流程并使流程前进

    这篇文章主要介绍了Activiti如何启动流程并使流程前进,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03

最新评论