SpringBoot+RocketMQ实现延迟消息的示例代码

 更新时间:2025年10月24日 11:08:29   作者:匆匆忙忙游刃有余  
本文主要介绍了SpringBoot+RocketMQ实现延迟消息案例详解,包括基于延迟级别和基于具体时间两种方式的完整实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

下面将详细介绍如何在SpringBoot中使用RocketMQ实现延迟消息,包括基于延迟级别和基于具体时间两种方式的完整实现。

一、延迟消息概述

RocketMQ提供了两种类型的延迟消息机制:

  1. 延迟消息:消息发送后延迟指定的时间长度再被消费
  2. 定时消息:消息在指定的具体时间点被消费

这两种机制在订单超时取消、会议提醒、定时任务调度等场景中有广泛应用。

二、环境准备

1. 添加Maven依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置文件设置

application.yml中配置RocketMQ连接信息:

rocketmq:
  name-server: localhost:9876
  producer:
    group: delay-message-producer-group

三、延迟级别机制实现

1. 默认延迟级别

RocketMQ默认提供18个延迟级别,定义在MessageStoreConfig类中:

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

对应关系:

  • level=1: 延迟1秒
  • level=2: 延迟5秒
  • level=3: 延迟10秒
  • level=4: 延迟30秒
  • level=5: 延迟1分钟
  • level=6: 延迟2分钟
  • ...以此类推
  • level=18: 延迟2小时

2. 基于延迟级别的生产者实现

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class DelayLevelProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送基于延迟级别的消息
     * @param topic 主题
     * @param tag 标签
     * @param message 消息内容
     * @param delayLevel 延迟级别(1-18)
     */
    public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
        // 创建消息
        Message<String> springMessage = MessageBuilder.withPayload(message).build();
        
        // 发送延迟消息
        SendResult sendResult = rocketMQTemplate.syncSend(
            topic + ":" + tag, 
            springMessage, 
            3000, // 超时时间
            delayLevel // 延迟级别
        );
        
        System.out.println("延迟级别消息发送成功: " + sendResult);
    }
    
    /**
     * 发送订单超时取消消息(延迟15分钟)
     */
    public void sendOrderTimeoutMessage(String orderId) {
        String message = "订单超时取消: " + orderId;
        // 15分钟对应level=14(根据默认配置)
        sendMessageByDelayLevel("OrderTopic", "Timeout", message, 14);
    }
}

四、基于具体时间的延迟消息实现

1. 定时消息生产者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ScheduledMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送延迟指定毫秒数的消息
     */
    public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
        // 计算投递时间
        long deliverTimeMs = System.currentTimeMillis() + delayMs;
        
        // 创建消息并设置投递时间
        Message<String> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println("延迟毫秒消息发送成功: " + sendResult);
    }
    
    /**
     * 发送指定时间点投递的消息
     */
    public void sendMessageAtTime(String topic, String message, Date deliverTime) {
        long deliverTimeMs = deliverTime.getTime();
        
        // 创建消息并设置投递时间
        Message<String> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println("定时投递消息发送成功: " + sendResult);
    }
    
    /**
     * 发送10秒后投递的消息
     */
    public void sendTenSecondsLaterMessage(String topic, String message) {
        sendMessageWithDelayMs(topic, message, 10000L);
    }
}

五、消费者实现

延迟消息的消费者与普通消息消费者相同,无需特殊配置:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(
    topic = "OrderTopic",
    consumerGroup = "delay-message-consumer-group",
    selectorExpression = "Timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println("[" + now + "] 接收到订单超时消息: " + message);
        
        // 处理订单取消逻辑
        processOrderCancellation(message);
    }
    
    private void processOrderCancellation(String message) {
        // 提取订单ID
        String orderId = message.substring(message.indexOf(":") + 2);
        System.out.println("执行订单取消操作,订单ID: " + orderId);
        // 这里可以调用订单服务进行取消操作
    }
}

六、Controller层实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/api/delay")
public class DelayMessageController {
    
    @Autowired
    private DelayLevelProducer delayLevelProducer;
    
    @Autowired
    private ScheduledMessageProducer scheduledMessageProducer;
    
    /**
     * 发送基于延迟级别的消息
     */
    @PostMapping("/level")
    public String sendByDelayLevel(
            @RequestParam String topic,
            @RequestParam String tag,
            @RequestParam String message,
            @RequestParam(defaultValue = "3") int delayLevel) {
        
        delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
        return "延迟级别消息发送成功,延迟级别: " + delayLevel;
    }
    
    /**
     * 发送订单超时取消消息
     */
    @PostMapping("/order/timeout")
    public String sendOrderTimeout(@RequestParam String orderId) {
        delayLevelProducer.sendOrderTimeoutMessage(orderId);
        return "订单超时取消消息已发送,订单ID: " + orderId;
    }
    
