RocketMQ-延迟消息的处理流程介绍

 更新时间:2021年07月03日 09:30:01   作者:pigcoffee  
这篇文章主要介绍了RocketMQ-延迟消息的处理流程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

CommitLog.putMessage()

//获取消息的sysflag
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //非事务消息 或 已commit事务消息
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 判断消息是否设置延迟
            if (msg.getDelayTimeLevel() > 0) {
                //判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //延迟消息的topic为 SCHEDULE_TOPIC_XXXX
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //获取延迟级别,一个延迟级别对应一个Queue
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 
                // Backup real topic, queueId
                //消息原始的topic,queueid保存到消息的property中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息

2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息

3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级

4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX

5、获取延迟级别,一个延迟级别对应一个Queue

6、消息原始的topic,queueid保存到消息的property中

7、修改消息的topci、queueid

启动延迟消息定时任务

ScheduleMessageService.start()

延迟消息投递

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • mybatis中${}和#{}取值的区别分析

    mybatis中${}和#{}取值的区别分析

    mybatis中使用sqlMap进行sql查询时,经常需要动态传递参数,在动态SQL解析阶段, #{ } 和 ${ } 会有不同的表现,这篇文章主要给大家介绍了关于mybatis中${}和#{}取值区别的相关资料,需要的朋友可以参考下
    2021-09-09
  • Java synchronized偏向锁的概念与使用

    Java synchronized偏向锁的概念与使用

    因为在我们写的程序当中可能会经常使用到synchronized关键字,因此JVM对synchronized做出了很多优化,而在本篇文章当中我们将仔细介绍JVM对synchronized的偏向锁的细节
    2023-02-02
  • 详解JAVA使用Comparator接口实现自定义排序

    详解JAVA使用Comparator接口实现自定义排序

    这篇文章主要介绍了JAVA使用Comparator接口实现自定义排序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Maven项目修改JDK版本全过程

    Maven项目修改JDK版本全过程

    这篇文章主要介绍了Maven项目修改JDK版本全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • 基于JAVA中Jersey处理Http协议中的Multipart的详解

    基于JAVA中Jersey处理Http协议中的Multipart的详解

    之前在基于C#开发彩信用最原始的StringBuilder拼接字符串方式处理过Multipart。现在在做一个项目的时候,由于之前的技术路线都是使用Jersey处理Http这块,为了保持技术路线一致,研究了一下如何使用Jersey处理Http协议中的Multipart
    2013-05-05
  • 详解如何把Java中if-else代码重构成高质量代码

    详解如何把Java中if-else代码重构成高质量代码

    这篇文章主要介绍了详解如何把Java中if-else代码重构成高质量代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • servlet实现简单的权限管理和敏感词过滤功能

    servlet实现简单的权限管理和敏感词过滤功能

    JavaEE课要求用servlet和过滤器实现权限管理和敏感词过滤功能,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • Java中toString()、String.valueOf、(String)强转区别

    Java中toString()、String.valueOf、(String)强转区别

    相信大家在日常开发中这三种方法用到的应该很多,本文主要介绍了Java中toString()、String.valueOf、(String)强转区别,感兴趣的可以了解一下
    2021-09-09
  • IntelliJ IDEA Run时报“无效的源发行版:16“错误问题及解决方法

    IntelliJ IDEA Run时报“无效的源发行版:16“错误问题及解决方法

    这篇文章主要介绍了IntelliJ IDEA Run时报“无效的源发行版:16“错误问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-05-05
  • 详解maven中profiles使用实现

    详解maven中profiles使用实现

    本文主要介绍了maven中profiles使用实现,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02

最新评论