SpringBoot中的多RabbitMQ数据源配置实现

 更新时间:2023年09月07日 11:48:36   作者:阿劲  
本篇博客将介绍如何在 Spring Boot 中配置和管理多个 RabbitMQ 数据源,以满足不同的应用需求,具有一定的参考价值,感兴趣的可以了解一下

简介

在构建复杂的应用程序时,经常需要与多个数据源进行交互。这可能包括连接多个数据库、消息队列或其他数据存储系统。RabbitMQ 是一个流行的消息队列系统,它通过消息队列实现了应用程序之间的松耦合,适用于异步任务处理、解耦、削峰填谷等场景。本篇博客将介绍如何在 Spring Boot 中配置和管理多个 RabbitMQ 数据源,以满足不同的应用需求,并提供示例代码使用

1. 依赖引入

首先,在 pom.xml 文件中添加 RabbitMQ 的 Spring Boot Starter 依赖,以便引入 RabbitMQ 相关的库和功能。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 抽象类

创建一个抽象类 AbstractRabbitConfiguration,其中包含了RabbitMQ的基本配置信息。这些信息包括主机、端口、用户名、密码、虚拟主机、队列名、交换机名、确认机制和消费条数等。这个抽象类的目的是为了让子类继承这些基本配置信息,并根据不同的数据源创建相应的RabbitMQ连接和管理器。

@Data
public abstract class AbstractRabbitConfiguration {
     protected String host;
    protected Integer port;
    protected String userName;
    protected String password;
    protected String virtualHost;
    protected String queueName;
    protected String exchangeName;
    protected String routingKey;
    protected String acknowledge = "manual";
    protected Integer prefetch = 1;
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(Boolean.TRUE);
        return connectionFactory;
    }
}

3. 子类

在抽象类的基础上,我们可以创建多个子类,每个子类对应一个不同的RabbitMQ数据源配置。以一个名为 RabbitConfig 的子类为例,假设它是用于主数据源的配置。

@Configuration
@ConfigurationProperties(prefix = "kxj.rabbit")
public class RabbitConfig extends AbstractRabbitConfiguration {
    @Bean("primaryConnectionFactory")
    @Primary
    public ConnectionFactory primaryConnectionFactory() {
        return super.connectionFactory();
    }
    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory,
                                         @Qualifier("confirmCallback") ConfirmCallback confirmCallback,
                                         @Qualifier("returnCallback") ReturnCallback returnCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
    @Bean(name = "primaryContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置ACK确认机制
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        // 设置消费者消费条数
        factory.setPrefetchCount(prefetch);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
    @Bean(name = "primaryRabbitAdmin")
    public RabbitAdmin rabbitAdmin(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        // 声明交换机,队列及对应绑定关系
        Queue queue = RabbitmqUtil.createQueue(queueName);
        FanoutExchange exchange = RabbitmqUtil.createFanoutExchange(exchangeName);
        Binding binding = RabbitmqUtil.createBinding(queue, exchange, "");
        RabbitmqUtil.createRabbitAdmin(queue, exchange, binding, rabbitAdmin);
        return rabbitAdmin;
    }
}

在子类中,我们使用 @Configuration 注解将它标记为Spring的配置类,并使用 @ConfigurationProperties 注解将以 kxj.rabbit 为前缀的配置属性注入到类中。这使得我们可以在配置文件中为不同的数据源配置不同的RabbitMQ属性。

在子类中,我们定义多个Bean来配置RabbitMQ的连接、管理和消息处理等,以满足不同数据源的需求。在这里创建主数据源的连接工厂,并使用 @Primary 注解将其标记为默认的连接工厂。
除了连接工厂之外,我们还可以配置其他与RabbitMQ相关的Bean,如 RabbitTemplate、RabbitAdmin 以及回调类等。这些Bean可以根据不同数据源的需求进行配置,例如设置消息确认机制、消息返回机制和消息转换器等。

另外,我们在 rabbitTemplate 方法中也进行了一些配置,如设置 mandatory 为 true,设置消息转换器为 Jackson2JsonMessageConverter 等。

4. 配置回调类

在处理消息时,我们通常需要设置确认回调(ConfirmCallback)和返回回调(ReturnCallback)。这些回调类可以用于处理消息的确认和返回情况。

@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("传递消息到交换机成功,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
        } else {
            log.error("传递消息到交换机失败,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
        }
    }
}
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String msg = new String(message.getBody());
        log.error(String.format("消息{%s}不能被正确路由,routingKey为{%s}", msg, routingKey));
    }
}

5. 配置文件

server.port=8895
kxj.rabbit.host=MQ地址
kxj.rabbit.port=MQ端口
kxj.rabbit.virtualHost=/
kxj.rabbit.userName=guest
kxj.rabbit.password=guest
kxj.rabbit.queueName=test.queue
kxj.rabbit.exchangeName=test.exchange
kxj.rabbit.routingKey=test-routing-key

6. 工具类

在 RabbitMQ 的配置过程中,我们需要声明交换机、队列和绑定关系等,这些操作可以通过一个工具类 RabbitmqUtil 来实现。

