关于Rabbitmq死信队列及延时队列的实现

 更新时间:2023年08月17日 10:43:29   作者:信仰_273993243  
这篇文章主要介绍了关于Rabbitmq死信队列及延时队列的实现,TTL就是消息或者队列的过期功能,当消息过期就会进到死信队列,死信队列和普通队列没啥区别,然后我们只需要配置一个消费者来消费死信队列里面的消息就可以了,需要的朋友可以参考下

什么是延迟队列

我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。

但RabbitMq中并没有提供延迟队列功能

那么RabbitMQ如何实现延迟队列

通过:死信队列 + RabbitMQ的TTL特性实现。

实现原理

给一个普通带有过期功能的队列绑定一个死信队列,消息先进延时队列,过期了后消息进入死信队列,死信队列的消息会转发到对应的queue里面,我们只需要消费死信的queue里面的消息就可以了。

一、TTL特性说明

TTL就是消息或者队列的过期功能。

当消息过期就会进到死信队列,死信队列和普通队列没啥区别,然后我们只需要配置一个消费者来消费死信队列里面的消息就可以了。

如果不设置x-message-ttl,则表示消息不会过期如果x-message-ttl设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
如果x-message-ttl设置不为0,表明消息或队列中所有消息的最大存活时间(过期时间),单位是毫秒。

需要注意: RabbitMQ只会对队列头部的消息进行过期淘汰,消息是否过期是在即将投递消息到消费者之前判定的,如果队列出现消息堆积情况,则已过期的消息还是会继续存活的

比如过期时间设置在消息上,由于消息进队列是先进先出,假设先进去的消息过期时间长,后进的消息过期时间短,先进来的消息还没过期,后面进来的消息就算到了过期时间,消息也不会过期也就不会从队列中剔除,从而导致队列里面的消息积压。

二、死信队列

简单理解就是要被丢弃的消息才会放到死信队列。

1、消息什么时候变为死信

1、消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue重回队列属性设为false

2、消息在队列里的时间超过了该消息设置的过期时间(TTL)

3、消息队列到达了它的最大长度,之后再收到的消息。

2、死信队列的原理

当一个消息在队列里变为死信,它会被重新publish到绑定的死信队列所对应的exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个死信交换机所绑定队列就可以了。

下面看个案例: 正常情况,我们配置交换机-A,会为其绑定路由键-A,和路由键-A所映射的队列-A,并设置这个队列-A的死信交换机-B、死信路由键-B。

消息消费者-A,监听队列-A,基本上就没死信交换机什么事了,特殊时候队列满了,或者接收后给的回执是false才会进到死信交换机-B。

现在我们把消费者-A删了,并且设置队列-A里面消息过期时间,那么所有进到队列A里面消息就自然过期,然后被推给死信交换机,那么监听死信交换机-B的消息者-B,就可以得到过期后的消息了,就实现了延时功能。

3、死信交换机

死信交换机DLX实际上也是普通的交换机,说白的就是将死信(过期消息)路由到死信队列的交换机。

4、死信队列

死信队列DLQ(Dead Letter Queue)实际上也是普通的队列,只不过它存储的是死信交换机路由过来的死信(过期消息)

三、代码

1、给队列绑定死信队列

Map<String, Object> args = new HashMap<>(2);
args.put(x-dead-letter-exchange,死信队列交换机)
args.put(x-dead-letter-routing-key,死信routeKey)
args.put(x-message-ttl,队列里面的消息最大存活时间) 
//申明一个普通队列,并且给这个普通队里绑定一个死信队列。当消息过期就会转到死信队列里面去。
Queue queue = QueueBuilder.durable(queue).withArguments(args).build();

2、设置过期时间

目前有两种方式可以设置消息的TTL。

第一种是通过队列的属性设置,队列中的所有消息都有相同的过期时间。

第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。

如果两种方法同时设置,则TTL以两者之间较小的那个数值为准。

