RocketMQ特性Broker存储事务消息实现

 更新时间:2022年08月17日 14:07:09   作者:奔跑的毛球  
这篇文章主要为大家介绍了RocketMQ特性Broker存储事务消息实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

这里有三个核心的初始化变量

TransactionalMessageService

事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。

TransactionalMessageService类定义如下。内部属性已加注释标明。

public interface TransactionalMessageService {
    //用于保存Half事务消息
    PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
    //删除事务消息
    boolean deletePrepareMessage(MessageExt messageExt);
    //提交事务消息
    OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    //回滚事务消息
    OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    //打开事务消息
    boolean open();
    //关闭事务消息
    void close();
}

transactionalMessageCheckListener

事务消息回查监听器

transactionalMessageCheckService

事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

处理事务消息

当初始化完成之后,Broker就可以处理事务消息了。

Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。

但是有两点针对事务消息的特殊处理

第一处:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//获取扩展字段的值,若是该值为true则为事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { 
    //判断当前Broker配置是否支持事务消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
    //保存Half信息
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

第二处:

存储事务消息前的预处理,对应方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //将原消息的topic保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //将原消息的QueueId保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //将原消息的SysFlag保存在扩展字段中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //修改Queueid为0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the consume queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费

以上就是RocketMQ特性Broker存储事务消息实现的详细内容,更多关于RocketMQ Broker存储事务消息的资料请关注脚本之家其它相关文章!

相关文章

  • java简单工厂模式实例及讲解

    java简单工厂模式实例及讲解

    这篇文章主要为大家详细介绍了java简单工厂模式实例,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • Java微信公众平台之素材管理

    Java微信公众平台之素材管理

    这篇文章主要为大家详细介绍了Java微信公众平台之素材管理,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-05-05
  • SpringBoot3中数据库集成实践详解

    SpringBoot3中数据库集成实践详解

    项目工程中,集成数据库实现对数据的增晒改查管理,是最基础的能力,所以下面小编就来和大家讲讲SpringBoot3如何实现数据库集成,需要的可以参考下
    2023-08-08
  • maven <repositories>标签和<pluginRepositories>标签的使用

    maven <repositories>标签和<pluginRepositories>标签的使用

    这篇文章主要介绍了maven <repositories>标签和<pluginRepositories>标签的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • 基于mybatis注解动态sql中foreach工具的方法

    基于mybatis注解动态sql中foreach工具的方法

    这篇文章主要介绍了mybatis注解动态sql中foreach工具方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java开发实例之图书管理系统的实现

    Java开发实例之图书管理系统的实现

    图书管理的功能大体包括:增加书籍、借阅书籍、删除书籍、查看书籍列表、退出系统、查找书籍、返还书籍这些,本文主要给大家介绍该系统的数据库语句,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-10-10
  • 浅谈缓冲字符流 BufferedReader BufferedWriter用法

    浅谈缓冲字符流 BufferedReader BufferedWriter用法

    这篇文章主要介绍了缓冲字符流 BufferedReader BufferedWriter的用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Layui前后台交互数据获取java实例

    Layui前后台交互数据获取java实例

    下面小编就为大家分享一篇Layui前后台交互数据获取java实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-01-01
  • Spring Boot 整合 Thymeleaf 实例分享

    Spring Boot 整合 Thymeleaf 实例分享

    这篇文章主要分享了Spring Boot整合Thymeleaf,Thymeleaf是新一代的Java模板引擎,类似于Velocity、FreeMarker等传统引擎,关于其更多相关内容,需要的小伙伴可以参考一下
    2022-05-05
  • Java e.printStackTrace()案例讲解

    Java e.printStackTrace()案例讲解

    这篇文章主要介绍了Java e.printStackTrace()案例讲解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08

最新评论