Springboot RabbitMQ 消息队列使用示例详解
更新时间:2024年06月05日 10:12:34 作者:bj_wasin
本文通过示例代码介绍了Springboot RabbitMQ 消息队列使用,对大家的学习或工作具有一定的参考借鉴价值,感兴趣的朋友跟随小编一起看看吧
一、概念介绍:
RabbitMQ中几个重要的概念介绍:
- Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Exchanges:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- 交换机类型主要有以下几种:
- Direct Exchange(直连交换机):这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景。
- Fanout Exchange(扇形交换机):这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
- Topic Exchange(主题交换机):这种类型的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
- Headers Exchange(头交换机):这种类型的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于需要在消息头中携带额外信息的场景。
- Queues:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
二、引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
三、添加配置信息
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest listener: simple: acknowledge-mode: manual # 手动提交
四、Direct Exchange(直连交换机)模式
1、新建配置文件 RabbitDirectConfig类
package com.example.direct; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: 直连交换机--这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配, * 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景 */ @Configuration public class RabbitDirectConfig { /** * 队列名称 */ public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE"; public static final String QUEUE_USER ="QUEUE_USER"; /** * 交换机 */ public static final String EXCHANGE="EXCHANGE_01"; /** * 路由 */ public static final String ROUTING_KEY="ROUTING_KEY_01"; @Bean public Queue queue01() { return new Queue(QUEUE_MESSAGE, //队列名称 true, //是否持久化 false, //是否排他 false //是否自动删除 ); } @Bean public Queue queue02() { return new Queue(QUEUE_USER, //队列名称 true, //是否持久化 false, //是否排他 false //是否自动删除 ); } @Bean public DirectExchange exchange01() { return new DirectExchange(EXCHANGE, true, //是否持久化 false //是否排他 ); } @Bean public Binding demoBinding() { return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY); } @Bean public Binding demoBinding2() { return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY); } }
2、添加消息生产者 Producer类
package com.example.direct; import com.example.entity.User; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class Producer { @Resource RabbitTemplate rabbitTemplate; public void sendMessageByExchangeANdRoute(String message){ rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message); } /** * 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。 * @param message */ public void sendMessageByQueue(String message){ rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message); } public void sendMessage(User user){ rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user); } }
3、添加消息消费者
package com.example.direct; import com.example.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class Consumer { @RabbitListener(queues = RabbitDirectConfig.QUEUE_USER) public void onMessage(User user){ System.out.println("收到的实体bean消息:"+user); } @RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE) public void onMessage2(String message){ System.out.println("收到的字符串消息:"+message); } }
4、 测试
package com.example; import com.example.entity.User; import com.example.direct.Producer; import com.example.fanout.FanoutProducer; import com.example.topic.TopicProducer; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest class SpringbootRabbitMqApplicationTests { @Resource Producer producer; @Test public void sendMessage() throws InterruptedException { producer.sendMessageByQueue("哈哈"); producer.sendMessage(new User().setAge(10).setName("wasin")); } }
五、Topic Exchange(主题交换机)模式
1、新建RabbitTopicConfig类
package com.example.topic; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键, * 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。 */ @Configuration public class RabbitTopicConfig { /** * 交换机 */ public static final String EXCHANGE = "EXCHANGE_TOPIC1"; /** * 队列名称 */ public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC"; /** * 路由 * "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个) * 可以匹配 aa.wasin.aa.bb wasin.aa.bb wasin.aa .... * aa.bb.wasin.cc 无法匹配 */ public static final String ROUTING_KEY1 = "*.wasin.#"; @Bean public Queue queue() { return new Queue(QUEUE_TOPIC1, //队列名称 true, //是否持久化 false, //是否排他 false //是否自动删除 ); } @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE, true, //是否持久化 false //是否排他 ); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1); } }
2、新建 消息生产者和发送者
TopicProducer类
package com.example.topic; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class TopicProducer { @Resource RabbitTemplate rabbitTemplate; /** * @param routeKey 路由 * @param message 消息 */ public void sendMessageByQueue(String routeKey, String message){ rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message); } }
TopicConsumer类
package com.example.topic; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Slf4j @Component public class TopicConsumer { @RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1) public void onMessage2(String message){ log.info("topic收到的字符串消息:{}",message); } }
六、Fanout Exchange(扇形交换机)模式
1、 新建 RabbitFanoutConfig类
package com.example.fanout; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列, * 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。 */ @Configuration public class RabbitFanoutConfig { /** * 交换机 */ public static final String EXCHANGE = "EXCHANGE_FANOUT"; /** * 队列名称 */ public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT"; /** * 队列名称 */ public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2"; @Bean public Queue queueFanout1() { return new Queue(QUEUE_FANOUT1, //队列名称 true, //是否持久化 false, //是否排他 false //是否自动删除 ); } @Bean public Queue queueFanout2() { return new Queue(QUEUE_FANOUT2, //队列名称 true, //是否持久化 false, //是否排他 false //是否自动删除 ); } @Bean public FanoutExchange exchangeFanout() { return new FanoutExchange(EXCHANGE, true, //是否持久化 false //是否排他 ); } @Bean public Binding bindingFanout() { return BindingBuilder.bind(queueFanout1()).to(exchangeFanout()); } @Bean public Binding bindingFanout2() { return BindingBuilder.bind(queueFanout2()).to(exchangeFanout()); } }
2、新建 消息生产者和发送者
FanoutProducer类:
package com.example.fanout; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class FanoutProducer { @Resource RabbitTemplate rabbitTemplate; /** * @param message 消息 */ public void sendMessageByQueue(String message) { rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message); } }
FanoutConsumer类
package com.example.fanout; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Slf4j @Component public class FanoutConsumer { /** * 手动提交 * @param message * @param channel * @param tag * @throws IOException */ @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1) public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { log.info("fanout1收到的字符串消息:{}",message); channel.basicAck(tag,false); } @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2) public void onMessage2(String message){ log.info("fanout2到的字符串消息:{}",message); } }
到此这篇关于Springboot RabbitMQ 消息队列使用的文章就介绍到这了,更多相关Springboot RabbitMQ 消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Spring @Valid和@Validated区别和用法实例
这篇文章主要介绍了Spring @Valid和@Validated区别和用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2020-04-04SpringBoot动态定时任务、动态Bean、动态路由详解
这篇文章主要介绍了SpringBoot动态定时任务、动态Bean、动态路由详解,之前用过Spring中的定时任务,通过@Scheduled注解就能快速的注册一个定时任务,但有的时候,我们业务上需要动态创建,或者根据配置文件、数据库里的配置去创建定时任务,需要的朋友可以参考下2023-10-10
最新评论