Java中RabbitMQ延迟队列实现详解

 更新时间:2023年09月20日 10:13:11   作者:CD4356  
这篇文章主要介绍了Java中RabbitMQ延迟队列实现详解,消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可,需要的朋友可以参考下

一、RabbitMQ延迟队列实现

1.1、RabbitMQ延迟队列实现流程

cd

  1. 生产者生产一条延迟消息,根据延迟时间的不同,利用不同的routing-key将消息路由到不同的延迟队列,每个队列都设置了不同的 TTL 属性 ( TTL ( Time To Live ) 生存时间 ),并绑定到同一个死信交换机中。
  2. 消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可。

1.2、配置RabbitMQ连接

#[ RabbitMQ相关配置 ]
#rabbitmq服务器IP
spring.rabbitmq.host=安装RabbitMQ的服务器IP
#rabbitmq服务器端口(默认为5672)
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#用户密码
spring.rabbitmq.password=guest
#虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列)
#vhost虚拟主机地址( 默认为/ )
spring.rabbitmq.virtual-host=/

1.3、创建配置类

配置两个交换机、四个队列、以及根据路由键配置交换机和队列的绑定关系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfiguration {
    //延迟交换机
    public static final String DELAY_EXCHANGE = "delay_exchange";
    //延迟队列A
    public static final String DELAY_QUEUE_A = "delay_queue_a";
    //延迟队列B
    public static final String DELAY_QUEUE_B = "delay_queue_b";
    //延迟路由键10S
    public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key";
    //延迟路由键60S
    public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key";
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    //死信队列A
    public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a";
    //死信队列B
    public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b";
    //死信路由键10S
    public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key";
    //死信路由键60S
    public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key";
    //延迟交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE, true, false);
    }
    //延迟队列A
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>();
        //设置延迟队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //设置延迟队列绑定的死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
        //设置延迟队列的 TTL 消息存活时间
        args.put("x-message-ttl", 10*1000);
        return new Queue(DELAY_QUEUE_A, true, false, false, args);
    }
    //延迟队列B
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>();
        //设置延迟队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //设置延迟队列绑定的死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
        //设置延迟队列的 TTL 消息存活时间
        args.put("x-message-ttl", 60*1000);
        return new Queue(DELAY_QUEUE_B, true, false, false, args);
    }
    //延迟队列A的绑定关系
    @Bean("delayBindingA")
    public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY);
    }
    //延迟队列B的绑定关系
    @Bean("delayBindingB")
    public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY);
    }
    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }
    //死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A, true);
    }
    //死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B, true);
    }
    //死信队列A的绑定关系
    @Bean("deadLetterBindingA")
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
                                 @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
    }
    //死信队列B的绑定关系
    @Bean("deadLetterBindingB")
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
                                      @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
    }
}

1.4、创建一个枚举类来配置延迟类型

@Getter
@AllArgsConstructor
public enum DelayTypeEnum {
    //10s
    DELAY_10s(1),
    //60s
    DELAY_60s(2);
    private Integer type;
    /**
     * 延迟类型
     * @param type
     * @return 延迟类型
     */
    public static DelayTypeEnum getDelayTypeEnum(Integer type){
        if(Objects.equals(type, DELAY_10s.type)){
            return DELAY_10s;
        }
        if(Objects.equals(type, DELAY_60s.type)){
            return DELAY_60s;
        }
        return null;
    }
}

1.5、创建生产者类发送消息

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY;
/**
 * 延迟消息生产者
 */
@Component
public class DelayMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送延迟消息
     * @param message  要发送的消息
     * @param type  延迟类型(延时10s的延迟队列 或 延时60s的延迟队列)
     */
    public void sendDelayMessage(String message, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message);
                break;
            default:
                break;
        }
    }
}

1.6、创建消费者类消费消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 监听死信队列A
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_a")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_A)
    public void receiveA(Message message) {
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列A收到的消息:{}", LocalDateTime.now(), msg);
    }
    /**
     * 监听死信队列B
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_b")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_B)
    public void receiveB(Message message){
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列B收到的消息:{}", LocalDateTime.now(), msg);
    }
}

1.7、创建控制类

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Objects;
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private DelayMessageProducer producer;
    @RequestMapping("/send")
    public void send(String message, Integer delayType){
        // 记录日志
        log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, delayType);
        // 发送延迟消息
        producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType)));
    }
}

1.8、测试

在浏览器中先后提交下面两个请求:

1)localhost:8080/rabbitmq/send?message=测试自定义延迟处理60s&delayType=2

2)localhost:8080/rabbitmq/send?message=测试自定义延迟处理10s&delayType=1

查看idea控制台:

cd

到此这篇关于Java中RabbitMQ延迟队列实现详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 老生常谈Scanner的基本用法

    老生常谈Scanner的基本用法

    下面小编就为大家带来一篇老生常谈Scanner的基本用法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • Java数据结构 递归之迷宫回溯案例讲解

    Java数据结构 递归之迷宫回溯案例讲解

    这篇文章主要介绍了Java数据结构递归之迷宫回溯案例讲解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • 使用Java获取系统信息的常用代码整理总结

    使用Java获取系统信息的常用代码整理总结

    这篇文章主要介绍了使用Java获取系统信息的常用代码整理总结,在服务器端一般经常能够用到,欢迎收藏,需要的朋友可以参考下
    2015-11-11
  • spring boot 3使用 elasticsearch 提供搜索建议的实例详解

    spring boot 3使用 elasticsearch 提供搜索建议的实例详解

    这篇文章主要介绍了spring boot3使用elasticsearch提供搜索建议,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • Spring boot的上传图片功能实例详解

    Spring boot的上传图片功能实例详解

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。这篇文章主要介绍了Spring boot 上传图片,需要的朋友可以参考下
    2018-03-03
  • springboot+vue+elementsUI实现分角色注册登录界面功能

    springboot+vue+elementsUI实现分角色注册登录界面功能

    这篇文章主要给大家介绍了关于springboot+vue+elementsUI实现分角色注册登录界面功能的相关资料,Spring Boot和Vue.js是两个非常流行的开源框架,可以用来构建Web应用程序,需要的朋友可以参考下
    2023-07-07
  • 聊聊mybatis sql的括号问题

    聊聊mybatis sql的括号问题

    这篇文章主要介绍了mybatis sql的括号问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • 深入理解Java并发编程之ThreadLocal

    深入理解Java并发编程之ThreadLocal

    本文主要介绍了Java并发编程之ThreadLocal,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • 指定jdk启动jar包的方法总结

    指定jdk启动jar包的方法总结

    这篇文章主要给大家总结介绍了关于指定jdk启动jar包的方法,文中通过实例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2023-07-07
  • 解决Lombok使用@Builder无法build父类属性的问题

    解决Lombok使用@Builder无法build父类属性的问题

    这篇文章主要介绍了解决Lombok使用@Builder无法build父类属性的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-09-09

最新评论