Spring Boot中使用RabbitMQ 生产消息和消费消息的实例代码

 更新时间:2024年10月16日 11:58:39   作者:令人作呕的溏心蛋  
本文介绍了在SpringBoot中如何使用RabbitMQ进行消息的生产和消费,详细阐述了RabbitMQ中交换机的作用和类型,包括直连交换机、主题交换机、扇出交换机和头交换机,并解释了各自的消息路由机制,感兴趣的朋友一起看看吧

引入RabbitMQ依赖

<!-- springboot集成rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

增加RabbitMQ配置

  #rabbitmq配置
spring:
  rabbitmq:
    host: ip地址
    port: 5672
    username: 账号
    password: 密码
    virtual-host: /

配置RabbitMQ交换机以及队列

package com.ckm.ball.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//rabbitMQ绑定交换机 / 队列
@Configuration
public class RabbitMQConfig {
    //========================================================RabbitMQ Queue========================================================//
    //创建fanout模式交换机
    @Bean
    public FanoutExchange fanoutExchangeProcess() {
        return new FanoutExchange("process-data-change-exchange", true, false);
    }
    //创建队列
    @Bean
    public Queue processDataChangeQueue() {
        return new Queue("process-data-change-queue", true);
    }
    //将队列绑定到交换机
    @Bean
    public Binding chatBindExchange() {
        return BindingBuilder.bind(processDataChangeQueue()).to(fanoutExchangeProcess());
    }
}

编写接口,模拟生产消息

@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/produceMessage")
@ApiOperation(value = "生产消息", tags = "测试接口")
public void updateTokenTime() {
	//生产消息,会到交换机,交换机下发给队列,队列监听到就会消费,执行业务逻辑
    rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");
}

编写消息监听类,模拟消费消息

package com.ckm.ball.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class RabbitMQDataSyncListenerProcess {
    //监听process-data-change-queue队列 -> 消费
    @RabbitListener(queues = "process-data-change-queue")
    public void orderDead(@Payload String productIdAndOrderId) {
        log.info("当前时间:{},收到队列信息:{}", new Date().toString(), productIdAndOrderId);
        //执行你的业务逻辑
        for (int i = 0; i < 5; i++) {
            System.out.println("循环次数: " + (i + 1));
            try {
                // 暂停 2000 毫秒(2 秒)
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // 处理异常
                System.err.println("线程被中断: " + e.getMessage());
            }
        }
    }
}

RabbitMQ 中的交换机的作用

RabbitMQ 中的交换机(Exchange)是消息路由的核心组件。它负责接收来自生产者发送的消息,并根据特定的路由规则将这些消息传递给一个或多个队列(Queue)。
交换机的主要功能和类型

1.消息路由:

  • 交换机决定消息应该发送到哪些队列,基于绑定(Binding)和路由键(Routing Key)。

2.类型:

  • 直连交换机(Direct Exchange):消息直接发送到与路由键精确匹配的队列。
  • 主题交换机(Topic Exchange):消息根据路由键模式匹配一个或多个队列,支持通配符。
  • 扇出交换机(Fanout Exchange):将消息广播到所有绑定的队列,不考虑路由键。
  • 头交换机(Headers Exchange):通过消息的属性(Headers)进行路由,而不是使用路由键。

工作流程

  • 生产者发送消息到交换机。
  • 交换机根据配置的路由规则和队列的绑定关系,将消息路由到相应的队列。
  • 消费者从队列中获取消息进行处理。

在我的代码中生产消息语句:

convertAndSend(交换机,路由键也就是队列,你想传递的参数)

在扇出交换机(Fanout Exchange)模式不需要指定路由键,因为指定了也没用。

rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");

在扇出交换机(Fanout Exchange)模式,应改成:

rabbitTemplate.convertAndSend("process-data-change-exchange", "", "hhhhhhhhhhhhhh");

在扇出交换机中,可以将路由键设置为空字符串 “”,因为扇出交换机会将消息发送到所有绑定的队列,而不需要考虑路由键的具体值。

  • 在扇出交换机中,路由键被忽略。
  • 消息会被广播到所有与交换机绑定的队列中。

四种交换机模式

1. 直连交换机(Direct Exchange)

直连交换机:发送到匹配路由键的队列。

// 创建直连交换机
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct-exchange", true, false);
}
// 创建队列
@Bean
public Queue directQueue() {
    return new Queue("direct-queue", true);
}
// 将队列绑定到直连交换机,同时指定路由键
@Bean
public Binding directBinding() {
    return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct-routing-key");
}

生产消息:

直连交换机生产消息:需要指定路由键。

// 发送消息到直连交换机
rabbitTemplate.convertAndSend("direct-exchange", "direct-routing-key", "Your message here");

2. 主题交换机(Topic Exchange)

主题交换机:支持模糊匹配路由键。

// 创建主题交换机
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topic-exchange", true, false);
}
// 创建队列
@Bean
public Queue topicQueue() {
    return new Queue("topic-queue", true);
}
// 将队列绑定到主题交换机,同时指定路由键
@Bean
public Binding topicBinding() {
    return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.#");
}

生产消息:

主题交换机生产消息:需要指定符合主题模式的路由键。

