RabbitMQ死信机制实现延迟队列的实战

 更新时间:2021年11月11日 11:00:50   作者:wxd_1024  
本文主要介绍了RabbitMQ死信机制实现延迟队列的实战,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

应用场景

三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果。实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列没有设置消费者,过了有效期后消息会重新路由到指定的的队列,有消费者去执行订单查询。

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL)   和   Dead Letter Exchanges(DLX)

Time To Live(TTL)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter(死信)RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

A: 通过队列属性设置,队列中所有消息都有相同的过期时间。

<!-- 将消息放入此队列里,此队列设置过期时间,不制造消费者让其过期,过期后变成死信,消息会放入指定的新队列里,实现消息的延迟消费 -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
           <value type="java.lang.Long">${qrcode.expire.icbc}</value>
        </entry>
        <!--消息过期根据重新路由 -->
        <entry key="x-dead-letter-exchange" value="directExchange"/>
        <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
    </rabbit:queue-arguments>
</rabbit:queue>

B: 对消息进行单独设置,每条消息TTL可以不同。

<!-- 将消息放入此队列里,次队列设置过期时间,不制造消费者让其过期,过期后变成死信,消息会放入指定的新队列里,实现消息的延迟消费 -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
    <rabbit:queue-arguments>
        <!--消息过期根据重新路由 -->
        <entry key="x-dead-letter-exchange" value="directExchange"/>
        <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
    </rabbit:queue-arguments>
</rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer;
 
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
 
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
	private final Long ttl;
 
	public ExpirationMessagePostProcessor(Long ttl) {
		this.ttl = ttl;
	}
 
	@Override
	public Message postProcessMessage(Message message) throws AmqpException {
		message.getMessageProperties().setExpiration(ttl.toString());
		return message;
	}
}

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

 Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:指定routing-key发送
队列出现dead letter的情况有:
消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

注意一:ttl设置之后,下次修改时间,会报错,这时候,需要先删除该队列,重启项目。否则会报错。

注意二:消费者中,抛异常了没处理,会一直重复消费

注意三:下面的代码我模拟了1-10号消息,消息的内容里面是1-10。过期的时间是10-1秒。这里要注意,虽然10是第一个发送,但是它过期的时间最长。

过了10s以后,消费者开始收到数据,但是它是一次性收到如下结果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一个收到的还是10。10是第一个放进队列,但是它的过期时间最长。所以由此可见,即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。参考官方文档发现“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。(也可以考虑缓存队列,DelayQueue实现定时延迟执行任务,但是也有缺点:就是项目重启缓存里的数据就会丢失,DelayQueue的使用详见其他博文)

for(int i = 10; i>0; i-- ){
	amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
}

到此这篇关于RabbitMQ死信机制实现延迟队列的实战的文章就介绍到这了,更多相关RabbitMQ 延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot 对接飞书多维表格事件回调监听流程分析

    SpringBoot 对接飞书多维表格事件回调监听流程分析

    本文介绍了如何通过飞书事件订阅机制和SpringBoot项目集成,对多维表数据的记录变更进行对接的详细流程,包括如何创建应用、配置参数、编写订阅代码、订阅文档事件以及在SpringBoot工程中集成的步骤,感兴趣的朋友跟随小编一起看看吧
    2024-12-12
  • Maven项目更换本地仓库过程图解

    Maven项目更换本地仓库过程图解

    这篇文章主要介绍了Maven项目更换本地仓库过程图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • 关于MyBatis中Mapper XML热加载优化

    关于MyBatis中Mapper XML热加载优化

    大家好,本篇文章主要讲的是关于MyBatis中Mapper XML热加载优化,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-01-01
  • Spring框架基于注解开发CRUD详解

    Spring框架基于注解开发CRUD详解

    这篇文章主要为大家详细介绍了Spring框架基于注解开发CRUD,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-08-08
  • MyBatis反向生成Example类的使用方式

    MyBatis反向生成Example类的使用方式

    今天小编就为大家分享一篇MyBatis反向生成Example类的使用方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • Java项目防止SQL注入的几种方法总结

    Java项目防止SQL注入的几种方法总结

    SQL注入是比较常见的网络攻击方式之一,在客户端在向服务器发送请求的时候,sql命令通过表单提交或者url字符串拼接传递到后台持久层,最终达到欺骗服务器执行恶意的SQL命令,下面这篇文章主要给大家总结介绍了关于Java项目防止SQL注入的几种方法,需要的朋友可以参考下
    2023-04-04
  • IntelliJ IDEA最佳配置(推荐)

    IntelliJ IDEA最佳配置(推荐)

    这篇文章主要介绍了IntelliJ IDEA最佳配置,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • Netty学习之理解selector原理示例

    Netty学习之理解selector原理示例

    这篇文章主要为大家介绍了Netty学习之理解selector原理示例使用分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪<BR>
    2023-07-07
  • springboot使用redisTemplate操作lua脚本

    springboot使用redisTemplate操作lua脚本

    本文主要介绍了springboot使用redisTemplate操作lua脚本,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • Log4j2 重大漏洞编译好的log4j-2.15.0.jar包下载(替换过程)

    Log4j2 重大漏洞编译好的log4j-2.15.0.jar包下载(替换过程)

    Apache 开源项目 Log4j 的远程代码执行漏洞细节被公开,由于 Log4j 的广泛使用,该漏洞一旦被攻击者利用会造成严重危害,下面小编给大家带来了Log4j2 重大漏洞编译好的log4j-2.15.0.jar包下载,感兴趣的朋友一起看看吧
    2021-12-12

最新评论