关于RocketMQ使用事务消息

 更新时间:2023年05月08日 09:23:35   作者:乐观男孩  
RocketMQ是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。一般消息的体量不会很大,需要的朋友可以参考下

说明

事务消息:

1、不支持延时消息和批量消息
2、如果消息没有及时提交,默认check 15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果,将会打印异常信息,具体的处理策略可以通过复写AbstractTransactionCheckListener类实现
3、每次check的时间间隔可以通过Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS属性指定
4、事务状态:LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。

原理

事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:

第一阶段:发送半事务消息,消息发送后,消息是对消费者透明的,也就是该消息还不属于可消费消息,消费者无法消费。

第二阶段:执行本地事务,本地执行事务后提交消息。

(1)、如果事务执行失败,则回滚消息;
(2)、如果事务执行成功,则提交消息,提交后消费者可消费到消息;
(3)、如果事务执行成功,但消息提交失败,RocketMQ还提供了回查机制:如果一段时间过后,没有提交/回滚半事务消息,RocketMQ会定时回查一定的次数,获取本地事务的状态以决定是提交还是回滚消息。

如果回查一定的次数后依然没有获取到本地事务的明确状态,则消息会被放到死信队列,由人工确认如何处理。

事务消息处理流程

在这里插入图片描述

1、生产端发送半事务消息到服务端
2、服务端返回半事务消息发送成功响应。注意,此时的消息对消费端是不可见的,不可被消费
3、发送方执行本地事务
4、执行完本地事务后,客户端同步服务端提交/回滚消息
5、如果服务端在一定的时间内,等不到4的回应,则定时进行回查,询问客户端的本地事务状态。
6、客户端检查本地事务状态
7、根据本地事务执行情况,告知服务端,服务端决定是提交消息还是丢弃消息。

生产端

@Test
    public void sendMessage() throws Exception {
        //事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
        producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
        //设置检查本地事务状态的线程池
        //producer.setExecutorService(null);
        //本地事务执行监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
        //发送事务消息
        producer.sendMessageInTransaction(message, null);
    }
    class TransactionListenerImpl implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            //执行本地事务(数据库)操作......
            int num = new Random().nextInt(10);
            if (num < 3) {
                //本地事务执行成功,提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (num < 6) {
                //本地事务执行失败,删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            //等待本地事务check,即执行checkLocalTransaction()方法
            return LocalTransactionState.UNKNOW;
        }
        /**
         * 回查逻辑
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            int num = new Random().nextInt(10);
            if (num < 3) {
                //提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (num < 6) {
                //删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.UNKNOW;
        }
    }

发送事务消息步骤:

1、初始化TransactionMQProducer实例
2、指定check线程池(回查线程池)
3、为Producer添加自定义事务监听器。自定义事务监听器需实现TransactionListener接口,通过覆盖接口的executeLocalTransaction方法执行本地事务,返回事务状态,客户端会根据本地事务状态通知服务端,决定是否提交消息;通过覆盖接口的checkLocalTransaction方法提供回查机制,当在一定的时间内服务端获取不到本地事务执行状态,将通过该方法回查事务状态,以决定消失是否需要提交。
4、通过Producer.sendMessageInTransaction发送事务消息。

消费者正常消费逻辑

消费端

@Test
    public void consumeMessage() throws Exception {
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                log.info("消费到消息条数:{}", list.size());
                list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                        .map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
    }

消费端正常消费消息即可。

到此这篇关于关于RocketMQ使用事务消息的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 一文教你掌握Java如何实现判空

    一文教你掌握Java如何实现判空

    实际项目中我们会有很多地方需要判空校验,如果不做判空校验则可能产生NullPointerException异常。所以本文小编为大家整理了Java中几个常见的判空方法,希望对大家有所帮助
    2023-04-04
  • Java链表中元素删除的实现方法详解【只删除一个元素情况】

    Java链表中元素删除的实现方法详解【只删除一个元素情况】

    这篇文章主要介绍了Java链表中元素删除的实现方法,结合实例形式分析了java只删除链表中一个元素的相关操作原理、实现方法与注意事项,需要的朋友可以参考下
    2020-03-03
  • 基于web项目log日志指定输出文件位置配置方法

    基于web项目log日志指定输出文件位置配置方法

    下面小编就为大家分享一篇基于web项目log日志指定输出文件位置配置方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-04-04
  • 详解java中if语句和switch的使用

    详解java中if语句和switch的使用

    这篇文章主要介绍了java中if语句和switch的使用,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • 详解Java 包扫描实现和应用(Jar篇)

    详解Java 包扫描实现和应用(Jar篇)

    这篇文章主要介绍了详解Java 包扫描实现和应用(Jar篇),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • 详解如何实现SpringBoot的底层注解

    详解如何实现SpringBoot的底层注解

    今天给大家带来的文章是如何实现SpringBoot的底层注解,文中有非常详细的介绍及代码示例,对正在学习java的小伙伴很有帮助,需要的朋友可以参考下
    2021-06-06
  • 调用java.lang.Runtime.exec的正确姿势分享

    调用java.lang.Runtime.exec的正确姿势分享

    这篇文章主要介绍了调用java.lang.Runtime.exec的正确姿势,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • 使用Feign动态设置header和原理分析

    使用Feign动态设置header和原理分析

    这篇文章主要介绍了使用Feign动态设置header和原理分析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Java集合框架之List ArrayList LinkedList使用详解刨析

    Java集合框架之List ArrayList LinkedList使用详解刨析

    早在 Java 2 中之前,Java 就提供了特设类。比如:Dictionary, Vector, Stack, 和 Properties 这些类用来存储和操作对象组。虽然这些类都非常有用,但是它们缺少一个核心的,统一的主题。由于这个原因,使用 Vector 类的方式和使用 Properties 类的方式有着很大不同
    2021-10-10
  • springboot文件上传保存路径的问题

    springboot文件上传保存路径的问题

    这篇文章主要介绍了springboot文件上传保存路径的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09

最新评论