spring rocketmq集成方案

 更新时间:2026年03月13日 16:51:05   作者:金坷拉  
本文详细介绍了如何在Spring Boot项目中集成RocketMQ 5.x,包括前置条件、核心依赖、配置、生产者和消费者实现、测试验证以及关键注意事项,感兴趣的朋友跟随小编一起看看吧

在 Spring 项目中集成 RocketMQ 是非常常见的消息队列应用场景,我会以 Spring Boot + RocketMQ 5.x(当前主流版本)为例,提供完整、可直接运行的集成方案,包括生产者、消费者的核心代码和配置说明。

一、前置条件

  1. 已安装并启动 RocketMQ(NameServer + Broker),默认端口:NameServer 9876
  2. Spring Boot 版本建议:2.7.x3.x(兼容 RocketMQ 官方 starter)
  3. 开发环境:JDK 8+

二、核心依赖引入

pom.xml 中添加 RocketMQ 与 Spring Boot 集成的官方 starter:

<!-- Spring Boot 基础依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ Spring Boot Starter(官方推荐) -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version> <!-- 适配 RocketMQ 5.x,兼容 Spring Boot 2/3 -->
</dependency>
<!-- 可选:测试依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

三、核心配置(application.yml)

resources 目录下配置 RocketMQ 连接信息:

spring:
  application:
    name: rocketmq-demo # 应用名称
# RocketMQ 核心配置
rocketmq:
  name-server: 127.0.0.1:9876 # NameServer 地址(集群用逗号分隔)
  producer:
    group: demo-producer-group # 生产者组(必填,标识同一类生产者)
    send-message-timeout: 3000 # 发送超时时间,默认3000ms
    compress-message-body-threshold: 4096 # 消息压缩阈值,默认4096字节
    max-message-size: 4194304 # 最大消息大小,默认4MB
    retry-times-when-send-failed: 2 # 同步发送失败重试次数
    retry-times-when-send-async-failed: 2 # 异步发送失败重试次数

四、生产者实现(3 种发送方式)

1. 基础同步发送(最常用)

