Springboot RabbitMQ 消息队列使用示例详解

 更新时间:2024年06月05日 10:12:34   作者:bj_wasin  
本文通过示例代码介绍了Springboot RabbitMQ 消息队列使用,对大家的学习或工作具有一定的参考借鉴价值,感兴趣的朋友跟随小编一起看看吧

一、概念介绍:

RabbitMQ中几个重要的概念介绍:

  • Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Exchanges:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • 交换机类型主要有以下几种:
  • Direct Exchange(直连交换机):这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景。
  • Fanout Exchange(扇形交换机):这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
  • Topic Exchange(主题交换机):这种类型的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
  • Headers Exchange(头交换机):这种类型的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于需要在消息头中携带额外信息的场景。
  • Queues:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

二、引入依赖:

 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

三、添加配置信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动提交

四、Direct Exchange(直连交换机)模式

1、新建配置文件 RabbitDirectConfig类

package com.example.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 直连交换机--这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,
 * 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景
 */
@Configuration
public class RabbitDirectConfig {
    /**
     * 队列名称
     */
    public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";
    public static final String QUEUE_USER ="QUEUE_USER";
    /**
     * 交换机
     */
    public static final String EXCHANGE="EXCHANGE_01";
    /**
     * 路由
     */
    public static final String ROUTING_KEY="ROUTING_KEY_01";
    @Bean
    public Queue queue01() {
        return new Queue(QUEUE_MESSAGE, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @Bean
    public Queue queue02() {
        return new Queue(QUEUE_USER, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @Bean
    public DirectExchange exchange01() {
        return new DirectExchange(EXCHANGE,
                true, //是否持久化
                false //是否排他
        );
    }
    @Bean
    public Binding demoBinding() {
        return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);
    }
    @Bean
    public Binding demoBinding2() {
        return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);
    }
}

2、添加消息生产者 Producer类

package com.example.direct;
import com.example.entity.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class Producer {
    @Resource
    RabbitTemplate rabbitTemplate;
    public void sendMessageByExchangeANdRoute(String message){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);
    }
    /**
     * 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。
     * @param message
     */
    public void sendMessageByQueue(String message){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);
    }
    public void sendMessage(User user){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);
    }
}

3、添加消息消费者

package com.example.direct;
import com.example.entity.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class Consumer {
    @RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)
    public void onMessage(User user){
        System.out.println("收到的实体bean消息:"+user);
    }
    @RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)
    public void onMessage2(String message){
        System.out.println("收到的字符串消息:"+message);
    }
}

4、 测试

package com.example;
import com.example.entity.User;
import com.example.direct.Producer;
import com.example.fanout.FanoutProducer;
import com.example.topic.TopicProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
class SpringbootRabbitMqApplicationTests {
    @Resource
    Producer producer;
    @Test
    public void sendMessage() throws InterruptedException {
        producer.sendMessageByQueue("哈哈");
        producer.sendMessage(new User().setAge(10).setName("wasin"));
    }
}

五、Topic Exchange(主题交换机)模式

1、新建RabbitTopicConfig类

package com.example.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键,
 * 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
 */
@Configuration
public class RabbitTopicConfig {
    /**
     * 交换机
     */
    public static final String EXCHANGE = "EXCHANGE_TOPIC1";
    /**
     * 队列名称
     */
    public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";
    /**
     * 路由
     * "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)
     * 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....
     * aa.bb.wasin.cc 无法匹配
     */
    public static final String ROUTING_KEY1 = "*.wasin.#";
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_TOPIC1, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE,
                true, //是否持久化
                false //是否排他
        );
    }
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);
    }
}

2、新建 消息生产者和发送者

TopicProducer类

package com.example.topic;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class TopicProducer {
    @Resource
    RabbitTemplate rabbitTemplate;
    /**
     * @param routeKey 路由
     * @param message 消息
     */
    public void sendMessageByQueue(String routeKey, String message){
        rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);
    }
}

TopicConsumer类

package com.example.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Slf4j
@Component
public class TopicConsumer {
    @RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)
    public void onMessage2(String message){
        log.info("topic收到的字符串消息:{}",message);
    }
}

六、Fanout Exchange(扇形交换机)模式

1、 新建 RabbitFanoutConfig类

package com.example.fanout;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,
 * 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
 */
