springboot 3.x 整合 RocketMQ 5.x的详细过程
RocketMQ 5.x在SpringBoot中的上手使用过程
注意:rocketmq-v5-client-spring-boot-starter对springboot版本有要求,至少2.0.6.RELEASE版本的springboot无法整合。
准备环境
- JDK 17
- Spring Boot 3.2.3
- RocketMQ(服务端) 5.3.1
- rocketmq-v5-client-spring-boot-starter(客户端) 2.3.1
在 SpringBoot 项目中依赖如下配置:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>如果还未搭建服务端,可以先看第5节-服务器环境搭建。
参数配置
按照 SpringBoot 的约定习俗,在上手一个新的 spring-boot-starter项目时,想要知道怎么使用它,看它的 AutoConfiguration 就对了。
在 rocketmq-v5-client-spring-boot中,对应的 AutoConfiguration 类为 RocketMQAutoConfiguration,其类定义部分代码如下:
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtTemplateResetConfiguration.class,
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
// ... 省略
@Bean(PRODUCER_BUILDER_BEAN_NAME)
@ConditionalOnMissingBean(ProducerBuilderImpl.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"producer.endpoints"})
public ProducerBuilder producerBuilder(RocketMQProperties rocketMQProperties) {
// ... 省略
}
@Bean(SIMPLE_CONSUMER_BUILDER_BEAN_NAME)
@ConditionalOnMissingBean(SimpleConsumerBuilder.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"simple-consumer.endpoints"})
public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQProperties) {
// ... 省略
}
@Bean(destroyMethod = "destroy")
@Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
// ... 省略
}
}可以发现,在rocketmq-v5-client-spring-boot中,根据 RocketMQ 5.x 在架构上做的改进,使用了 endpoints 来替代传统的 namesrvAddr,以支持更灵活的网络拓扑和云原生架构。endpoints 通常指向 RocketMQ 的 Broker 或 Nameserver 地址,用于生产者与 RocketMQ 集群建立连接。endpoints 是一个 URL 或 IP 地址(ip:host)列表(使用;分割)。
⚠️注意:在 RocketMQ 5.x 中,现已默认使用gRPC作为通信协议,entpoints更建议指向 Proxy 地址,一般默认端口为8081。
因此,现在想要启用默认的生产者(ProducerBuilder),只需要配置rocketmq.producer.endpoints即可。
想要启用默认的消费者(SimpleConsumerBuilder),只需要配置rocketmq.simple-consumer.endpoints即可。
而RocketMQClientTemplate则是通过判断当前应用上下文是否含有ProducerBuilder或SimpleConsumerBuilder Bean对象生成而来。它属于rocketmq-v5-client-spring-boot模块下,也就是说它利用了Spring特性,提供了Spring风格的API,方便开发者通过 Spring 的编程模型来进行消息发送和接收。
既然是原生态的简易使用教程,那么就尽可能在不写多的代码的情况下,实现生产环境中使用MQ。
因此,本次项目就只配置 rocketmq.producer.endpoints 用于启用默认的生产者,消费者使用Push消费模式,所以配置rocketmq.push-consumer.endpoints。配置如下:
rocketmq:
producer:
endpoints: localhost:8081
push-consumer:
endpoints: localhost:8081topic在代码中指定,不使用rocketmq.producer.topic和rocketmq.push-consumer.topic配置默认的topic。
tips: 在启动客户端服务时,topic需要先创建,否则会启动报错。
生产者生产消息
生产消息通过SpringBoot自动装配的RocketMQClientTemplate对象实现,发送Message对象,示例代码如下:
@Service
public class MyService {
@Autowired
private RocketMQClientTemplate rocketMQClientTemplate;
public void sendMessage() {
byte[] bytes = "这是一个字符串".getBytes(StandardCharsets.UTF_8);
Message<byte[]> message = MessageBuilder.withPayload(bytes).build();
rocketMQClientTemplate.send("MyTopic", message);
}
}⚠️注意:在 RocketMQ 5.x 中,
Message对象已从自定义对象改为spring-messaging包中的Message对象。一般通过MessageBuilder构建,实例对象类型为GenericMessage。
消费者消费消息
消费者通过@RocketMQMessageListener注解,并实现RocketMQListener接口消费消息,示例代码如下:
@Service
@RocketMQMessageListener(consumerGroup = "MyTopic-service", topic = "MyTopic", tag = "*")
public class MyService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
// 从 MessageView 中获取 ByteBuffer
ByteBuffer byteBuffer = messageView.getBody();
// 转换 ByteBuffer 为字节数组
byte[] body = new byte[byteBuffer.remaining()];
byteBuffer.get(body);
// 处理字节数组,例如转换为字符串
String messageBody = new String(body, StandardCharsets.UTF_8);
System.out.println("消费消息内容:" + messageBody);
return ConsumeResult.SUCCESS;
}
}服务端环境搭建
下载二进制包
在 Apache RocketMQ 本地部署 RocketMQ 文档中,可以找到最新的二进制包,位置如下:

如果想保持跟本文相同版本,可以直接点击链接下载RocketMQ 5.3.1版本。
启动NameServer
#### 启动namesrv $ nohup sh bin/mqnamesrv & #### 验证namesrv是否启动成功 $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
本地模式启动Broker+Proxy
#### 先启动broker $ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy & #### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a $ tail -f ~/logs/rocketmqlogs/proxy.log The broker[broker-a,192.169.1.2:10911] boot success...
mqbroker脚本默认会读取 conf/broker.conf 配置用于Broker服务。在 conf/rmq-proxy.json 中是Proxy服务的配置,通过 --enable-proxy 命令启动时,需要加上 -pc conf/rmq-proxy.json 参数指定配置文件位置。
broker.conf的监听端口key为listenPort,管理端口key为brokerAdminPort。
rmq.proxy.json的gRPC请求端口key为grpcServerPort,传统的消息发送和接收请求的端口key为remotingListenPort。
- 关闭服务
- 停止Broker:
sh bin/mqshutdown broker - 停止NameServer:
sh bin/mqshutdown namesrv
- 停止Broker:
关于RocketMQ的管理命令可以参考Admin Tool。
links:
RocketMQ 5.x在SpringBoot中的上手使用过程
到此这篇关于springboot 3.x 整合 RocketMQ 5.x的详细过程的文章就介绍到这了,更多相关springboot 3.x 整合 RocketMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
servlet之ServletContext简介_动力节点Java学院整理
这篇文章主要介绍了servlet之ServletContext简介,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧2017-07-07


最新评论