适用于需要确认发送结果的场景(如订单创建、库存扣减):

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RocketMQProducer {
    // 注入官方封装的 RocketMQ 模板(类似 RabbitTemplate)
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 同步发送消息(阻塞等待结果)
     * @param topic 消息主题(必填,需提前创建)
     * @param message 消息内容
     * @return 发送结果
     */
    public SendResult sendSyncMessage(String topic, String message) {
        try {
            // 发送格式:"topic:tag"(tag可选,用于消息过滤)
            SendResult sendResult = rocketMQTemplate.syncSend(topic + ":demoTag", message);
            System.out.println("同步发送成功,消息ID:" + sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            System.err.println("同步发送失败:" + e.getMessage());
            // 业务异常处理(如重试、记录日志、告警)
            throw new RuntimeException("消息发送失败", e);
        }
    }
    /**
     * 异步发送消息(非阻塞,回调通知结果)
     * @param topic 主题
     * @param message 消息内容
     */
    public void sendAsyncMessage(String topic, String message) {
        rocketMQTemplate.asyncSend(
            topic + ":demoTag",
            message,
            // 发送成功回调
            sendResult -> System.out.println("异步发送成功,消息ID:" + sendResult.getMsgId()),
            // 发送失败回调
            throwable -> System.err.println("异步发送失败:" + throwable.getMessage())
        );
    }
    /**
     * 单向发送消息(无回调,适用于日志、埋点等不关心结果的场景)
     * @param topic 主题
     * @param message 消息内容
     */
    public void sendOneWayMessage(String topic, String message) {
        rocketMQTemplate.sendOneWay(topic + ":demoTag", message);
        System.out.println("单向消息发送请求已提交");
    }
}

2. 发送自定义对象消息

如果需要发送 Java 对象(而非字符串),只需保证对象可序列化:

// 自定义消息实体(实现 Serializable)
public class OrderMessage implements Serializable {
    private Long orderId;
    private String orderNo;
    private BigDecimal amount;
    // 省略 getter/setter/toString
}
// 生产者中新增方法
public SendResult sendObjectMessage(String topic, OrderMessage orderMessage) {
    return rocketMQTemplate.syncSend(topic + ":orderTag", orderMessage);
}

五、消费者实现(2 种消费模式)

1. 普通消费(默认集群模式)

适用于多实例负载均衡消费(同一组消费者分摊消息):

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * RocketMQ 消费者
 * - topic:订阅的主题(需与生产者一致)
 * - consumerGroup:消费者组(必填,同一组消费同一主题)
 * - messageModel:消费模式(CLUSTERING 集群模式,BROADCASTING 广播模式)
 * - consumeMode:消费方式(CONCURRENTLY 并发消费,ORDERLY 顺序消费)
 */
@Component
@RocketMQMessageListener(
    topic = "demo_topic", // 订阅主题
    consumerGroup = "demo-consumer-group", // 消费者组
    messageModel = MessageModel.CLUSTERING, // 集群模式(默认)
    consumeMode = ConsumeMode.CONCURRENTLY // 并发消费(默认)
)
public class RocketMQConsumer implements RocketMQListener<String> {
    /**
     * 消息消费逻辑(接收到消息时触发)
     * @param message 消息内容(与生产者发送类型一致)
     */
    @Override
    public void onMessage(String message) {
        try {
            // 核心业务逻辑:如解析消息、处理订单、更新库存等
            System.out.println("接收到消息:" + message);
            // 消费成功无需返回,抛出异常则会触发重试
        } catch (Exception e) {
            System.err.println("消息消费失败:" + e.getMessage());
            // 异常抛出后,RocketMQ 会自动重试(默认最多16次)
            throw new RuntimeException("消费失败", e);
        }
    }
}

2. 消费自定义对象消息

如果生产者发送的是自定义对象,消费者需指定泛型为对应类型:

@Component
@RocketMQMessageListener(
    topic = "demo_topic",
    consumerGroup = "order-consumer-group"
)
public class OrderMessageConsumer implements RocketMQListener<OrderMessage> {
    @Override
    public void onMessage(OrderMessage orderMessage) {
        System.out.println("接收到订单消息:" + orderMessage);
        // 处理订单业务逻辑
    }
}

六、测试验证

编写测试类,验证生产者发送、消费者接收是否正常:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RocketMQDemoTest {
    @Autowired
    private RocketMQProducer rocketMQProducer;
    @Test
    public void testSendSyncMessage() {
        // 发送消息到 demo_topic 主题
        rocketMQProducer.sendSyncMessage("demo_topic", "Hello RocketMQ + Spring Boot!");
        // 暂停3秒,确保消费者能接收到消息
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

七、关键注意事项

  1. 主题 / 组命名规范:避免特殊字符,建议用 业务_模块_主题 格式(如 order_pay_topic)。
  2. 重试机制:消费失败默认重试 16 次,可通过 maxReconsumeTimes 配置重试次数。
  3. 消息持久化:RocketMQ 默认持久化消息,即使消费者宕机,重启后仍能消费未处理的消息。
  4. 顺序消费:如需保证消息顺序,需将 consumeMode 设为 ORDERLY,且生产者发送时指定同一消息队列。
  5. 异常处理:生产环境建议对接告警(如钉钉、短信),避免消费失败无感知。

总结

  1. Spring Boot 集成 RocketMQ 的核心是引入官方 rocketmq-spring-boot-starter,配置 NameServer 地址和生产 / 消费组。
  2. 生产者通过 RocketMQTemplate 实现同步 / 异步 / 单向发送,支持字符串和自定义对象消息。
  3. 消费者通过 @RocketMQMessageListener 注解声明订阅关系,实现 RocketMQListener 接口处理消息逻辑,默认集群模式并发消费。

核心关键点:主题与消费组必须配置正确,消费失败抛出异常会触发自动重试,生产环境需做好异常监控和重试次数限制。

到此这篇关于spring rocketmq集成的文章就介绍到这了,更多相关spring rocketmq集成内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解决跨域请求,NG返回403(403并不一定是NG问题)

    解决跨域请求,NG返回403(403并不一定是NG问题)

    这篇文章主要介绍了解决跨域请求,NG返回403(403并不一定是NG问题),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • Java中ReUtil正则表达式工具库的使用

    Java中ReUtil正则表达式工具库的使用

    ReUtil是Hutool库中的正则表达式工具类,提供了多种常用正则表达式操作方法,下面就来介绍一下ReUtil的使用,具有一定的参考价值,感兴趣的可以了解一下
    2025-02-02
  • Java算法实现杨辉三角的讲解

    Java算法实现杨辉三角的讲解

    今天小编就为大家分享一篇关于Java算法实现杨辉三角的讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • java获取和设置系统变量问题(环境变量)

    java获取和设置系统变量问题(环境变量)

    这篇文章主要介绍了java获取和设置系统变量问题(环境变量),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • java中UDP简单聊天程序实例代码

    java中UDP简单聊天程序实例代码

    这篇文章主要介绍了java中UDP简单聊天程序实例代码,有需要的朋友可以参考一下
    2013-12-12
  • Springboot之restTemplate的配置及使用方式

    Springboot之restTemplate的配置及使用方式

    这篇文章主要介绍了Springboot之restTemplate的配置及使用方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • Springboot之整合Socket连接案例

    Springboot之整合Socket连接案例

    这篇文章主要介绍了Springboot之整合Socket连接案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • Java时间工具类Date的常用处理方法

    Java时间工具类Date的常用处理方法

    在Java中获取当前时间,可以使用 java.util.Date 类和 java.util.Calendar 类完成。其中,Date 类主要封装了系统的日期和时间的信息,下面将详细介绍Date类的常用处理方法,需要的可以参考一下
    2022-05-05
  • Java中char[] 和 String 类型占用字节大小问题

    Java中char[] 和 String 类型占用字节大小问题

    这篇文章主要介绍了Java中char[] 和 String 类型占用字节大小问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • java 枚举enum的用法(与在switch中的用法)

    java 枚举enum的用法(与在switch中的用法)

    这篇文章主要介绍了java 枚举enum的用法(与在switch中的用法),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10

最新评论