RabbitMQ 从零到实战:概念、配置与 Spring Boot 集成实践指南

 更新时间:2026年06月11日 10:58:49   作者:霸道流氓气质  
RabbitMQ 是一个开源的消息队列系统,用于在分布式系统中存储和转发消息,它支持多种消息协议,包括 AMQP(高级消息队列协议),本文给大家介绍RabbitMQ 从零到实战:概念、配置与 Spring Boot 集成实践指南,感兴趣的朋友一起看看吧

一、RabbitMQ 核心概念

1.1 AMQP 模型中的角色

Producer(生产者)
    │
    │ 发送消息
    ▼
Exchange(交换机) ──── Binding(绑定规则) ──── Queue(队列)
                                                │
                                                │ 消费消息
                                                ▼
                                         Consumer(消费者)
角色说明
Producer消息发送方,将消息发送到 Exchange
Exchange接收消息并按规则路由到 Queue。消息不直接发到 Queue
BindingExchange 和 Queue 之间的绑定关系,定义路由规则
Queue消息最终存储的地方,消费者从这里拉取消息
Consumer消息接收方,从 Queue 中消费消息
Routing Key消息携带的路由键,Exchange 根据它决定发到哪个 Queue
Virtual Host逻辑隔离单元,类似数据库的 schema,不同 vhost 之间资源互不可见

1.2 Exchange 四种类型

