SpringBoot 整合 RabbitMQ 的使用方式(代码示例)

 更新时间:2024年10月11日 09:39:36   作者:寰梦  
本文详细介绍了使用RabbitTemplate进行消息传递的几种模式,包括点对点通信、发布/订阅模式、工作队列模式、路由模式和主题模式,每种模式都通过代码示例展示了生产者和消费者的实现,帮助开发者理解和运用RabbitMQ进行高效的消息处理

一、RabbitTemplate 的使用

1.【导入依赖】

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.1</version>
</dependency>

2.【添加配置】

rabbitmq:
    host:   #ip地址
    port: 5672 #端口
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 默认每次取出一条消息消费, 消费完成取下一条
        acknowledge-mode: manual # 设置消费端手动ack确认
        retry:
          enabled: true # 是否支持重试
    publisher-confirm-type: correlated  #确认消息已发送到交换机(Exchange)
    publisher-returns: true  #确认消息已发送到队列(Queue)

3.【点对点通信(队列模式)(Point-to-Point Messaging)】

使用方式:

这种方式也被称为队列(Queue)模型。消息发送者(Producer)发送消息到队列,然后消息接收者(Consumer)从队列中获取消息进行处理。这种模型下,每个消息只有一个消费者可以接收,确保消息的可靠传递和顺序处理。

代码示例: 生产者

    /**
     * 第一种模型: 简单模型
     * 一个消息生产者  一个队列  一个消费者
     * @return
     */
    @GetMapping("hello/world")
    public void helloWorld() {
        SysUser sysUser = new SysUser();
        // 发送消息
        // 第一个参数: String routingKey 路由规则 【交换机 和队列的绑定规则 】  队列名称
        // 第二个参数: object message 消息的内容
//        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!");
        ///  MessagePostProcessor 消息包装器  如果需要对消息进行包装
        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> {
            // 设置唯一的标识
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
            return message;
        });

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class HelloWorldConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    /**
     * 监听 hello_world_queue 队列消费消息
     * queues 监听队列的名称  要求这个队列必须是已经存在的队列
     * queuesToDeclare 监听队列 如果这个队列不存在 则 rabbitMQ 中 RabbitAdmin 会帮助去构建这个队列
     */
    @RabbitListener(queuesToDeclare = @Queue("hello_world_queue"))
    public void helloWorldConsumer(String msg, Message message, Channel channel) {
        // 获取消息的唯一标识
        String messageId = message.getMessageProperties().getMessageId();
        // 将消息添加到 Redis的set集合中  set 不能重复的  方法的返回值 添加成功的数量
        Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId);
        if (count != null && count == 1) {
            // 没有消费过   正常消费
            log.info("hello_world_queue队列消费者接收到了消息,消息内容:{}", message);
        }
    }
}

4.【发布/订阅模式(Publish/Subscribe Messaging)】

使用方式:

在发布/订阅模式中,消息发送者将消息发布到交换机(Exchange),而不是直接发送到队列。交换机负责将消息路由到一个或多个绑定的队列中。每个订阅者(Subscriber)可以选择订阅它感兴趣的消息队列,从而接收消息。

代码示例: 生产者

/**
 * 工作队列
 * 一个生产者  一个队列  多个消费者
 */
@GetMapping("work/queue")
public void workQueue() {
    for (int i = 1; i <= 10; i++) {
        rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!");
    }
}

