聊聊RabbitMQ发布确认高级问题

 更新时间:2022年01月04日 11:24:51   作者:崇尚学技术的科班人  
这篇文章主要介绍了RabbitMQ发布确认高级问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1、发布确认高级

1. 存在的问题

再生产环境中由于一些不明原因导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,会导致消息丢失。

1.1、发布确认SpringBoot版本

1.1.1、确认机制方案

在这里插入图片描述

当消息不能正常被接收的时候,我们需要将消息存放在缓存中。

1.1.2、代码架构图

在这里插入图片描述

1.1.3、配置文件

spring.rabbitmq.host=192.168.123.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE:禁用发布确认模式,是默认值。
  • CORRELATED:发布消息成功到交换机会触发回调方方法。
  • CORRELATED:就是发布一个就确认一个。

1.1.4、配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

1.1.5、回调接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回调接口
 */

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机接受失败后进行回调
     * 1. 保存消息的ID及相关消息
     * 2. 是否接收成功
     * 3. 接受失败的原因
     * @param correlationData
     * @param b
     * @param s
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(b == true){
            log.info("交换机已经收到id为:{}的消息",id);
        }else{
            log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);
        }
    }
}

1.1.6、生产者

import com.xiao.springbootrabbitmq.utils.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;



    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
        log.info("发送得内容是:{}",message);
    }
}

1.1.7、消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        log.info("接收到队列" + CONFIRM_QUEUE_NAME + "消息:{}",msg);
    }
}

1.1.8、测试结果

1. 第一种情况

在这里插入图片描述

ID1的消息正常送达,ID2的消息由于RoutingKey的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收

2. 第二种情况

在这里插入图片描述

我们再上一种情况下修改了ID1的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息

1.2、回退消息

1.2.1、Mandatory参数

  • 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(就是消息被交换机成功接收后,无法到达队列),那么消息会直接被丢弃,此时生产者是不知道消息被丢弃这个事件的
  • 通过设置该参数可以在消息传递过程中不可达目的地时将消息返回给生产者。

1.2.2、配置文件

spring.rabbitmq.publisher-returns=true

需要在配置文件种开启返回回调

1.2.3、生产者代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);
        log.info("发送得内容是:{}",message + routingKey1);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
        log.info("发送得内容是:{}",message + routingKey2);
    }
}

1.2.4、回调接口代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回调接口
 */

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机接受失败后进行回调
     * 1. 保存消息的ID及相关消息
     * 2. 是否接收成功
     * 3. 接受失败的原因
     * @param correlationData
     * @param b
     * @param s
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(b == true){
            log.info("交换机已经收到id为:{}的消息",id);
        }else{
            log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);
        }
    }


    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        Message message = returnedMessage.getMessage();
        String exchange = returnedMessage.getExchange();
        String routingKey = returnedMessage.getRoutingKey();
        String replyText = returnedMessage.getReplyText();
        log.error("消息{},被交换机{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey);
    }
}

1.2.5、测试结果

其他类的代码与上一小节案例相同

在这里插入图片描述

ID2的消息由于RoutingKey不可路由,但是还是被回调函数处理了。

1.3、备份交换机

1.3.1、代码架构图

在这里插入图片描述

这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机

1.3.2、配置类代码

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";

    public static final String BACKUP_QUEUE_NAME = "backup_queue";

    public static final String WARNING_QUEUE_NAME = "warning_queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    @Bean
    public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                        @Qualifier("backupQueue") Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                         @Qualifier("warningQueue") Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

1.3.3、消费者代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        log.info("报警发现不可路由的消息内容为:{}",msg);
    }
}

1.3.4、测试结果

在这里插入图片描述

mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高

到此这篇关于RabbitMQ发布确认高级的文章就介绍到这了,更多相关RabbitMQ发布确认高级内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • idea同步yapi插件的详细步骤

    idea同步yapi插件的详细步骤

    yapi是一个很好的接口文档维护工具,其swagger功能,可将接口信息同步到yapi平台上,这篇文章主要介绍了idea同步yapi插件,需要的朋友可以参考下
    2024-04-04
  • Javas使用Redlock实现分布式锁过程解析

    Javas使用Redlock实现分布式锁过程解析

    这篇文章主要介绍了Javas使用Redlock实现分布式锁过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-08-08
  • Spring Boot中@value的常见用法及案例

    Spring Boot中@value的常见用法及案例

    @Value注解是Spring框架中强大且常用的注解之一,本文主要介绍了SpringBoot中@value的常见用法及案例,具有一定的参考价值,感兴趣的可以了解一下
    2023-09-09
  • 详解Java编程中throw和throws子句的使用方法

    详解Java编程中throw和throws子句的使用方法

    这篇文章主要介绍了详解Java编程中throw和throws子句的使用方法,是Java入门学习中的基础知识,需要的朋友可以参考下
    2015-09-09
  • java实现字符串匹配求两个字符串的最大公共子串

    java实现字符串匹配求两个字符串的最大公共子串

    这篇文章主要介绍了java实现求两个字符串最大公共子串的方法,详细的描述了两个字符串的最大公共子串算法的实现,需要的朋友可以参考下
    2016-10-10
  • java同步器AQS架构AbstractQueuedSynchronizer原理解析下

    java同步器AQS架构AbstractQueuedSynchronizer原理解析下

    这篇文章主要为大家介绍了java同步器AQS架构AbstractQueuedSynchronizer原理解析下,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2022-03-03
  • SpringBoot全局异常与数据校验的方法

    SpringBoot全局异常与数据校验的方法

    这篇文章主要介绍了SpringBoot全局异常与数据校验的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-11-11
  • Java编程之如何通过JSP实现头像自定义上传

    Java编程之如何通过JSP实现头像自定义上传

    之前做这个头像上传功能还是花了好多时间的,今天我将我的代码分享给大家,下面这篇文章主要给大家介绍了关于Java编程之如何通过JSP实现头像自定义上传的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-12-12
  • Intellij IDEA创建web项目的超详细步骤记录

    Intellij IDEA创建web项目的超详细步骤记录

    如果刚开始接触IDEA,或者之前使用的是eclipse/myEclipse的话,即使是创建一个JAVA WEB项目,估计也让很多人费了好几个小时,下面这篇文章主要给大家介绍了关于Intellij IDEA创建web项目的超详细步骤,需要的朋友可以参考下
    2022-08-08
  • SpringBoot+Vue实现EasyPOI导入导出的方法详解

    SpringBoot+Vue实现EasyPOI导入导出的方法详解

    项目开发过程中,很大的需求都有 导入导出功能。本文将利用SpringBoot+Vue实现EasyPOI导入导出功能,感兴趣的可以了解一下
    2022-08-08

最新评论