类型路由规则典型场景
DirectRouting Key 精确匹配点对点,如订单处理
Fanout广播到所有绑定的 Queue,忽略 Routing Key通知广播,如日志
TopicRouting Key 模式匹配(* 匹配一个词,# 匹配多个词)灵活路由,如 order.createorder.*
Headers根据消息头属性匹配(几乎不用)特殊场景

1.3 与 Kafka 等术语的对应关系

很多人混淆 RabbitMQ 和 Kafka 的术语:

概念RabbitMQKafkaRocketMQ
消息存储单元QueuePartition (within Topic)Queue (within Topic)
消息分类/频道Exchange + Routing KeyTopicTopic
消费者分组多个 Consumer 消费同一 Queue(竞争消费)Consumer GroupConsumer Group
消息广播Fanout Exchange 绑定多个 Queue不同 Consumer Group不同 Consumer Group

注意:RabbitMQ 中没有原生的 “Topic”(作为消息分类的概念),它用 Exchange + Queue + Binding 的组合来实现类似能力。Kafka 和 RocketMQ 才有 Topic 的概念。

1.4 消费模式

竞争消费(Work Queue)

  • 多个 Consumer 监听同一个 Queue
  • 一条消息只被其中一个 Consumer 消费
  • 用于负载均衡

发布订阅(Pub/Sub)

  • 使用 Fanout Exchange 绑定多个 Queue
  • 每个 Queue 对应一个 Consumer
  • 一条消息被所有 Consumer 消费
  • 用于广播通知

1.5 消息确认机制(ACK)

模式说明风险
自动确认(auto-ack)消息发送给 Consumer 后立即从 Queue 删除Consumer 处理失败则消息丢失
手动确认(manual-ack)Consumer 处理完后主动发 ACK,Queue 才删除消息更安全,但需处理超时和重复投递
拒绝(reject/nack)Consumer 明确拒绝消息,可选择重新入队或丢弃死循环风险(不断重新入队)

1.6 消息持久化

  • Queue 持久化:Broker 重启后 Queue 定义不丢失(durable=true
  • 消息持久化:消息写入磁盘,Broker 重启后消息不丢失(deliveryMode=2
  • Exchange 持久化:Broker 重启后 Exchange 不丢失

三者都要配置才能保证消息在 Broker 故障恢复后不丢。

二、首次申请和部署 RabbitMQ

2.1 方式一:本地安装(开发环境)

Docker 方式(推荐)

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3-management
  • 5672:AMQP 协议端口(程序连接用)
  • 15672:管理控制台端口(浏览器访问)
  • 访问 http://localhost:15672 使用 admin/admin123 登录管理后台

2.2 方式二:云服务(生产环境)

以阿里云 AMQP(即本文参考工程使用的方式)为例:

申请步骤

  1. 登录阿里云控制台 → 搜索"消息队列 AMQP"
  2. 创建实例(选择地域、规格)
  3. 获取实例信息:
    • 接入点地址(addresses):如 amqp-cn-xxx.mq-amqp.cn-qingdao-xxx.aliyuncs.com
    • 实例 ID(instance-id)
    • AccessKey ID / Secret
    • 用户名/密码(由 AccessKey 生成的 Base64 编码)
  4. 在实例控制台中创建 Virtual Host
  5. 创建 Exchange
  6. 创建 Queue
  7. 建立 Binding(将 Queue 绑定到 Exchange,设置 Routing Key)

2.3 管理后台中的操作

在 RabbitMQ Management UI 中需要手动或通过 API 创建:

1. 创建 Exchange
   - Name: my-app.exchange
   - Type: direct
   - Durable: true
2. 创建 Queue
   - Name: my-app.import-task
   - Durable: true
3. 创建 Binding
   - Source: my-app.exchange
   - Destination: my-app.import-task
   - Routing Key: import-task(通常和 Queue 名相同)

三、Spring Boot 集成 RabbitMQ 配置详解

3.1 配置

spring:
  rabbitmq:
    # AMQP 接入点地址(阿里云格式,自建则为 host:port)
    addresses: amqp-cn-xxx5.mq-amqp.cn-qingdao-xxxx-a.aliyuncs.com
    # 认证信息(阿里云 AMQP 用 Base64 编码的 AccessKey)
    username: xxxx...(Base64编码)
    password: xxxxxx...(Base64编码)
    # 阿里云 AMQP 特有配置
    access-key-id: xxxxxx
    access-key-secret: xxxx
    instance-id: amqp-cn-xxxx
    region: cn-qingdao
    # 消费者监听配置
    listener:
      simple:
        # 消费失败时不重新入队(避免死循环)
        default-requeue-rejected: false
        # 最小并发消费者数
        concurrency: 1
        # 最大并发消费者数
        max-concurrency: 3
        # 预取数量(每次从 Broker 拉取几条未确认的消息)
        prefetch: 1

3.2 配置项含义详解

配置项含义建议值
addressesBroker 地址,多个用逗号分隔生产用集群地址
username / password连接认证不要用默认 guest
virtual-host虚拟主机,逻辑隔离按环境或团队划分
connection-timeout连接超时5000~10000ms
listener.simple.concurrency每个 @RabbitListener 启动的最小消费者线程数IO 密集设 2~5
listener.simple.max-concurrency最大消费者线程数5~20
listener.simple.prefetch每个消费者一次预取的消息数1(保证公平分发),高吞吐设 10~50
listener.simple.default-requeue-rejected消费异常时是否重新入队false(配合死信队列)

3.3 自建 RabbitMQ 的标准配置

spring:
  rabbitmq:
    host: 192.168.1.1xx
    port: 5672
    username: myapp
    password: myapp123
    virtual-host: /myapp
    connection-timeout: 5000
    listener:
      simple:
        acknowledge-mode: manual      # 手动确认
        concurrency: 2
        max-concurrency: 5
        prefetch: 1
        default-requeue-rejected: false
    publisher-confirm-type: correlated  # 生产者确认
    publisher-returns: true             # 消息无法路由时回调

四、完整示例:Spring Boot + RabbitMQ 异步导入

4.1 依赖(pom.xml)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2 配置(application.yml)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
app:
  mq:
    exchange: app.exchange
    queue: app.import-task
    routing-key: import-task

4.3 RabbitMQ 资源声明(自动创建 Exchange、Queue、Binding)

package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ 资源配置.
 * 应用启动时自动创建 Exchange、Queue 并建立绑定关系.
 */
@Configuration
public class RabbitMqConfig {
  @Value("${app.mq.exchange}")
  private String exchangeName;
  @Value("${app.mq.queue}")
  private String queueName;
  @Value("${app.mq.routing-key}")
  private String routingKey;
  /** 声明持久化交换机. */
  @Bean
  public DirectExchange importExchange() {
    return new DirectExchange(exchangeName, true, false);
  }
  /** 声明持久化队列. */
  @Bean
  public Queue importQueue() {
    return new Queue(queueName, true);
  }
  /** 建立绑定关系. */
  @Bean
  public Binding importBinding(Queue importQueue, DirectExchange importExchange) {
    return BindingBuilder.bind(importQueue).to(importExchange).with(routingKey);
  }
}

4.4 消息体 DTO

package com.example.dto;
import java.io.Serializable;
import java.util.List;
import lombok.Data;
/**
 * 导入任务消息体.
 */
@Data
public class ImportTaskMessage implements Serializable {
  /** 批次ID. */
  private Long batchId;
  /** 批次号. */
  private String batchNo;
  /** 待插入的数据(JSON序列化后体积较大时,可改为只传batchId,消费端自己查数据). */
  private List<ImportRecord> records;
}

4.5 生产者(发送消息)

package com.example.mq;
import com.example.dto.ImportTaskMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
/**
 * 导入任务消息生产者.
 */
@Slf4j
@Component
public class ImportTaskProducer {
  @Resource
  private RabbitTemplate rabbitTemplate;
  @Value("${app.mq.exchange}")
  private String exchange;
  @Value("${app.mq.routing-key}")
  private String routingKey;
  /**
   * 发送导入任务消息.
   */
  public void send(ImportTaskMessage message) {
    log.info("发送导入任务消息,批次号:{}", message.getBatchNo());
    rabbitTemplate.convertAndSend(exchange, routingKey, message);
    log.info("导入任务消息发送完成");
  }
}

4.6 消费者(接收并处理消息)

package com.example.mq;
import com.example.dto.ImportRecord;
import com.example.dto.ImportTaskMessage;
import com.example.mapper.ImportRecordMapper;
import jakarta.annotation.Resource;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * 导入任务消息消费者.
 */
@Slf4j
@Component
public class ImportTaskConsumer {
  private static final int BATCH_SIZE = 2000;
  @Resource
  private ImportRecordMapper importRecordMapper;
  /**
   * 消费导入任务.
   * 使用 @RabbitListener 注解声明监听哪个队列.
   */
  @RabbitListener(queues = "${app.mq.queue}")
  public void consume(ImportTaskMessage message) {
    log.info("收到导入任务,批次号:{},数据量:{}", 
        message.getBatchNo(), message.getRecords().size());
    try {
      List<ImportRecord> records = message.getRecords();
      int total = records.size();
      // 分批插入
      for (int i = 0; i < total; i += BATCH_SIZE) {
        int end = Math.min(i + BATCH_SIZE, total);
        List<ImportRecord> batch = records.subList(i, end);
        importRecordMapper.batchInsert(batch);
      }
      log.info("导入任务处理完成,批次号:{},共插入{}条", message.getBatchNo(), total);
    } catch (Exception e) {
      log.error("导入任务处理失败,批次号:{}", message.getBatchNo(), e);
      // 抛出异常后,根据 default-requeue-rejected 配置决定是否重新入队
      throw e;
    }
  }
}

4.7 Service 层(校验通过后发消息)

package com.example.service.impl;
import com.example.dto.ImportRecord;
import com.example.dto.ImportTaskMessage;
import com.example.mq.ImportTaskProducer;
import com.example.service.ImportService;
import jakarta.annotation.Resource;
import java.util.List;
import org.springframework.stereotype.Service;
@Service
public class ImportServiceImpl implements ImportService {
  @Resource
  private ImportTaskProducer producer;
  @Override
  public String importData(List<ImportRecord> records, String operatorId) {
    // 同步校验
    for (int i = 0; i < records.size(); i++) {
      if (records.get(i).getAmount() <= 0) {
        throw new RuntimeException("第" + (i + 1) + "行:数量必须大于0");
      }
    }
    // 保存主表...(略)
    String batchNo = "20260610001";
    // 发送MQ消息,异步执行插入
    ImportTaskMessage message = new ImportTaskMessage();
    message.setBatchId(1L);
    message.setBatchNo(batchNo);
    message.setRecords(records);
    producer.send(message);
    return "导入任务已提交,批次号:" + batchNo;
  }
}

4.8 执行时序

请求线程                      RabbitMQ Broker              消费者线程
   │                              │                          │
   │── 同步校验 ──→                │                          │
   │── 保存主表 ──→                │                          │
   │── producer.send(msg) ──→     │                          │
   │                              │── 持久化消息 ──→           │
   │← 返回"任务已提交" ──          │                          │
   │                              │── 推送消息 ──→            │
   │  (HTTP已响应)                 │                          │── 批量INSERT
   │                              │                          │── ...
   │                              │                          │── INSERT完成
   │                              │←── ACK ──                │
   │                              │── 删除消息               │

五、生产环境最佳实践

5.1 消息可靠投递(不丢消息)

Producer ──确认──→ Exchange ──确认──→ Queue ──确认──→ Consumer
   │                  │                │               │
   │ publisher-confirm │ mandatory     │ 持久化        │ 手动ACK
  • Publisher Confirm:Broker 收到消息后回调确认
  • Mandatory:消息无法路由到任何 Queue 时回调
  • Queue 持久化 + 消息持久化:Broker 重启不丢
  • 手动 ACK:Consumer 处理完才确认

6.2 消费幂等性

消息可能被重复投递(网络抖动、Consumer 超时未ACK),消费逻辑必须幂等:

  • 用唯一业务ID做去重判断
  • 使用数据库唯一约束
  • 先查后插的 “SELECT + INSERT” 模式

6.3 死信队列(DLQ)

消费失败的消息不要无限重试,设置死信队列:

正常 Queue ──(消费失败/超时/被拒绝)──→ 死信 Exchange ──→ 死信 Queue

运维人员可以查看死信队列中的消息,分析失败原因后手动处理或重新投递。

6.4 消息体大小

  • RabbitMQ 默认单条消息上限 128MB,但建议控制在 1MB 以内
  • 大量数据(如12万条记录)不要直接放消息体里
  • 推荐做法:消息体只放 batchId,消费者根据 ID 去数据库/文件系统取数据

七、总结

阶段关键动作
申请资源创建 RabbitMQ 实例(云服务或自建)→ 获取连接信息
资源规划设计 Exchange、Queue、Binding 的命名和路由关系
配置接入application.yml 中配置连接信息和消费者参数
代码开发声明资源 → 生产者发送 → 消费者处理
可靠性保障Publisher Confirm + 消息持久化 + 手动ACK + 死信队列
运维监控Management UI 监控队列积压、消费速率、死信数量

到此这篇关于RabbitMQ 从零到实战:概念、配置与 Spring Boot 集成实践指南的文章就介绍到这了,更多相关RabbitMQ集成Spring Boot 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java8中的HashMap原理详解

    java8中的HashMap原理详解

    这篇文章主要介绍了java8中的HashMap原理详解,HashMap是日常开发中非常常用的容器,HashMap实现了Map接口,底层的实现原理是哈希表,HashMap不是一个线程安全的容器,需要的朋友可以参考下
    2023-09-09
  • Java FileInputStream读中文乱码问题解决方案

    Java FileInputStream读中文乱码问题解决方案

    这篇文章主要介绍了Java FileInputStream读中文乱码问题解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10
  • Java8 函数式编程stream流使用详解

    Java8 函数式编程stream流使用详解

    这篇文章主要介绍了Java8 函数式编程stream流使用详解的相关资料,需要的朋友可以参考下
    2023-07-07
  • java计算指定日期为本年第几周的实例

    java计算指定日期为本年第几周的实例

    本文介绍了在编程中按周统计数据时遇到的问题,由于mysql中按所在月的周数周统计比较麻烦,采用所在年的周数作为分组条件,并通过java计算日期属于年的第一周来进行二次计算,提高性能,同时,作者也分享了计算指定日期为本年第几周的方法,并指出了一些常见方法的缺陷
    2025-11-11
  • windows下使用 intellij idea 编译 kafka 源码环境

    windows下使用 intellij idea 编译 kafka 源码环境

    这篇文章主要介绍了使用 intellij idea 编译 kafka 源码的环境,本文是基于windows下做的项目演示,需要的朋友可以参考下
    2021-10-10
  • SpringMVC RESTful支持实现过程演示

    SpringMVC RESTful支持实现过程演示

    这篇文章主要介绍了SpringMVC RESTful支持实现过程演示,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Java使用Runnable接口创建线程的示例代码

    Java使用Runnable接口创建线程的示例代码

    在Java中,多线程编程是实现并发操作的重要手段之一,通过多线程,程序可以同时执行多个任务,从而提高应用程序的效率和响应速度,Java提供了多种创建线程的方式,其中实现Runnable接口是最常见且推荐的方式之一,本文将详细介绍如何使用Runnable接口创建线程
    2025-02-02
  • SpringBoot中5种服务可用性保障技术分享

    SpringBoot中5种服务可用性保障技术分享

    服务可用性已成为系统设计的核心关注点,SpringBoot作为Java生态系统中流行的应用开发框架,提供了丰富的工具和库来保障服务的高可用性,本文将介绍SpringBoot中5种关键的服务可用性保障技术,需要的朋友可以参考下
    2025-05-05
  • Java实现读取TXT和CSV文件内容

    Java实现读取TXT和CSV文件内容

    这篇文章主要为大家详细介绍了如何利用Java语言实现读取TXT和CSV文件内容的功能,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2023-02-02
  • Java圆通物流轨迹推送服务接口文档及流程

    Java圆通物流轨迹推送服务接口文档及流程

    这篇文章主要介绍了圆通物流轨迹推送服务接口Java文档,主要用来接收圆通推送的订单状态,本文给大家分享详细流程,感兴趣的朋友跟随小编一起看看吧
    2022-02-02

最新评论