Java中RabbitMQ延迟队列实现详解

 更新时间:2023年09月20日 10:13:11   作者:CD4356  
这篇文章主要介绍了Java中RabbitMQ延迟队列实现详解,消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可,需要的朋友可以参考下

一、RabbitMQ延迟队列实现

1.1、RabbitMQ延迟队列实现流程

cd

  1. 生产者生产一条延迟消息,根据延迟时间的不同,利用不同的routing-key将消息路由到不同的延迟队列,每个队列都设置了不同的 TTL 属性 ( TTL ( Time To Live ) 生存时间 ),并绑定到同一个死信交换机中。
  2. 消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可。

1.2、配置RabbitMQ连接

#[ RabbitMQ相关配置 ]
#rabbitmq服务器IP
spring.rabbitmq.host=安装RabbitMQ的服务器IP
#rabbitmq服务器端口(默认为5672)
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#用户密码
spring.rabbitmq.password=guest
#虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列)
#vhost虚拟主机地址( 默认为/ )
spring.rabbitmq.virtual-host=/

1.3、创建配置类

配置两个交换机、四个队列、以及根据路由键配置交换机和队列的绑定关系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfiguration {
    //延迟交换机
    public static final String DELAY_EXCHANGE = "delay_exchange";
    //延迟队列A
    public static final String DELAY_QUEUE_A = "delay_queue_a";
    //延迟队列B
    public static final String DELAY_QUEUE_B = "delay_queue_b";
    //延迟路由键10S
    public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key";
    //延迟路由键60S
    public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key";
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    //死信队列A
    public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a";
    //死信队列B
    public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b";
    //死信路由键10S
    public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key";
    //死信路由键60S
    public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key";
    //延迟交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE, true, false);
    }
    //延迟队列A
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>();
        //设置延迟队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //设置延迟队列绑定的死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
        //设置延迟队列的 TTL 消息存活时间
        args.put("x-message-ttl", 10*1000);
        return new Queue(DELAY_QUEUE_A, true, false, false, args);
    }
    //延迟队列B
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>();
        //设置延迟队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //设置延迟队列绑定的死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
        //设置延迟队列的 TTL 消息存活时间
        args.put("x-message-ttl", 60*1000);
        return new Queue(DELAY_QUEUE_B, true, false, false, args);
    }
    //延迟队列A的绑定关系
    @Bean("delayBindingA")
    public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY);
    }
    //延迟队列B的绑定关系
    @Bean("delayBindingB")
    public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY);
    }
    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }
    //死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A, true);
    }
    //死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B, true);
    }
    //死信队列A的绑定关系
    @Bean("deadLetterBindingA")
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
                                 @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
    }
    //死信队列B的绑定关系
    @Bean("deadLetterBindingB")
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
                                      @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
    }
}

1.4、创建一个枚举类来配置延迟类型

@Getter
@AllArgsConstructor
public enum DelayTypeEnum {
    //10s
    DELAY_10s(1),
    //60s
    DELAY_60s(2);
    private Integer type;
    /**
     * 延迟类型
     * @param type
     * @return 延迟类型
     */
    public static DelayTypeEnum getDelayTypeEnum(Integer type){
        if(Objects.equals(type, DELAY_10s.type)){
            return DELAY_10s;
        }
        if(Objects.equals(type, DELAY_60s.type)){
            return DELAY_60s;
        }
        return null;
    }
}

1.5、创建生产者类发送消息

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY;
/**
 * 延迟消息生产者
 */
@Component
public class DelayMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送延迟消息
     * @param message  要发送的消息
     * @param type  延迟类型(延时10s的延迟队列 或 延时60s的延迟队列)
     */
    public void sendDelayMessage(String message, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message);
                break;
            default:
                break;
        }
    }
}

1.6、创建消费者类消费消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 监听死信队列A
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_a")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_A)
    public void receiveA(Message message) {
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列A收到的消息:{}", LocalDateTime.now(), msg);
    }
    /**
     * 监听死信队列B
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_b")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_B)
    public void receiveB(Message message){
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列B收到的消息:{}", LocalDateTime.now(), msg);
    }
}

1.7、创建控制类

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Objects;
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private DelayMessageProducer producer;
    @RequestMapping("/send")
    public void send(String message, Integer delayType){
        // 记录日志
        log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, delayType);
        // 发送延迟消息
        producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType)));
    }
}

1.8、测试

在浏览器中先后提交下面两个请求:

1)localhost:8080/rabbitmq/send?message=测试自定义延迟处理60s&delayType=2

2)localhost:8080/rabbitmq/send?message=测试自定义延迟处理10s&delayType=1

查看idea控制台:

cd

到此这篇关于Java中RabbitMQ延迟队列实现详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • RocketMQ延迟消息超详细讲解

    RocketMQ延迟消息超详细讲解

    延时消息是指发送到 RocketMQ 后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息
    2023-02-02
  • Java线程池如何实现精准控制每秒API请求

    Java线程池如何实现精准控制每秒API请求

    这篇文章主要介绍了Java线程池如何实现精准控制每秒API请求问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • 深入解析Java编程中final关键字的作用

    深入解析Java编程中final关键字的作用

    final关键字正如其字面意思一样,意味着最后,比如被final修饰后类不能集成、变量不能被再赋值等,以下我们就来深入解析Java编程中final关键字的作用:
    2016-06-06
  • Spring Cloud之服务监控turbine的示例

    Spring Cloud之服务监控turbine的示例

    这篇文章主要介绍了Spring Cloud之服务监控turbine的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-05-05
  • SpringBoot集成easy-rules规则引擎流程详解

    SpringBoot集成easy-rules规则引擎流程详解

    这篇文章主要介绍了SpringBoot集成easy-rules规则引擎流程,合理的使用规则引擎可以极大的减少代码复杂度,提升代码可维护性。业界知名的开源规则引擎有Drools,功能丰富,但也比较庞大
    2023-03-03
  • Spring 静态变量/构造函数注入失败的解决方案

    Spring 静态变量/构造函数注入失败的解决方案

    我们经常会遇到一下问题:Spring对静态变量的注入为空、在构造函数中使用Spring容器中的Bean对象,得到的结果为空。不要担心,本文将为大家介绍如何解决这些问题,跟随小编来看看吧
    2021-11-11
  • java 遍历Map及Map转化为二维数组的实例

    java 遍历Map及Map转化为二维数组的实例

    这篇文章主要介绍了java 遍历Map及Map转化为二维数组的实例的相关资料,希望通过本文能帮助到大家,实现这样的功能,需要的朋友可以参考下
    2017-08-08
  • JavaIO模型中的BIO,NIO和AIO详解

    JavaIO模型中的BIO,NIO和AIO详解

    这篇文章主要为大家详细介绍了JavaIO模型中的BIO,NIO和AIO,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-02-02
  • Echarts+SpringMvc显示后台实时数据

    Echarts+SpringMvc显示后台实时数据

    这篇文章主要为大家详细介绍了Echarts+SpringMvc显示后台实时数据,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-12-12
  • SpringBoot中的条件注解使用示例详解

    SpringBoot中的条件注解使用示例详解

    SpringBoot条件注解用于动态控制Bean创建与配置加载,基于@Conditional机制,支持按类、Bean、属性等条件判断,广泛应用于多数据源等场景,提升应用灵活性与智能化,接下来通过本文给大家讲解SpringBoot中的条件注解使用,感兴趣的朋友一起看看吧
    2025-08-08

最新评论