RabbitMQ保证消息的可靠性问题及解决
消息可靠性问题
在消息队列,任何一个环节出问题都会导致消息的不可靠(消息丢失),如何确保消息的可靠性呢,需要考虑到其中的每个角色,生产者可靠性、MQ可靠性、消费者可靠性。
生产者可靠性
生产者重试
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
max-attempts: 3 # 总共尝试次数耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息
生产者确认
1.Publisher Return
消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。
2.Publisher Confirm
消息投递成功返回ack,投递失败返回nack。
注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。
默认两种机制都是关闭状态,需要通过配置文件来开启。
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制MQ可靠性
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置持久化,包括:
- 交换机持久化
- 队列持久化
- 消息持久化
在配置的时候默认都会持久化
消费者可靠性
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。
即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ消息处理状态。
回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息nack:消息处理失败,RabbitMQ需要再次投递消息reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP帮我们实现了消息确认,并可以通过配置文件设置消息确认的处理方式,有三种模式:
none:不处理。即消息投递给消费者后消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常,会自动返回
nack; - 如果是消息处理或校验异常,自动返回
reject,返回的异常包括:MessageConversionException、MethodArgumentTypeMismatchException等
通过下面的配置可以修改消息确认的处理方式为auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ackauto模式就是平常的写法
manual模式需要手写
Message:是spring AMQP封装的底层消息对象。Channel:是消费端与MQ基于通道的操作对象。
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg, Channel channel, Message message) throws InterruptedException, IOException {
log.info("spring 消费者接收到消息:【" + msg + "】");
//返回nack
//每个参数的意义:1.消息的标记 2.是否确认之前所有未确认的消息 3.是否重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// log.info("消息处理完成");
// //返回ack,每个参数的意义:1.消息的标记 2.是否确认之前所有消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}失败重试机制
本地重试
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息投递就会无限循环,导致mq的消息处理飙升,带来不必要的压力。
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的投递到mq队列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
max-attempts: 3 # 最大重试次数- 开启本地重试时,消息处理过程中抛出异常,不会请求到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回reject,消息会被丢弃
失败消息入队
本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个固定交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。
//在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//定义一个RepublishMessageRecoverer,指定失败消息投递交换机的名称及routingkey
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
监听失败消息队列将失败消息写入数据库中,由人工定期处理
业务幂等性
幂等性:在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。
在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。
例如:
- 根据id删除数据
- 查询数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行,然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况。
例如:
- 页面卡顿时频繁刷新导致表单重复提交
- 服务间调用的重试
- MQ消息的重复投递
因此,我们必须想办法保证消息处理的幂等性。
这里给出两种方案:
- 唯一消息ID
- 业务状态判断
唯一消息ID思路非常简单:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库或Redis
- 如果下次又收到相同消息,去数据库或Redis查询判断是否存在,存在则为重复消息放弃处理。
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求,不同的业务场景判断的思路也不一样。
例如在支付通知案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行更新时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,使用唯一消息ID的方案需要操作数据库或Redis保存消息ID,所以更推荐使用业务判断的方案。
1.创建交换机,队列,消息进行持久化
2.生产者:
- 开启消息发送失败的重试策略,设置重试次数和间隔比例,耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行错误补偿.如使用定时任务去扫描这个表,重新发送消息
- 开启confirm机制,保证消息正确到达交换机,到达返回ack,没有到达返回nack,写入数据库,后期重试
- 开启return机制,保证消息正确到达队列,没有到达调用ReturnCallback,写入数据库,后期重试
3.消费者:
- 开启手动ack,让消费者消费成功后,手动提交.使用Redis来记录消费失败的次数,如果到达阈值,则记录到数据库,后期使用人工干预
- 自动ack + 重试耗尽的失败策略,定义错误交换机队列,后期通过人工进行干预
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
SpringBoot 二维码生成base64并上传OSS的实现示例
本文主要介绍了SpringBoot 二维码生成base64并上传OSS的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2022-05-05
JAVA编程实现随机生成指定长度的密码功能【大小写和数字组合】
这篇文章主要介绍了JAVA编程实现随机生成指定长度的密码功能,可生成带有大小写和数字组合的随机字符串,需要的朋友可以参考下2017-07-07


最新评论