@Configuration
public class RabbitFanoutConfig {
    /**
     * 交换机
     */
    public static final String EXCHANGE = "EXCHANGE_FANOUT";
    /**
     * 队列名称
     */
    public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";
    /**
     * 队列名称
     */
    public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";
    @Bean
    public Queue queueFanout1() {
        return new Queue(QUEUE_FANOUT1, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @Bean
    public Queue queueFanout2() {
        return new Queue(QUEUE_FANOUT2, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @Bean
    public FanoutExchange exchangeFanout() {
        return new FanoutExchange(EXCHANGE,
                true, //是否持久化
                false //是否排他
        );
    }
    @Bean
    public Binding bindingFanout() {
        return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
    }
    @Bean
    public Binding bindingFanout2() {
        return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
    }
}

2、新建 消息生产者和发送者

FanoutProducer类:

package com.example.fanout;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class FanoutProducer {
    @Resource
    RabbitTemplate rabbitTemplate;
    /**
     * @param message 消息
     */
    public void sendMessageByQueue(String message) {
        rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);
    }
}

FanoutConsumer类

package com.example.fanout;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Slf4j
@Component
public class FanoutConsumer {
    /**
     * 手动提交
     * @param message
     * @param channel
     * @param tag
     * @throws IOException
     */
    @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)
    public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("fanout1收到的字符串消息:{}",message);
        channel.basicAck(tag,false);
    }
    @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)
    public void onMessage2(String message){
        log.info("fanout2到的字符串消息:{}",message);
    }
}

到此这篇关于Springboot RabbitMQ 消息队列使用的文章就介绍到这了,更多相关Springboot RabbitMQ 消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java关键字instanceof用法及实现策略

    Java关键字instanceof用法及实现策略

    instanceof 运算符是用来在运行时判断对象是否是指定类及其父类的一个实例。这篇文章主要介绍了Java关键字instanceof用法解析,需要的朋友可以参考下
    2020-08-08
  • 手写Java LockSupport的示例代码

    手写Java LockSupport的示例代码

    LockSupport给我们提供了一个非常强大的功能,它是线程阻塞最基本的元语,他可以将一个线程阻塞也可以将一个线程唤醒,因此经常在并发的场景下进行使用。本文将用60行代码实现手写LockSupport,需要的可以参考一下
    2022-08-08
  • Springboot框架实现自动装配详解

    Springboot框架实现自动装配详解

    在使用springboot时,很多配置我们都没有做,都是springboot在帮我们完成,这很大一部分归功于springboot自动装配。本文将详细为大家讲解SpringBoot的自动装配原理,需要的可以参考一下
    2022-08-08
  • Java日期处理工具类DateUtils详解

    Java日期处理工具类DateUtils详解

    这篇文章主要为大家详细介绍了Java日期处理工具类DateUtils的相关代码,包含日期和时间常用操作,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-12-12
  • 教你用Java Swing实现自助取款机系统

    教你用Java Swing实现自助取款机系统

    今天给大家带来的是关于JAVA的相关知识,文章围绕着如何用Java Swing实现自助取款机系统展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • Spring Boot高可用限流三种实现解决方案

    Spring Boot高可用限流三种实现解决方案

    限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,本文就介绍了Spring Boot高可用限流三种实现解决方案,具有一定的参考价值,感兴趣的可以了解一下
    2023-08-08
  • java集合进行排序的方式总结

    java集合进行排序的方式总结

    在本篇文章里小编给大家整理的是一篇关于java集合进行排序的两种方式总结,有兴趣的朋友们可以学习参考下。
    2021-08-08
  • Spring @Valid和@Validated区别和用法实例

    Spring @Valid和@Validated区别和用法实例

    这篇文章主要介绍了Spring @Valid和@Validated区别和用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • 浅析Java8新特性Lambda表达式和函数式接口

    浅析Java8新特性Lambda表达式和函数式接口

    Lambda表达式理解为是 一段可以传递的代码。最直观的是使用Lambda表达式之后不用再写大量的匿名内部类,简化代码,提高了代码的可读性
    2017-08-08
  • SpringBoot动态定时任务、动态Bean、动态路由详解

    SpringBoot动态定时任务、动态Bean、动态路由详解

    这篇文章主要介绍了SpringBoot动态定时任务、动态Bean、动态路由详解,之前用过Spring中的定时任务,通过@Scheduled注解就能快速的注册一个定时任务,但有的时候,我们业务上需要动态创建,或者根据配置文件、数据库里的配置去创建定时任务,需要的朋友可以参考下
    2023-10-10

最新评论