SpringBoot配置RocketMQ的详细过程

 更新时间:2025年09月18日 17:25:17   作者:弄个昵称  
这篇文章主要介绍了SpringBoot配置RocketMQ的详细过程,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
  1. 引入maven
  		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
  1. 配置yml
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: demo-producer-group # 生产者分组
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      test-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
  1. 配置变量
/**
 * 消息队列相关常亮配置,包括group、topic、tag
 **/
public class MqTopicConstant {
    /**
     * 示例消息队列,topic1个
     */
    public static final String DEMO_TOPIC = "test-top-1";
    /**
     * 注册tag
     */
    public static final String DEMO_TAG_REGISTERED = "registered";
    /**
     * 修改tag
     */
    public static final String DEMO_TAG_MODIFY = "modify";
}
  1. 创建Service
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
public class RocketMQService {
    private static final Logger log = LoggerFactory.getLogger(RocketMQService.class);
    @Resource
    private RocketMQTemplate template;
    /**
     * 发送普通消息
     *
     * @param topic   topic
     * @param message 消息体
     */
    public void sendMessage(String topic, Object message) {
        this.template.convertAndSend(topic, message);
        log.info("普通消息发送完成:message = {}", message);
    }
    /**
     * 发送同步消息
     *
     * @param topic   topic
     * @param message 消息体
     */
    public void syncSendMessage(String topic, Object message) {
        SendResult sendResult = this.template.syncSend(topic, message);
        log.info("同步发送消息完成:message = {}, sendResult = {}", message, sendResult);
    }
    /**
     * 发送同步消息
     *
     * @param topic   topic
     * @param message 消息体
     */
    public SendResult syncSendMessageR(String topic, Object message) {
        SendResult sendResult = this.template.syncSend(topic, message);
        log.info("同步发送消息完成:message = {}, sendResult = {}", message, sendResult);
        SendStatus sendStatus = sendResult.getSendStatus();
        log.info("状态打印 : {}" , sendStatus);
        //SEND_OK
        return sendResult;
    }
    /**
     * 发送异步消息
     *
     * @param topic   topic
     * @param message 消息体
     */
    public void asyncSendMessage(String topic, final Object message) {
        this.template.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功, SendStatus = {}", sendResult.getSendStatus());
//                log.info("异步消息发送成功,message = {}, SendStatus = {}", message, sendResult.getSendStatus());
            }
            @Override
            public void onException(Throwable e) {
                log.info("异步消息发送异常,exception = {}", e.getMessage());
            }
        });
    }
    /**
     * 发送单向消息
     *
     * @param topic   topic
     * @param message 消息体
     */
    public void sendOneWayMessage(String topic, Object message) {
        this.template.sendOneWay(topic, message);
        log.info("单向发送消息完成:message = {}", message);
    }
    /**
     * 同步发送批量消息
     *
     * @param topic       topic
     * @param messageList 消息集合
     * @param timeout     超时时间(毫秒)
     */
    public void syncSendMessages(String topic, List<Message<?>> messageList, long timeout) {
        this.template.syncSend(topic, messageList, timeout);
        log.info("同步发送批量消息完成:message = {}", JSON.toJSONString(messageList));
    }
    /**
     * 发送携带 tag 的消息(过滤消息)
     *
     * @param topic   topic,RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行
     *                拆分再组装。只要在指定 topic 时跟上 {:tags} 就可以指定tag
     *                例如 test-topic:tagA
     * @param message 消息体
     */
    public void syncSendMessageWithTag(String topic, Object message) {
        this.template.syncSend(topic, message);
        log.info("发送带 tag 的消息完成:message = {}", message);
    }
    /**
     * 同步发送延时消息
     *
     * @param topic      topic
     * @param message    消息体
     * @param timeout    超时
     * @param delayLevel 延时等级:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,
     *                   从1s到2h分别对应着等级 1 到 18,消息消费失败会进入延时消息队列
     *                   "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
     */
    public void syncSendDelay(String topic, Object message, long timeout, int delayLevel) {
        this.template.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, delayLevel);
        log.info("已同步发送延时消息 message = {}", message);
    }
    /**
     * 异步发送延时消息
     *
     * @param topic      topic
     * @param message    消息对象
     * @param timeout    超时时间
     * @param delayLevel 延时等级
     */
    public void asyncSendDelay(String topic, final Object message, long timeout, int delayLevel) {
        this.template.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步发送延时消息成功,message = {}", message);
            }
            @Override
            public void onException(Throwable throwable) {
                log.error("异步发送延时消息发生异常,exception = {}", throwable.getMessage());
            }
        }, timeout, delayLevel);
        log.info("已异步发送延时消息 message = {}", message);
    }
    /**
     * 发送事务消息
     *
     * @param topic       topic,RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行
     *                	  拆分再组装。只要在指定 topic 时跟上 {:tags} 就可以指定tag
     *                	  例如 test-topic:tagA
     * @param message    消息对象
     * @param arg        传给事务监听器的参数(可以作为事务处理的唯一ID,来验证本地事务)
     */
    public void sendMessageInTransaction(String topic, final Object message ,final Object arg) {
        TransactionSendResult res = this.template.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), arg);
        if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
            log.info("【生产者】事物消息发送成功;成功结果:{}", res);
        } else {
            log.info("【生产者】事务发送失败:失败原因:{}", res);
        }
    }
}
  1. 事务监听器
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQTransactionListener
public class TranscationRocketListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
   		 // 获取第三个参数(用户自定义参数)
        Integer customArg = (Integer) arg;
        log.info("执行本地事务,自定义参数: {}", customArg);
        String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
        log.info("这里是校验TAG: {}" , tag  );
		//RocketMQLocalTransactionState.COMMIT
		//RocketMQLocalTransactionState.ROLLBACK
		//RocketMQLocalTransactionState.UNKNOWN
        return RocketMQLocalTransactionState.UNKNOWN;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("检查本地交易: {}", message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

状态解释

 - RocketMQLocalTransactionState.COMMIT:
   	含义: 表示本地事务已成功执行,允许提交消息。这意味着消息将对消费者可见,可以被正常消费。
	使用场景: 当您的业务逻辑成功执行且希望该消息能够被下游系统处理时,应返回此状态。
 - RocketMQLocalTransactionState.ROLLBACK:
 	含义: 表示本地事务执行失败,要求回滚消息。即该消息不会被发送给任何消费者。
	使用场景: 如果您的业务逻辑执行过程中遇到错误或异常情况,不希望该消息影响下游系统,则应回滚事务,返回此状态。
 - RocketMQLocalTransactionState.UNKNOWN:
	含义: 表示当前无法确定事务的状态,可能是因为网络问题或其他原因导致暂时无法判断。RocketMQ 会定期调用 checkLocalTransaction 方法来检查事务的状态。
	使用场景: 当您不确定事务是否成功完成时(例如,远程服务调用超时),可以返回此状态。RocketMQ 将通过 checkLocalTransaction 方法尝试再次确认事务状态。
  1. 测试发送消息
@Autowired
    private RocketMQService rocketMQService;
	## 发送事务消息
    rocketMQService.sendMessageInTransaction(GpsConstants.DEDUCTION_MESSP_TOPIC, "这是测试消息");
	## 消费事务消息
   	@Service
    @RocketMQMessageListener(topic = GpsConstants.DEDUCTION_MESSP_TOPIC
            , consumerGroup = GpsConstants.DEDUCTION_MESSP_GROUP)
    public class deductionTopic implements RocketMQListener<String> {
        @Override
        public void onMessage(String msg) {
             System.out.println("msg : " + msg);
			//这里就会打印msg : 这是测试消息
        }
    }

提示 : 同一个topic,不同的consumerGroup都会消费,根据自己的业务指定不同的 consumerGroup 处理不同的业务,如果不需要,则一个topic只能用一次

到此这篇关于SpringBoot配置RocketMQ的详细过程的文章就介绍到这了,更多相关SpringBoot配置RocketMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 系统运维问题排查-java内存过高分析及说明

    系统运维问题排查-java内存过高分析及说明

    本文总结了监控Java进程内存与线程状态的常用命令及参数,包括top排序、jmap查看内存分布、jstat分析GC数据、jstack解析线程状态等,强调需使用与进程一致的用户执行,并解析了线程状态和内存区域的含义
    2025-07-07
  • PostgreSQL Docker部署+SpringBoot集成方式

    PostgreSQL Docker部署+SpringBoot集成方式

    本文介绍了如何在Docker中部署PostgreSQL和pgadmin,并通过SpringBoot集成PostgreSQL,主要步骤包括安装PostgreSQL和pgadmin,配置防火墙,创建数据库和表,以及在SpringBoot中配置数据源和实体类
    2024-12-12
  • java 字符串池的深入理解

    java 字符串池的深入理解

    这篇文章主要介绍了java 字符串池的深入理解的相关资料,这里提供实例代码帮助大家学习理解这部分内容,希望大家能够掌握,需要的朋友可以参考下
    2017-08-08
  • 基于SpringBoot整合SSMP案例(开启日志与分页查询条件查询功能实现)

    基于SpringBoot整合SSMP案例(开启日志与分页查询条件查询功能实现)

    这篇文章主要介绍了基于SpringBoot整合SSMP案例(开启日志与分页查询条件查询功能实现),本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋参考下吧
    2023-11-11
  • JAVA异常体系结构详解

    JAVA异常体系结构详解

    Java把异常当作对象来处理,并定义一个基类java.lang.Throwable作为所有异常的超类,下面通过本文给大家分享JAVA异常体系结构,感兴趣的朋友一起看看吧
    2017-11-11
  • 一次因Java应用造成CPU过高的排查实践过程

    一次因Java应用造成CPU过高的排查实践过程

    一个应用占用CPU很高,除了确实是计算密集型应用之外,通常原因都是出现了死循环。下面这篇文章主要给大家介绍了一次因Java应用造成CPU过高的排查实践过程,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2018-11-11
  • Mybatis如何获取insert新增数据id值

    Mybatis如何获取insert新增数据id值

    这篇文章主要介绍了Mybatis如何获取insert新增数据id值问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • 面向对象和面向过程的区别(动力节点java学院整理)

    面向对象和面向过程的区别(动力节点java学院整理)

    很多朋友不清楚面向对象和面向过程有什么区别,接下来小编给大家整理了关于面向对象和面向过程的区别讲解,感兴趣的朋友可以参考下
    2017-04-04
  • Java异常处理中的一些特殊情况举例

    Java异常处理中的一些特殊情况举例

    这篇文章主要介绍了Java异常处理中的一些特殊情况举例,分别是只用try和finally不用catch,以及finally语句不被执行的情况,需要的朋友可以参考下
    2015-11-11
  • java数字和中文算数验证码的实现

    java数字和中文算数验证码的实现

    这篇文章主要介绍了java数字和中文算数验证码的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07

最新评论