springboot 3.x 整合 RocketMQ 5.x的详细过程

 更新时间:2025年12月15日 09:37:01   作者:学亮编程手记  
本文介绍了如何在SpringBoot中使用RocketMQ 5.x客户端,包括依赖配置、参数设置、生产者和消费者的消息发送与接收示例以及服务端环境搭建,感兴趣的朋友跟随小编一起看看吧

RocketMQ 5.x在SpringBoot中的上手使用过程

注意:rocketmq-v5-client-spring-boot-starter对springboot版本有要求,至少2.0.6.RELEASE版本的springboot无法整合。

准备环境

  • JDK 17
  • Spring Boot 3.2.3
  • RocketMQ(服务端) 5.3.1
  • rocketmq-v5-client-spring-boot-starter(客户端) 2.3.1

在 SpringBoot 项目中依赖如下配置:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
    <version>2.3.1</version>
</dependency>

如果还未搭建服务端,可以先看第5节-服务器环境搭建

参数配置

按照 SpringBoot 的约定习俗,在上手一个新的 spring-boot-starter项目时,想要知道怎么使用它,看它的 AutoConfiguration 就对了。

rocketmq-v5-client-spring-boot中,对应的 AutoConfiguration 类为 RocketMQAutoConfiguration,其类定义部分代码如下:

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtTemplateResetConfiguration.class,
        ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
  	// ... 省略
    @Bean(PRODUCER_BUILDER_BEAN_NAME)
    @ConditionalOnMissingBean(ProducerBuilderImpl.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"producer.endpoints"})
    public ProducerBuilder producerBuilder(RocketMQProperties rocketMQProperties) {
      // ... 省略
    }
    @Bean(SIMPLE_CONSUMER_BUILDER_BEAN_NAME)
    @ConditionalOnMissingBean(SimpleConsumerBuilder.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"simple-consumer.endpoints"})
    public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQProperties) {
       // ... 省略
    }
    @Bean(destroyMethod = "destroy")
    @Conditional(ProducerOrConsumerPropertyCondition.class)
    @ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
    public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
       // ... 省略
    }
}

可以发现,在rocketmq-v5-client-spring-boot中,根据 RocketMQ 5.x 在架构上做的改进,使用了 endpoints 来替代传统的 namesrvAddr,以支持更灵活的网络拓扑和云原生架构。endpoints 通常指向 RocketMQ 的 Broker 或 Nameserver 地址,用于生产者与 RocketMQ 集群建立连接。endpoints 是一个 URL 或 IP 地址(ip:host)列表(使用;分割)。

⚠️注意:在 RocketMQ 5.x 中,现已默认使用gRPC作为通信协议,entpoints更建议指向 Proxy 地址,一般默认端口为8081。

因此,现在想要启用默认的生产者(ProducerBuilder),只需要配置rocketmq.producer.endpoints即可。

想要启用默认的消费者(SimpleConsumerBuilder),只需要配置rocketmq.simple-consumer.endpoints即可。

RocketMQClientTemplate则是通过判断当前应用上下文是否含有ProducerBuilderSimpleConsumerBuilder Bean对象生成而来。它属于rocketmq-v5-client-spring-boot模块下,也就是说它利用了Spring特性,提供了Spring风格的API,方便开发者通过 Spring 的编程模型来进行消息发送和接收。

既然是原生态的简易使用教程,那么就尽可能在不写多的代码的情况下,实现生产环境中使用MQ。

因此,本次项目就只配置 rocketmq.producer.endpoints 用于启用默认的生产者,消费者使用Push消费模式,所以配置rocketmq.push-consumer.endpoints。配置如下:

rocketmq:
  producer:
    endpoints: localhost:8081
  push-consumer:
  	endpoints: localhost:8081

topic在代码中指定,不使用rocketmq.producer.topicrocketmq.push-consumer.topic配置默认的topic。

tips: 在启动客户端服务时,topic需要先创建,否则会启动报错。

生产者生产消息

生产消息通过SpringBoot自动装配的RocketMQClientTemplate对象实现,发送Message对象,示例代码如下:

@Service
public class MyService {
  @Autowired
  private RocketMQClientTemplate rocketMQClientTemplate;
  public void sendMessage() {
      byte[] bytes = "这是一个字符串".getBytes(StandardCharsets.UTF_8);
      Message<byte[]> message = MessageBuilder.withPayload(bytes).build();
      rocketMQClientTemplate.send("MyTopic", message);
  }
}

⚠️注意:在 RocketMQ 5.x 中,Message对象已从自定义对象改为spring-messaging包中的Message对象。一般通过MessageBuilder构建,实例对象类型为GenericMessage

消费者消费消息

消费者通过@RocketMQMessageListener注解,并实现RocketMQListener接口消费消息,示例代码如下:

