RocketMQ-延迟消息的处理流程介绍
概述
RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。
Broker处理延迟消息
CommitLog.putMessage()
//获取消息的sysflag
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//非事务消息 或 已commit事务消息
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery 判断消息是否设置延迟
if (msg.getDelayTimeLevel() > 0) {
//判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//延迟消息的topic为 SCHEDULE_TOPIC_XXXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//获取延迟级别,一个延迟级别对应一个Queue
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//消息原始的topic,queueid保存到消息的property中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息
2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息
3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX
5、获取延迟级别,一个延迟级别对应一个Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
启动延迟消息定时任务
ScheduleMessageService.start()

延迟消息投递



以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
JAVAEE model1模型实现商品浏览记录(去除重复的浏览记录)(一)
这篇文章主要为大家详细介绍了JAVAEE model1模型实现商品浏览记录,去除重复的浏览记录,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2016-11-11
鸿蒙HarmonyOS App开发造轮子之自定义圆形图片组件的实例代码
这篇文章主要介绍了鸿蒙HarmonyOS App开发造轮子之自定义圆形图片组件,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2021-01-01
Java Big Number操作BigInteger及BigDecimal类详解
这篇文章主要为大家介绍了Java Big Number操作BigInteger及BigDecimal类详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2022-07-07
十大常见Java String问题_动力节点Java学院整理
本文介绍Java中关于String最常见的10个问题,需要的朋友参考下吧2017-04-04
SpringMVC 中配置 Swagger 插件的教程(分享)
下面小编就为大家分享一篇SpringMVC 中配置 Swagger 插件的教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2017-12-12


最新评论