RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解

 更新时间:2023年12月12日 09:57:28   作者:warybee  
这篇文章主要介绍了RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解,生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能,需要的朋友可以参考下

1. 概述

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都 有可能。此时可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还 没接收到这个消息的回调,那么你可以重发。

在这里插入图片描述

在实际项目中,可以利用这一机制保障消息的可靠性投递,如果消息未发送成功,可以在监听事件中记录日志、重新发送消息等操作。

2.原生API中开启Confirm消息确认机制

  • 在生产者的channel上开启确认机制: channel.confirmSelect();
  • 在channel上添加Confirm监听事件: channel.addConfirmListener(new ConfirmListener() ...

2.1 代码演示

生产者代码

监听事件的两个方法:handleAck() 消息投递成功后回调,handleNack 消息未成功投递回调

public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建一个链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //消息的确认模式
        channel.confirmSelect();
        String exchangeName="test_confirm_exchange";
        String routeKey="confirm.test";
        String msg="RabbitMQ send message confirm test!";
        for (int i=0;i<5;i++){
            channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
        }
        //确定监听事件
        channel.addConfirmListener(new ConfirmListener() {
            /**
             *  消息成功发送
             * @param deliveryTag   消息唯一标签
             * @param multiple  是否批量
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********Ack*********");
            }
            /**
             *  消息没有成功发送
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********No Ack*********");
            }
        });
    }

消费者端代码

public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_confirm_exchange";
        String exchangeType="topic";
        //声明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_confirm_queue";
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="confirm.#";
        //绑定队列和交换机
        channel.queueBind(queueName,exchangeName,routeKey);
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                }
            });
    }

到此这篇关于RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解的文章就介绍到这了,更多相关Confirm消息确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解Spring基于xml的两种依赖注入方式

    详解Spring基于xml的两种依赖注入方式

    这篇文章主要介绍了详解Spring基于xml的两种依赖注入方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-12-12
  • Spring Boot面试必问之启动流程知识点详解

    Spring Boot面试必问之启动流程知识点详解

    SpringBoot是Spring开源组织下的子项目,是Spring组件一站式解决方案,主要是简化了使用Spring的难度,简省了繁重的配置,提供了各种启动器,开发者能快速上手,这篇文章主要给大家介绍了关于Spring Boot面试必问之启动流程知识点的相关资料,需要的朋友可以参考下
    2022-06-06
  • Spring Boot 2和Redis例子实现过程解析

    Spring Boot 2和Redis例子实现过程解析

    这篇文章主要介绍了Spring Boot2发布与调用REST服务过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • MyBatis实战之动态生成SQL详解

    MyBatis实战之动态生成SQL详解

    这篇文章主要为大家详细介绍了如何使用MyBatis实现动态生成SQL,可以告别硬编码,拥抱智能SQL生成,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-07-07
  • 利用maven deploy上传本地jar至私服的方法

    利用maven deploy上传本地jar至私服的方法

    这篇文章主要介绍了利用maven deploy上传本地jar至私服的方法,本文结合实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-02-02
  • Java基础篇之分布式版本控制工具Git

    Java基础篇之分布式版本控制工具Git

    Git是一个开源的分布式版本控制系统,可以有效、高速地处理从很小到非常大的项目版本管理。 也是Linus Torvalds为了帮助管理Linux内核开发而开发的一个开放源码的版本控制软件
    2021-10-10
  • redisson特性及优雅实现示例

    redisson特性及优雅实现示例

    这篇文章主要为大家介绍了redisson特性及优雅实现示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • java判断域名无法访问自行访问下一条

    java判断域名无法访问自行访问下一条

    这篇文章主要为大家介绍了java实现判断域名无法访问的时候自行访问下一条域名示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • Mybatis与Jpa的区别和性能对比总结

    Mybatis与Jpa的区别和性能对比总结

    mybatis和jpa两个持久层框架,从底层到用法都不同,但是实现的功能是一样的,所以说一直以来颇有争议,所以下面这篇文章主要给大家介绍了关于Mybatis与Jpa的区别和性能对比的相关资料,需要的朋友可以参考下
    2021-06-06
  • Java详解HashMap实现原理和源码分析

    Java详解HashMap实现原理和源码分析

    这篇文章主要介绍了Java关于HashMap的实现原理并进行源码分析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09

最新评论