关于RocketMQ使用事务消息

 更新时间:2023年05月08日 09:23:35   作者:乐观男孩  
RocketMQ是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。一般消息的体量不会很大,需要的朋友可以参考下

说明

事务消息:

1、不支持延时消息和批量消息
2、如果消息没有及时提交,默认check 15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果,将会打印异常信息,具体的处理策略可以通过复写AbstractTransactionCheckListener类实现
3、每次check的时间间隔可以通过Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS属性指定
4、事务状态:LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。

原理

事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:

第一阶段:发送半事务消息,消息发送后,消息是对消费者透明的,也就是该消息还不属于可消费消息,消费者无法消费。

第二阶段:执行本地事务,本地执行事务后提交消息。

(1)、如果事务执行失败,则回滚消息;
(2)、如果事务执行成功,则提交消息,提交后消费者可消费到消息;
(3)、如果事务执行成功,但消息提交失败,RocketMQ还提供了回查机制:如果一段时间过后,没有提交/回滚半事务消息,RocketMQ会定时回查一定的次数,获取本地事务的状态以决定是提交还是回滚消息。

如果回查一定的次数后依然没有获取到本地事务的明确状态,则消息会被放到死信队列,由人工确认如何处理。

事务消息处理流程

在这里插入图片描述

1、生产端发送半事务消息到服务端
2、服务端返回半事务消息发送成功响应。注意,此时的消息对消费端是不可见的,不可被消费
3、发送方执行本地事务
4、执行完本地事务后,客户端同步服务端提交/回滚消息
5、如果服务端在一定的时间内,等不到4的回应,则定时进行回查,询问客户端的本地事务状态。
6、客户端检查本地事务状态
7、根据本地事务执行情况,告知服务端,服务端决定是提交消息还是丢弃消息。

生产端

@Test
    public void sendMessage() throws Exception {
        //事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
        producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
        //设置检查本地事务状态的线程池
        //producer.setExecutorService(null);
        //本地事务执行监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
        //发送事务消息
        producer.sendMessageInTransaction(message, null);
    }
    class TransactionListenerImpl implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            //执行本地事务(数据库)操作......
            int num = new Random().nextInt(10);
            if (num < 3) {
                //本地事务执行成功,提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (num < 6) {
                //本地事务执行失败,删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            //等待本地事务check,即执行checkLocalTransaction()方法
            return LocalTransactionState.UNKNOW;
        }
        /**
         * 回查逻辑
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            int num = new Random().nextInt(10);
            if (num < 3) {
                //提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (num < 6) {
                //删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.UNKNOW;
        }
    }

发送事务消息步骤:

1、初始化TransactionMQProducer实例
2、指定check线程池(回查线程池)
3、为Producer添加自定义事务监听器。自定义事务监听器需实现TransactionListener接口,通过覆盖接口的executeLocalTransaction方法执行本地事务,返回事务状态,客户端会根据本地事务状态通知服务端,决定是否提交消息;通过覆盖接口的checkLocalTransaction方法提供回查机制,当在一定的时间内服务端获取不到本地事务执行状态,将通过该方法回查事务状态,以决定消失是否需要提交。
4、通过Producer.sendMessageInTransaction发送事务消息。

消费者正常消费逻辑

消费端

@Test
    public void consumeMessage() throws Exception {
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                log.info("消费到消息条数:{}", list.size());
                list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                        .map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
    }

消费端正常消费消息即可。

到此这篇关于关于RocketMQ使用事务消息的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • JDK1.8安装与配置超详细教程

    JDK1.8安装与配置超详细教程

    JDK1.8即为JDK8,JDK8是目前是最成熟最稳定的版本,本文将详细介绍JDK1.8的安装与配置,需要的朋友可以参考下
    2023-03-03
  • Java获取当地的日出日落时间代码分享

    Java获取当地的日出日落时间代码分享

    这篇文章主要介绍了Java获取当地的日出日落时间代码分享,国外猿友写的一个类,需要的朋友可以参考下
    2014-06-06
  • SpringMVC常用注解载入与处理方式详解

    SpringMVC常用注解载入与处理方式详解

    这篇文章主要介绍了SpringMVC常用注解载入的方式和处理的方式,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-09-09
  • 深入了解JAVA泛型

    深入了解JAVA泛型

    这篇文章主要介绍了JAVA泛型的相关知识,文中代码非常详细,供大家参考和学习,感兴趣的朋友可以了解下
    2020-06-06
  • 一文带你探究Spring中Bean的线程安全性问题

    一文带你探究Spring中Bean的线程安全性问题

    很多人都想spring中的bean是线程安全的吗?本文将带你探究Spring中Bean的线程安全性问题,感兴趣的同学可以参考阅读下
    2023-05-05
  • Mybatis SqlSessionFactory与SqlSession详细讲解

    Mybatis SqlSessionFactory与SqlSession详细讲解

    SqlSessionFactory是MyBatis的核心类之一,其最重要的功能就是提供创建MyBatis的核心接口SqlSession,所以我们需要先创建SqlSessionFactory,为此我们需要提供配置文件和相关的参数
    2022-11-11
  • JAVA各种加密与解密方式汇总

    JAVA各种加密与解密方式汇总

    在项目开发中,我们常需要用到加解密算法,本文主要介绍了JAVA各种加密与解密方式汇总,具有一定的参考价值,感兴趣的可以了解一下
    2023-11-11
  • springboot集成nacos无法动态获取nacos配置的问题

    springboot集成nacos无法动态获取nacos配置的问题

    这篇文章主要介绍了springboot集成nacos无法动态获取nacos配置的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Linux下Java开发环境搭建以及第一个HelloWorld

    Linux下Java开发环境搭建以及第一个HelloWorld

    这篇文章主要介绍了Linux下Java开发环境搭建以及第一个HelloWorld的实现过程,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2015-09-09
  • Java数据结构之图的领接矩阵详解

    Java数据结构之图的领接矩阵详解

    图的领接矩阵存储方式是用两个数组来表示图。一个一位数组存储图中顶点信息,一个二维数组存储图中的边或弧的信息。本文将为大家重点介绍一下数据结构中的图的邻接矩阵,快来跟随小编一起学习吧
    2021-11-11

最新评论