RocketMQ特性Broker存储事务消息实现

 更新时间:2022年08月17日 14:07:09   作者:奔跑的毛球  
这篇文章主要为大家介绍了RocketMQ特性Broker存储事务消息实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

这里有三个核心的初始化变量

TransactionalMessageService

事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。

TransactionalMessageService类定义如下。内部属性已加注释标明。

public interface TransactionalMessageService {
    //用于保存Half事务消息
    PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
    //删除事务消息
    boolean deletePrepareMessage(MessageExt messageExt);
    //提交事务消息
    OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    //回滚事务消息
    OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    //打开事务消息
    boolean open();
    //关闭事务消息
    void close();
}

transactionalMessageCheckListener

事务消息回查监听器

transactionalMessageCheckService

事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

处理事务消息

当初始化完成之后,Broker就可以处理事务消息了。

Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。

但是有两点针对事务消息的特殊处理

第一处:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//获取扩展字段的值,若是该值为true则为事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { 
    //判断当前Broker配置是否支持事务消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
    //保存Half信息
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

第二处:

存储事务消息前的预处理,对应方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //将原消息的topic保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //将原消息的QueueId保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //将原消息的SysFlag保存在扩展字段中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //修改Queueid为0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the consume queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费

以上就是RocketMQ特性Broker存储事务消息实现的详细内容,更多关于RocketMQ Broker存储事务消息的资料请关注脚本之家其它相关文章!

相关文章

  • 详解Spring Boot 部署jar和war的区别

    详解Spring Boot 部署jar和war的区别

    本篇文章主要介绍了详解Spring Boot 部署jar和war的区别,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-09-09
  • 在Filter中不能注入bean的问题及解决

    在Filter中不能注入bean的问题及解决

    这篇文章主要介绍了在Filter中不能注入bean的问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java使用Swing实现一个模拟电脑计算器

    Java使用Swing实现一个模拟电脑计算器

    Java Swing 是一个用于创建 Java GUI(图形用户界面)的框架,它提供了一系列的 GUI 组件和工具,可以用于创建桌面应用程序,包括按钮、文本框、标签、表格等等,本文给大家介绍了Java使用Swing实现一个模拟计算器,感兴趣的同学可以自己动手尝试一下
    2024-05-05
  • 从零开始讲解Java微信公众号消息推送实现

    从零开始讲解Java微信公众号消息推送实现

    微信公众号分为订阅号和服务号,无论有没有认证,订阅号每天都能推送一条消息,也就是每天只能推送一次消息给粉丝,这篇文章主要给大家介绍了关于Java微信公众号消息推送实现的相关资料,需要的朋友可以参考下
    2022-09-09
  • Java中的回调

    Java中的回调

    这篇文章主要介绍了Java中回调的相关资料,帮助大家更好的理解和学习java,感兴趣的朋友可以了解下
    2020-08-08
  • IDEA中实体类(POJO)与JSON快速互转问题

    IDEA中实体类(POJO)与JSON快速互转问题

    这篇文章主要介绍了IDEA中实体类(POJO)与JSON快速互转,本文通过图文实例代码相结合给大家介绍的非常详细,需要的朋友可以参考下
    2022-08-08
  • JAVA设计模式----建造者模式详解

    JAVA设计模式----建造者模式详解

    这篇文章主要为大家详细介绍了java实现建造者模式Builder Pattern,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • 关于mybatis一对一查询一对多查询遇到的问题

    关于mybatis一对一查询一对多查询遇到的问题

    这篇文章主要介绍了关于mybatis一对一查询,一对多查询遇到的错误,接下来是对文章进行操作,要求查询全部文章,并关联查询作者,文章标签,本文给大家介绍的非常详细,需要的朋友可以参考下
    2022-05-05
  • 解决后端传long类型数据到前端精度丢失问题

    解决后端传long类型数据到前端精度丢失问题

    这篇文章主要介绍了解决后端传long类型数据到前端精度丢失问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • SpringBoot日志进阶实战之Logback配置经验和方法

    SpringBoot日志进阶实战之Logback配置经验和方法

    本文给大家介绍在SpringBoot中使用Logback配置日志的经验和方法,并提供了详细的代码示例和解释,包括:滚动文件、异步日志记录、动态指定属性、日志级别、配置文件等常用功能,覆盖日常Logback配置开发90%的知识点,感兴趣的朋友跟随小编一起看看吧
    2023-06-06

最新评论