public class RabbitmqUtil {
    public static DirectExchange createDirectExchange(String exchangeName) {
        if (StringUtils.isNotBlank(exchangeName)) {
            return new DirectExchange(exchangeName, true, false);
        }
        return null;
    }
    public static TopicExchange createTopicExchange(String exchangeName) {
        if (StringUtils.isNotBlank(exchangeName)) {
            return new TopicExchange(exchangeName, true, false);
        }
        return null;
    }
    public static FanoutExchange createFanoutExchange(String exchangeName) {
        if (StringUtils.isNotBlank(exchangeName)) {
            return new FanoutExchange(exchangeName, true, false);
        }
        return null;
    }
    public static Queue createQueue(String queueName) {
        if (StringUtils.isNotBlank(queueName)) {
            return new Queue(queueName, true);
        }
        return null;
    }
    public static Binding createBinding(Queue queueName, Exchange exchangeName, String routingKeyName) {
        if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName)) {
            return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName).noargs();
        }
        return null;
    }
//    public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
//        rabbitAdmin.declareQueue(queue);
//        rabbitAdmin.declareExchange(exchange);
//        rabbitAdmin.declareBinding(binding);
//    }
    public static void createRabbitAdmin(Queue queue, Exchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
        if (queue != null) {
            rabbitAdmin.declareQueue(queue);
        }
        if (exchange != null) {
            rabbitAdmin.declareExchange(exchange);
        }
        if (binding != null) {
            rabbitAdmin.declareBinding(binding);
        }
    }
}

7. 测试用例

我们可以编写一些测试用例来验证以上配置是否正确。下面是一个发送消息到主数据源的示例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTest {
    @Autowired
    @Qualifier("primaryRabbitAdmin")
    private RabbitAdmin primaryRabbitAdmin;
    @Autowired
    @Qualifier("primaryContainerFactory")
    private SimpleRabbitListenerContainerFactory primaryContainerFactory;
    @Autowired
    @Qualifier("primaryConnectionFactory")
    private ConnectionFactory primaryConnectionFactory;
    @Autowired
    private RabbitTemplate primaryRabbitTemplate;
    @Test
    public void testSend() {
        String message = "Hello, World!";
        primaryRabbitTemplate.convertAndSend("test.exchange", "test.routingKey", message);
        String receivedMessage = (String) primaryRabbitTemplate.receiveAndConvert("test.queue");
        assertEquals(message, receivedMessage);
    }
}

在上面的测试用例中,我们使用了 @Qualifier 注解来指定主数据源的 Bean,然后通过 RabbitTemplate 发送消息到 test.exchange,并在队列 test.queue 中接收到消息。我们可以通过断言来判断发送和接收的消息是否一致,以此验证配置是否正确。

总结

通过使用抽象类和子类的方式,我们可以轻松地配置和管理多个RabbitMQ数据源,每个数据源可以有不同的属性配置。这种方法使得我们的应用程序更具灵活性,能够与多个RabbitMQ实例交互,满足不同数据源的需求。同时,回调类的使用也可以帮助我们处理消息的确认和返回情况,确保消息的可靠性传递。

到此这篇关于SpringBoot中的多RabbitMQ数据源配置实现的文章就介绍到这了,更多相关SpringBoot 多RabbitMQ数据源配置内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java设计模式中的装饰者模式

    Java设计模式中的装饰者模式

    这篇文章主要介绍了Java设计模式中的装饰者模式,装饰者模式即Decorator Pattern,装饰模式是在不必改变原类文件和使用继承的情况下,动态地扩展一个对象的功能
    2022-07-07
  • springboot 同时启用http/https的配置方法

    springboot 同时启用http/https的配置方法

    本文给大家分享springboot 同时启用http/https的配置方法,通过修改配置文件、增加java配置的方法来实现此操作,具体内容详情跟随小编通过本文学习下吧
    2021-05-05
  • 详解Mybatis注解写法(附10余个常用例子)

    详解Mybatis注解写法(附10余个常用例子)

    这篇文章主要介绍了详解Mybatis注解写法(附10余个常用例子),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10
  • Java并发系列之Semaphore源码分析

    Java并发系列之Semaphore源码分析

    这篇文章主要为大家详细介绍了Java并发系列之Semaphore源码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-02-02
  • 如何去除Java中List集合中的重复数据

    如何去除Java中List集合中的重复数据

    这篇文章主要介绍了Java中List集合去除重复数据的方法,对大家的工作或学习有一定价值,有需求的朋友可以参考下
    2020-05-05
  • 基于Java字符编码的使用详解

    基于Java字符编码的使用详解

    本篇文章对Java字符编码的使用进行了详细的分析介绍。需要的朋友参考下
    2013-05-05
  • Java8中对于LocalDateTime的序列化和反序列化问题

    Java8中对于LocalDateTime的序列化和反序列化问题

    这篇文章主要介绍了Java8中对于LocalDateTime的序列化和反序列化问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • jvm垃圾回收算法详细解析

    jvm垃圾回收算法详细解析

    这篇文章主要介绍了jvm垃圾回收算法详细解析,JVM有一套完整的垃圾回收算法,可以对程序运行时产生的垃圾对象进行及时的回收,以便释放JVM相应区域的内存空间,确保程序稳定高效的运行,但在真正了解垃圾回收算法之前,有必要对JVM的对象的引用做一个简单的铺垫
    2022-07-07
  • Java Socket编程简介_动力节点Java学院整理

    Java Socket编程简介_动力节点Java学院整理

    这篇文章主要介绍了Java Socket编程简介的相关知识,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-05-05
  • java多线程实现交通灯管理系统

    java多线程实现交通灯管理系统

    这篇文章主要为大家详细介绍了java多线程实现交通灯管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08

最新评论