@Service
@RocketMQMessageListener(consumerGroup = "MyTopic-service", topic = "MyTopic", tag = "*")
public class MyService implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        // 从 MessageView 中获取 ByteBuffer
        ByteBuffer byteBuffer = messageView.getBody();
        // 转换 ByteBuffer 为字节数组
        byte[] body = new byte[byteBuffer.remaining()];
        byteBuffer.get(body);
        // 处理字节数组,例如转换为字符串
        String messageBody = new String(body, StandardCharsets.UTF_8);
        System.out.println("消费消息内容:" + messageBody);
     	 	return ConsumeResult.SUCCESS;
    }
}

服务端环境搭建

下载二进制包

Apache RocketMQ 本地部署 RocketMQ 文档中,可以找到最新的二进制包,位置如下:

如果想保持跟本文相同版本,可以直接点击链接下载RocketMQ 5.3.1版本。

启动NameServer

#### 启动namesrv
$ nohup sh bin/mqnamesrv &
#### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

本地模式启动Broker+Proxy

#### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
#### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log 
The broker[broker-a,192.169.1.2:10911] boot success...

mqbroker脚本默认会读取 conf/broker.conf 配置用于Broker服务。在 conf/rmq-proxy.json 中是Proxy服务的配置,通过 --enable-proxy 命令启动时,需要加上 -pc conf/rmq-proxy.json 参数指定配置文件位置。

broker.conf的监听端口key为listenPort,管理端口key为brokerAdminPort

rmq.proxy.json的gRPC请求端口key为grpcServerPort,传统的消息发送和接收请求的端口key为remotingListenPort

  • 关闭服务
    • 停止Broker:sh bin/mqshutdown broker
    • 停止NameServer:sh bin/mqshutdown namesrv

关于RocketMQ的管理命令可以参考Admin Tool

links:

RocketMQ 5.x在SpringBoot中的上手使用过程

到此这篇关于springboot 3.x 整合 RocketMQ 5.x的详细过程的文章就介绍到这了,更多相关springboot 3.x 整合 RocketMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • servlet之ServletContext简介_动力节点Java学院整理

    servlet之ServletContext简介_动力节点Java学院整理

    这篇文章主要介绍了servlet之ServletContext简介,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • Java从零编写吃货联盟订餐系统全程讲解

    Java从零编写吃货联盟订餐系统全程讲解

    这篇文章主要介绍了Java订餐系统,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-12-12
  • java格式化数值成货币格式示例

    java格式化数值成货币格式示例

    这篇文章主要介绍了java格式化数值成货币格式示例,格式化一个数值,比如123456789.123,希望显示成"$123,456,789.123",需要的朋友可以参考下
    2014-04-04
  • Spring Boot实现数据访问计数器方案详解

    Spring Boot实现数据访问计数器方案详解

    在Spring Boot项目中,有时需要数据访问计数器,怎么实现数据访问计数器呢?下面小编给大家带来了Spring Boot数据访问计数器的实现方案,需要的朋友参考下吧
    2021-08-08
  • SpringBoot Jpa 自定义查询实现代码详解

    SpringBoot Jpa 自定义查询实现代码详解

    这篇文章主要介绍了SpringBoot Jpa 自定义查询实现代码详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • IDEA 2021.2 激活教程及启动报错问题解决方法

    IDEA 2021.2 激活教程及启动报错问题解决方法

    这篇文章主要介绍了IDEA 2021.2 启动报错及激活教程,文章开头给大家介绍了idea2021最新激活方法,关于idea2021启动报错的问题小编也给大家介绍的非常详细,需要的朋友可以参考下
    2021-10-10
  • Spring Java-based容器配置详解

    Spring Java-based容器配置详解

    这篇文章主要介绍了Spring Java-based容器配置详解,涉及注解和@Configuration类以及@Beans的相关知识,具有一定参考价值,需要的朋友可以了解。
    2017-10-10
  • Java打包ZIP文件的使用

    Java打包ZIP文件的使用

    本文详细介绍了如何使用Java语言进行ZIP文件的创建、读取和操作,通过java.util.zip包和第三方库,可以高效地处理ZIP文件,并应用密码保护和注释等高级功能,在实际应用中,遵循最佳实践可以提高程序的健壮性和安全性
    2025-02-02
  • SpringMVC配置拦截器实现登录控制的方法

    SpringMVC配置拦截器实现登录控制的方法

    这篇文章主要介绍了SpringMVC配置拦截器实现登录控制的方法,SpringMVC读取Cookie判断用户是否登录,对每一个action都要进行判断,有兴趣的可以了解一下。
    2017-03-03
  • Java详解线上内存暴涨问题定位和解决方案

    Java详解线上内存暴涨问题定位和解决方案

    本篇文章介绍了我在开发过程中遇到的线上内存暴涨的问题,以及定位问题原因和解决该问题的过程及思路,通读本篇对大家的学习或工作具有一定的价值,需要的朋友可以参考下
    2021-10-10

最新评论