    /**
     * 发送延迟指定毫秒的消息
     */
    @PostMapping("/milliseconds")
    public String sendByDelayMs(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam long delayMs) {
        
        scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
        return "延迟毫秒消息发送成功,延迟: " + delayMs + "ms";
    }
    
    /**
     * 发送指定时间点的消息
     */
    @PostMapping("/scheduled")
    public String sendScheduled(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date deliverTime) {
        
        scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
        return "定时消息发送成功,投递时间: " + deliverTime;
    }
}

七、自定义延迟级别配置

在Broker的配置文件中可以自定义延迟级别:

# 在broker.conf文件中添加
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h

重启Broker使其生效。注意,修改延迟级别后,所有使用延迟级别的消息都会使用新的配置。

八、两种实现方式对比

特性基于延迟级别基于具体时间
灵活性较低,只能使用预定义级别高,可以精确到毫秒
适用版本全版本支持RocketMQ 5.x及以上版本完整支持
使用场景固定延迟时间的场景需要精确控制投递时间的场景
配置复杂度简单,无需额外配置可能需要在Broker端开启相关功能

九、使用注意事项

  1. 延迟精度

    • 延迟消息的投递时间不是完全精确的,有一定误差
    • 在高并发场景下,误差可能会增大
  2. 版本兼容性

    • 基于具体时间的延迟消息在RocketMQ 5.x版本支持更完善
    • 在低版本中可能需要使用延迟级别机制
  3. 性能考虑

    • 大量延迟消息可能会增加Broker的负担
    • 对于长时间延迟的消息,考虑使用其他方案(如定时任务+消息队列组合)
  4. 消息可靠性

    • 延迟消息同样支持持久化,确保Broker重启后不会丢失
    • 建议开启消息确认机制确保消息可靠投递

十、测试示例

  1. 发送订单超时取消消息(延迟15分钟):

    POST /api/delay/order/timeout?orderId=ORDER123456
    
  2. 发送10秒后投递的消息:

    POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
    
  3. 发送指定时间点的消息:

    POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
    

通过以上配置和代码,您可以在SpringBoot项目中轻松实现基于RocketMQ的延迟消息功能,满足各种定时任务和延迟处理的业务需求。

到此这篇关于SpringBoot+RocketMQ实现延迟消息的示例代码的文章就介绍到这了,更多相关SpringBoot RocketMQ 延迟内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 关于Java 中 Future 的 get 方法超时问题

    关于Java 中 Future 的 get 方法超时问题

    这篇文章主要介绍了Java 中 Future 的 get 方法超时,最常见的理解就是,“超时以后,当前线程继续执行,线程池里的对应线程中断”,真的是这样吗?本文给大家详细介绍,需要的朋友参考下吧
    2022-06-06
  • RabbitMQ工作模式中的发布确认模式示例详解

    RabbitMQ工作模式中的发布确认模式示例详解

    发布确认模式用于确保消息已经被正确地发送到RabbitMQ服务器,并被成功接收和持久化,本文通过实例代码给大家介绍RabbitMQ工作模式之发布确认模式,感兴趣的朋友一起看看吧
    2025-05-05
  • mybatis中返回主键一直为1的问题

    mybatis中返回主键一直为1的问题

    这篇文章主要介绍了mybatis中返回主键一直为1的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • 详解JAVA的封装

    详解JAVA的封装

    Java面向对象的三大特性:封装、继承、多态。下面对三大特性之一封装进行了总结,需要的朋友可以参考下
    2017-04-04
  • 定时任务@Scheduled用法及其参数使用

    定时任务@Scheduled用法及其参数使用

    这篇文章主要介绍了定时任务@Scheduled用法及其参数使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • 关于Struts2文件上传与自定义拦截器

    关于Struts2文件上传与自定义拦截器

    本篇文章,小编将为大家介绍关于Struts2文件上传与自定义拦截器,有需要的朋友可以参考一下
    2013-04-04
  • java实现计算器加法小程序(图形化界面)

    java实现计算器加法小程序(图形化界面)

    这篇文章主要介绍了Java实现图形化界面的计算器加法小程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-05-05
  • Java包装类之自动装箱与拆箱

    Java包装类之自动装箱与拆箱

    这篇文章主要介绍了Java包装类之自动装箱与拆箱,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09
  • springboot如何使用MybatisPlus

    springboot如何使用MybatisPlus

    MyBatisPlus是一个强大的数据库操作框架,其代码生成器可以快速生成实体类、映射文件等,本文介绍了如何导入MyBatisPlus相关依赖,创建代码生成器,并配置数据库信息以逆向生成代码,感兴趣的朋友跟随小编一起看看吧
    2024-09-09
  • MyBatis-Plus 使用枚举自动关联注入

    MyBatis-Plus 使用枚举自动关联注入

    本文主要介绍了MyBatis-Plus 使用枚举自动关联注入,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-06-06

最新评论