// 发送消息到主题交换机
rabbitTemplate.convertAndSend("topic-exchange", "topic.routing.key", "Your message here");

3. 扇出交换机(Fanout Exchange)

扇出交换机:将消息广播到所有绑定的队列。

// 创建扇出交换机
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout-exchange", true, false);
}
// 创建队列
@Bean
public Queue fanoutQueue1() {
    return new Queue("fanout-queue-1", true);
}
@Bean
public Queue fanoutQueue2() {
    return new Queue("fanout-queue-2", true);
}
// 将队列绑定到扇出交换机
@Bean
public Binding fanoutBinding1() {
    return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

生产消息:

扇出交换机生产消息:不需要路由键,使用空字符串即可。

// 发送消息到扇出交换机
rabbitTemplate.convertAndSend("fanout-exchange", "", "Your message here");

4. 头交换机(Headers Exchange)

头交换机:根据消息头中匹配的属性进行路由。

// 创建头交换机
@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers-exchange", true, false);
}
// 创建队列
@Bean
public Queue headersQueue() {
    return new Queue("headers-queue", true);
}
// 将队列绑定到头交换机,同时指定头属性
@Bean
public Binding headersBinding() {
    Map<String, Object> headers = new HashMap<>();
    headers.put("format", "pdf");
    headers.put("type", "report");
    return BindingBuilder.bind(headersQueue())
            .to(headersExchange())
            .whereAll(headers)
            .match();
}

生产消息:

头交换机生产消息:需要构建一个带有头属性的消息。

// 发送消息到头交换机
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("format", "pdf");
messageProperties.setHeader("type", "report");
Message message = new Message("Your message here".getBytes(), messageProperties);
rabbitTemplate.send("headers-exchange", "", message);

到此这篇关于Spring Boot中使用RabbitMQ 生产消息和消费消息的文章就介绍到这了,更多相关Spring Boot生产消息和消费消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java大对象存储之@Lob注解处理BLOB和CLOB数据的方法

    Java大对象存储之@Lob注解处理BLOB和CLOB数据的方法

    本文将深入探讨@Lob注解的使用方法、最佳实践以及在处理大对象存储时应当注意的性能与内存考量,我们将通过实际示例展示如何在Java应用中有效地管理和操作BLOB和CLOB数据,感兴趣的朋友一起看看吧
    2025-05-05
  • jsp、struts、spring、mybatis实现前端页面功能模块化拆分的方案

    jsp、struts、spring、mybatis实现前端页面功能模块化拆分的方案

    这篇文章主要介绍了 jsp、struts、spring、mybatis实现前端页面功能模块化拆分的方案,非常不错,需要的朋友参考下
    2017-01-01
  • java实现多数据源切换方式

    java实现多数据源切换方式

    本文介绍实现多数据源切换的四步方法:导入依赖、配置文件、启动类注解、使用@DS标记mapper和服务层,通过注解实现数据源动态切换,适用于实际开发中的多数据源场景
    2025-08-08
  • 深入解析Spring Boot热部署与性能优化实践指南

    深入解析Spring Boot热部署与性能优化实践指南

    本文从Spring Boot热部署原理入手,结合生产环境中的实战经验,深入分析热部署的底层实现机制,并给出性能优化的实践建议,帮助开发者在提升开发效率的同时保障系统性能,本文给大家介绍的非常详细,感兴趣的朋友一起看看吧
    2025-10-10
  • spring解决循环依赖的方案示例

    spring解决循环依赖的方案示例

    这篇文章主要介绍spring如何解决循环依赖,文中有相关的代码示例给大家参考,对我们的学习或工作有一定的帮助,感兴趣的同学可以借鉴阅读
    2023-05-05
  • Java中常用的数据库连接池_动力节点Java学院整理

    Java中常用的数据库连接池_动力节点Java学院整理

    数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个;释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据库连接遗漏
    2017-08-08
  • Java实现获取控制台输出结果转换为变量的详细操作

    Java实现获取控制台输出结果转换为变量的详细操作

    在Java编程中,有时需将控制台的输出捕获为字符串,以便于后续的处理或测试,这种需求在日志记录、单元测试或调试时尤为常见,下面,将通过详细步骤来介绍如何使用ByteArrayOutputStream和PrintStream来实现这一功能,需要的朋友可以参考下
    2024-06-06
  • Java数据库连接池之DBCP浅析_动力节点Java学院整理

    Java数据库连接池之DBCP浅析_动力节点Java学院整理

    这篇文章主要为大家详细介绍了Java数据库连接池之DBCP的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-08-08
  • Java中extends与implements解析:继承与接口实现的本质区别

    Java中extends与implements解析:继承与接口实现的本质区别

    在Java面向对象编程中,extends和implements是两个核心关键字,它们分别用于实现类继承和接口实现,本文将深入剖析这两个关键字的异同点,并通过典型应用场景和代码示例帮助开发者掌握它们的正确使用方式,感兴趣的朋友一起看看吧
    2025-09-09
  • java实现ip地址与十进制数相互转换

    java实现ip地址与十进制数相互转换

    本文介绍在java中IP地址转换十进制数及把10进制再转换成IP地址的方法及实例参考,晒出来和大家分享一下
    2012-12-12

最新评论