RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解

 更新时间:2023年12月12日 09:57:28   作者:warybee  
这篇文章主要介绍了RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解,生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能,需要的朋友可以参考下

1. 概述

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都 有可能。此时可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还 没接收到这个消息的回调,那么你可以重发。

在这里插入图片描述

在实际项目中,可以利用这一机制保障消息的可靠性投递,如果消息未发送成功,可以在监听事件中记录日志、重新发送消息等操作。

2.原生API中开启Confirm消息确认机制

  • 在生产者的channel上开启确认机制: channel.confirmSelect();
  • 在channel上添加Confirm监听事件: channel.addConfirmListener(new ConfirmListener() ...

2.1 代码演示

生产者代码

监听事件的两个方法:handleAck() 消息投递成功后回调,handleNack 消息未成功投递回调

public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建一个链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //消息的确认模式
        channel.confirmSelect();
        String exchangeName="test_confirm_exchange";
        String routeKey="confirm.test";
        String msg="RabbitMQ send message confirm test!";
        for (int i=0;i<5;i++){
            channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
        }
        //确定监听事件
        channel.addConfirmListener(new ConfirmListener() {
            /**
             *  消息成功发送
             * @param deliveryTag   消息唯一标签
             * @param multiple  是否批量
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********Ack*********");
            }
            /**
             *  消息没有成功发送
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********No Ack*********");
            }
        });
    }

消费者端代码

public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_confirm_exchange";
        String exchangeType="topic";
        //声明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_confirm_queue";
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="confirm.#";
        //绑定队列和交换机
        channel.queueBind(queueName,exchangeName,routeKey);
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                }
            });
    }

到此这篇关于RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解的文章就介绍到这了,更多相关Confirm消息确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 图文教程教你IDEA中的Spring环境搭建+简单入门

    图文教程教你IDEA中的Spring环境搭建+简单入门

    这篇文章主要介绍了图文教程教你IDEA中的Spring环境搭建+简单入门,Spring的环境搭建使用Maven更加方便,需要的朋友可以参考下
    2023-03-03
  • Java高并发BlockingQueue重要的实现类详解

    Java高并发BlockingQueue重要的实现类详解

    这篇文章主要给大家介绍了关于Java高并发BlockingQueue重要的实现类的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • Java实现较大二进制文件的读、写方法

    Java实现较大二进制文件的读、写方法

    本篇文章主要介绍了Java实现较大二进制文件的读、写方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-02-02
  • Java搭建简单Netty开发环境入门教程

    Java搭建简单Netty开发环境入门教程

    这篇文章主要介绍了Java搭建简单Netty开发环境入门教程,有详细的代码展示和maven依赖,能够帮助你快速上手Netty开发框架,需要的朋友可以参考下
    2021-06-06
  • Java压缩之LZW算法字典压缩与解压讲解

    Java压缩之LZW算法字典压缩与解压讲解

    今天小编就为大家分享一篇关于Java压缩之LZW算法字典压缩与解压讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • java中的十个大类总结

    java中的十个大类总结

    java.lang.string字符串类将是无可争议的冠军在任何一天的普及和不可以否认。这是最后一个类,用来创建操作不可变字符串字面值
    2013-10-10
  • 基于MyBatis的parameterType传入参数类型

    基于MyBatis的parameterType传入参数类型

    这篇文章主要介绍了基于MyBatis的parameterType传入参数类型,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • java.imageIo给图片添加水印的实现代码

    java.imageIo给图片添加水印的实现代码

    最近项目在做一个商城项目, 项目上的图片要添加水印①,添加图片水印;②:添加文字水印;一下提供下个方法,希望大家可以用得着
    2013-07-07
  • Java接口请求重试机制的几种常见方法

    Java接口请求重试机制的几种常见方法

    Java接口请求重试机制是保证系统稳定性和容错能力的重要手段之一,当接口请求发生失败或暂时性错误时,通过重试机制可以提高请求的成功率,本文将详细介绍Java接口请求重试机制的几种常见方法,需要的朋友可以参考下
    2023-11-11
  • 浅析Spring IOC bean为什么默认是单例

    浅析Spring IOC bean为什么默认是单例

    单例的意思就是说在 Spring IoC 容器中只会存在一个 bean 的实例,无论一次调用还是多次调用,始终指向的都是同一个 bean 对象,本文小编将和大家一起分析Spring IOC bean为什么默认是单例,需要的朋友可以参考下
    2023-12-12

最新评论