RocketMQ 延时级别配置方式
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。
其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。
如何配置:
在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
这个配置项配置了从1级开始各级延时的时间,如1表示延时1s,2表示延时5s,14表示延时10m,可以修改这个指定级别的延时时间;
时间单位支持:s、m、h、d,分别表示秒、分、时、天;
默认值就是上面声明的,可手工调整;
默认值已经够用,不建议调整【仅供参考,还是根据实际需要调整。调整默认值时注意同时要修改时间对应的level级别的值】
如何发送延时消息:
发送延时消息只需要在客户端(rocketmq-client端)待发送的消息( com.alibaba.rocketmq.common.message.Message )中设置延时级别delayLevel即可。
Message msg = new Message(topicName,"",keys,message.getBytes()); msg.setDelayTimeLevel(delayLevel); SendResult sendResult = getMQProducer.send(msg);
RocketMQ定时(延迟)消息
RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。
预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
延时消息的使用场景
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
生产
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class ProducerDelay { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.10.11:9876"); producer.start(); Message msg1 = new Message( JmsConfig.TOPIC, "订单001".getBytes()); msg1.setDelayTimeLevel(2);//延迟5秒 Message msg2 = new Message( JmsConfig.TOPIC, "订单001".getBytes()); msg2.setDelayTimeLevel(4);//延迟30秒 SendResult sendResult1 = producer.send(msg1); SendResult sendResult2 = producer.send(msg2); System.out.println("Product1-同步发送-Product信息={}" + sendResult1); System.out.println("Product2-同步发送-Product信息={}" + sendResult2); producer.shutdown(); } }
消费
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ConsumerDelay { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 设置NameServer的地址 consumer.setNamesrvAddr("192.168.10.11:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
spring boot定时任务接收邮件并且存储附件的方法讲解
今天小编就为大家分享一篇关于spring boot定时任务接收邮件并且存储附件的方法讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧2019-03-03解决SpringBoot使用@Value获取不到yaml中配置值的问题
在最近的开发中遇到一个问题,使用@Value获取yml文件中配置的属性时始终获取不到值,所以本文给大家详细介绍了SpringBoot使用@Value获取不到yaml中值的问题分析及解决方法,需要的朋友可以参考下2024-01-01java8 对象转Map时重复 key Duplicate key xxxx的解决
这篇文章主要介绍了java8 对象转Map时重复 key Duplicate key xxxx的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-09-09Java如何获取JSONObject内指定字段key的value值
这篇文章主要介绍了Java如何获取JSONObject内指定字段key的value值问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2023-12-12
最新评论