消息在队列中的生存时间一旦超过设置的TTL值时就会进到死信。

1、通过队列属性设置消息过期时间

Map<String, Object> arguments = Maps.newHashMap();
//设置消息发送到队列中在被丢弃之前可以存活的时间,单位:毫秒
arguments.put("x-message-ttl", 5000);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);

2、发送消息的时候,对消息本身设定过期时间

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setExpiration("5000");//设置消息多久没有被消费在过期。
        message.getMessageProperties().setContentEncoding("UTF-8");
        return message;
    }
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);

上面底层代码是:

AMQP.BasicProperties.Builder properties = MessageProperties.PERSISTENT_TEXT_PLAIN.builder();
properties.expiration("5000");//设置消息多久没有被消费在过期。
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties.build(), message.getBytes());

到此这篇关于关于Rabbitmq死信队列及延时队列的实现的文章就介绍到这了,更多相关Rabbitmq死信队列延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Mybatis-plus通过添加拦截器实现简单数据权限

    Mybatis-plus通过添加拦截器实现简单数据权限

    系统需要根据用户所属的公司,来做一下数据权限控制,具体一点,就是通过表中的company_id进行权限控制,项目使用的是mybatis-plus,所以通过添加拦截器的方式,修改查询sql,实现数据权限,本文就通过代码给大家详细的讲解一下,需要的朋友可以参考下
    2023-08-08
  • Logger.error打印错误异常的详细堆栈信息

    Logger.error打印错误异常的详细堆栈信息

    这篇文章主要介绍了Logger.error打印错误异常的详细堆栈信息,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-02-02
  • Java实现将容器 Map中的内容保存到数组

    Java实现将容器 Map中的内容保存到数组

    这篇文章主要介绍了Java实现将容器 Map中的内容保存到数组,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Springmvc Controller接口代码示例

    Springmvc Controller接口代码示例

    这篇文章主要介绍了Springmvc Controller接口代码示例,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • Java正则表达式matcher.group()用法代码

    Java正则表达式matcher.group()用法代码

    这篇文章主要给大家介绍了关于Java正则表达式matcher.group()用法的相关资料,最近在做一个项目,需要使用matcher.group()方法匹配出需要的内容,文中给出了详细的代码示例,需要的朋友可以参考下
    2023-08-08
  • java通过MySQL驱动拦截器实现执行sql耗时计算

    java通过MySQL驱动拦截器实现执行sql耗时计算

    本文主要介绍了java通过MySQL驱动拦截器实现执行sql耗时计算,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-03-03
  • JDBC数据库连接过程及驱动加载与设计模式详解

    JDBC数据库连接过程及驱动加载与设计模式详解

    这篇文章主要介绍了JDBC数据库连接过程及驱动加载与设计模式详解,需要的朋友可以参考下
    2016-10-10
  • Java中BeanUtils.copyProperties的11个坑总结

    Java中BeanUtils.copyProperties的11个坑总结

    我们日常开发中,经常涉及到DO、DTO、VO对象属性拷贝赋值,很容易想到org.springframework.beans.BeanUtils的copyProperties,它会自动通过反射机制获取源对象和目标对象的属性,pyProperties,会有好几个坑呢,本文将给大家总结一下遇到的坑,需要的朋友可以参考下
    2023-05-05
  • Springboot敏感字段脱敏的实现思路

    Springboot敏感字段脱敏的实现思路

    这篇文章主要介绍了Springboot敏感字段脱敏的实现思路,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2021-09-09
  • 全网最全Mybatis-Plus详解

    全网最全Mybatis-Plus详解

    Mybatis-Plus是一个Mybatis(opens new window)的增强工具,在Mybatis的基础上只做增强不做改变,为简化开发,这篇文章主要介绍了全网最全Mybatis-Plus详解,需要的朋友可以参考下
    2024-05-05

最新评论