消费者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class WorkQueueConsumer {
    /***
     * 消费者1
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer(String message) throws InterruptedException {
        Thread.sleep(200);
        log.info("work_queue队列消费者1接收到了消息,消息内容:{}", message);
    }
    /***
     * 消费者2
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer2(String message) throws InterruptedException {
        Thread.sleep(400);
        log.info("work_queue队列消费者2接收到了消息,消息内容:{}", message);
    }
}

5.【工作队列模式(Work Queues)】

使用方式:

工作队列模式也称为任务队列(Task Queues),它可以用来实现任务的异步处理。多个工作者(Worker)同时监听同一个队列,当有新的任务消息被发送到队列中时,空闲的工作者会获取并处理这些任务,确保任务能够并行处理而不会重复执行。

代码示例: 生产者

/**
 * 发布订阅
 * 一个生产者  多个队列   多个消费者   涉及 到交换机  fanout
 */
@GetMapping("publish/subscribe")
public void publishSubscribe() {
    // 第一个参数: 交换机的名称  没有要求
    // 第二个参数: 交换机和队列的绑定规则    如果是发布订阅模式 那么这个规则默认不写 只需要交换机和队列绑定即可不需要规则
    // 第三个参数: 消息内容
    rabbitTemplate.convertAndSend("publish_subscribe_exchange", "",
            "hello publisher subscribe!!");
}

消费者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PublisherSubscribeConsumer {
    private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class);
    /**
     * 发布订阅模型消费者
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"),
            exchange = @Exchange(name = "publish_subscribe_exchange",
                    type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe(String message) {
        log.info("发布订阅模型消费者1接收到了消息,消息内容:{}", message);
    }
    /**
     * 发布订阅模型消费者
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"),
            exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe2(String message) {
        log.info("发布订阅模型消费者2接收到了消息,消息内容:{}", message);
    }
}

6.【路由模式(Routing)】

使用方式:

路由模式允许发送者根据消息的路由键(Routing Key)将消息路由到特定的队列。发送者将消息发送到交换机,并且通过设置不同的路由键,使消息能够被交换机路由到不同的队列。消费者可以根据需要选择监听哪些队列来接收消息。

代码示例: 生产者

/**
 * 路由模型
 * 一个生产者  多个队列   多个消费者   涉及 到交换机  direct
 */
@GetMapping("routing")
public void routing() {
    // 第一个参数: 交换机的名称  没有要求
    // 第二个参数: 交换机和队列的绑定规则    字符串 随意
    // 第三个参数: 消息内容
    rabbitTemplate.convertAndSend("routing_exchange", "aaa",
            "hello routing!!");
}

消费者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class RoutingConsumer {
    /**
     * 路由模型消费者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "abc", "error", "info" }))
    public void routingConsumer(String message) {
        log.info("路由模型消费者1接收到了消息,消息内容:{}", message);
    }
    /**
     * 路由模型消费者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "aaa", "ccc", "waadaffas" }))
    public void routingConsumer2(String message) {
        log.info("路由模型消费者2接收到了消息,消息内容:{}", message);
    }
    /**
     * 路由模型消费者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "bbbb", "asdfasd", "asdfasdf" }))
    public void routingConsumer3(String message) {
        log.info("路由模型消费者3接收到了消息,消息内容:{}", message);
    }
}

7.【主题模式(Topics)】

使用方式:

主题模式是路由模式的一种扩展,它允许发送者根据消息的多个属性(如主题)将消息路由到一个或多个队列。主题交换机(Topic Exchange)使用通配符匹配路由键与队列绑定键的模式,从而实现更灵活的消息路由和过滤。

代码示例: 生产者

/**
 * 主题模型
 * 一个生产者  多个队列   多个消费者   涉及 到交换机  topic
 */
@GetMapping("topic")
public void topic() {
    // 第一个参数: 交换机的名称  没有要求
    // 第二个参数: 交换机和队列的绑定规则    多个单词  以 “.” 拼起来
    // 第三个参数: 消息内容
    rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name",
            "hello topic!!");
}

消费者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class TopicConsumer {
    /**
     * *  表示任意一个单词
     * #  表示任意一个单词 或 多个
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "error.*.info", "#.name" }))
    public void topicConsumer(String message) {
        log.info("xxxxxxxxx1");
    }
    /**
     * *  表示任意一个单词
     * #  表示任意一个单词 或 多个
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "username" }))
    public void topicConsumer2(String message) {
        log.info("xxxxxxxxx2");
    }
    /**
     * *  表示任意一个单词
     * #  表示任意一个单词 或 多个
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "bwie.*", "error.*.info" }))
    public void topicConsumer3(String message) {
        log.info("xxxxxxxxx3");
    }
}

到此这篇关于SpringBoot 整合 RabbitMQ 的使用的文章就介绍到这了,更多相关SpringBoot 整合 RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java OpenSSL生成的RSA公私钥进行数据加解密详细介绍

    Java OpenSSL生成的RSA公私钥进行数据加解密详细介绍

    这篇文章主要介绍了Java OpenSSL生成的RSA公私钥进行数据加解密详细介绍的相关资料,这里提供实例代码及说明具体如何实现,需要的朋友可以参考下
    2016-12-12
  • redisson分布式锁的用法大全

    redisson分布式锁的用法大全

    这篇文章主要介绍了redisson分布式锁的用法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • Java毕业设计实战之财务预算管理系统的实现

    Java毕业设计实战之财务预算管理系统的实现

    这是一个使用了java+SSM+Jsp+Mysql+Layui+Maven开发的财务预算管理系统,是一个毕业设计的实战练习,具有财务预算管理该有的所有功能,感兴趣的朋友快来看看吧
    2022-02-02
  • java.lang.OutOfMemoryError: Metaspace异常解决的方法

    java.lang.OutOfMemoryError: Metaspace异常解决的方法

    这篇文章主要介绍了java.lang.OutOfMemoryError: Metaspace异常解决的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • SpringCloud的Gateway网关详解

    SpringCloud的Gateway网关详解

    这篇文章主要介绍了SpringCloud的Gateway网关详解,Gateway 是 Spring Cloud 官方推出的一个基于 Spring 5、Spring Boot 2 和 Project Reactor 的 API 网关实现,本文将介绍 Spring Cloud Gateway 的基本概念、核心组件以及如何配置和使用它,需要的朋友可以参考下
    2023-09-09
  • Java 对象序列化 NIO NIO2详细介绍及解析

    Java 对象序列化 NIO NIO2详细介绍及解析

    这篇文章主要介绍了Java 对象序列化 NIO NIO2详细介绍及解析的相关资料,序列化机制可以使对象可以脱离程序的运行而对立存在,需要的朋友可以参考下
    2017-02-02
  • Springboot项目打包如何将依赖的jar包输出到指定目录

    Springboot项目打包如何将依赖的jar包输出到指定目录

    公司要对springboot项目依赖的jar包进行升级,但是遇到一个问题,项目打包之后,没办法看到他里面依赖的jar包,版本到底是不是升上去了,没办法看到,下面通过本文给大家分享Springboot项目打包如何将依赖的jar包输出到指定目录,感兴趣的朋友一起看看吧
    2024-05-05
  • Go反射底层原理及数据结构解析

    Go反射底层原理及数据结构解析

    这篇文章主要介绍了Go反射底层原理及数据结构解析,反射的实现和interface的组成很相似,都是由“类型”和“数据值”构成,下面小编分享更多相关内容需要的小伙伴可以参考一下
    2022-06-06
  • Spring Session的使用示例

    Spring Session的使用示例

    最近团队一个项目需要使用Session,之前没有在实际项目中使用过Spring-Session,这里记录一下使用的过程
    2021-06-06
  • JVM垃圾回收器选型与调优方式

    JVM垃圾回收器选型与调优方式

    本文介绍了JVM垃圾回收器选型与调优的关键原理和实战技巧,包括基础数据结构、API使用、常见问题及解决方案、性能优化建议等,并提供了实际项目中的最佳实践,通过理解这些知识,可以有效提升代码质量
    2026-05-05

最新评论