springboot集成RocketMQ过程及使用示例详解
1、 说明
Springboot集成RocketMQ时需要特别注意版本问题,否则会出现各类启动报错问题,这里使用的springboot 版本:2.2.7.RELEASE, RocketMq版本:2.2.3
2、集成过程
pom文件引用
这里网上有单独在引用rocketmq-client依赖包的,是不需要的,已经包含在rocketmq-spring-boot-starter依赖包中。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.7.RELEASE</version>
</parent>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>yml文件配置
spring:
# 数据库信息
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/longfamily?useUnicode=true&characterEncoding=utf-8&userSSL=false
username: root
password: 123456
##RocketMq配置
rocketmq:
#nameservice服务器地址(多个以英文逗号隔开)
# 注意高版本下使用的是name-server,低版本使用的是 nameServer。
name-server: 10.0.164.31:9876
producer:
group: test这里把spring配置也贴出来,是想说明rocketMq与spring在yaml文件中是同一级别(容易当成spring节点下面的子节点)。否则会启动报错。
消费者
@Component
@RocketMQMessageListener(consumerGroup = "H", topic = RocketMQTopic.TOPIC_ORDER)
public class TestMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者收到信息:==="+message);
}
}消费者是通过RocketMQMessageListener来监听Topc, 通过实现RocketMQListener <T>接口实现其onMessage(T t)方法来处理接收到的消息
生产者
这里同意封装了RocketMQ发送消息的工具类。
@Slf4j
@Component
public class RocketMQUtils {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 异步发送MQ消息
*/
public void sendMessage(Long id, final String topic, final String context, Boolean isExternal) {
try {
log.debug("Sending message to MQ topic {}, context {}", topic, context);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//打印msgId用来以备查验,外部消息发送mq成功则任务置为成功
log.info("Success sending message to MQ: {}, context: {}, msgId: {}", topic, context, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("Failed to send message to MQ {}, msg {}, cause {}", topic, context, Throwables.getStackTraceAsString(e));
}
};
rocketMQTemplate.asyncSend(topic, context, callback);
} catch (Exception e) {
log.error("Failed to send message to MQ! message: {}, stackTrace: {}", context, Throwables.getStackTraceAsString(e));
}
}
}测试发送消息接口
@RestController
@RequestMapping("/api/test")
public class CommonTestController {
@Autowired
private RocketMQUtils rocketMQUtils;
// mq
@PostMapping("/mq/send")
public void operateRocketMQ(){
rocketMQUtils.sendMessage(1L, RocketMQTopic.TOPIC_ORDER,"firstContext",true);
}运行结果
[2023-02-27 10:17:30.741] [DEBUG] [T:http-nio-8061-exec-3][tid=][Class:c.g.c.u.RocketMQUtils -> sendMessage]|Sending message to MQ topic topic-order, context firstContext
[2023-02-27 10:17:30.760] [INFO] [T:NettyClientPublicExecutor_2][tid=][Class:c.g.c.u.RocketMQUtils -> onSuccess]|Success sending message to MQ: topic-order, context: firstContext, msgId: 0000010169EE18B4AAC2881AB17B0001
消费者收到信息:===firstContext
以上就是springboot集成RocketMQ过程及使用示例详解的详细内容,更多关于springboot集成RocketMQ的资料请关注脚本之家其它相关文章!
相关文章
Mybatis中ResultMap解决属性名和数据库字段名不一致问题
我们Pojo类的属性名和数据库中的字段名不一致的现象时有发生,本文就详细的介绍一下Mybatis中ResultMap解决属性名和数据库字段名不一致问题,感兴趣的可以了解一下2021-10-10
springboot实现以代码的方式配置sharding-jdbc水平分表
这篇文章主要介绍了springboot实现以代码的方式配置sharding-jdbc水平分表,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-11-11
Spring框架中ImportBeanDefinitionRegistrar的应用详解
这篇文章主要介绍了Spring框架中ImportBeanDefinitionRegistrar的应用详解,如果实现了ImportSelector接口,在配置类中被@Import加入到Spring容器中以后,Spring容器就会把ImportSelector接口方法返回的字符串数组中的类new出来对象然后放到工厂中去,需要的朋友可以参考下2024-01-01
SpringBoot快速接入DeepSeek api(带页面)保姆级教程
这篇文章主要介绍了如何在Java端接入DeepSeek API,包括申请APIkey、项目结构展示、编写controller和前端界面、以及测试启动项目的过程,文中通过代码介绍的非常详细,需要的朋友可以参考下2025-03-03


最新评论