RabbitMQ保证消息的可靠性问题及解决

 更新时间:2026年04月29日 09:42:47   作者:又旅行又开拓的绳匠..  
文章主要讨论了如何确保消息队列中消息的可靠性,包括生产者重试机制、确认机制、MQ持久化配置和消费者确认机制等,还介绍了消费者本地重试和失败消息入队的处理策略,以及幂等性的实现方案,最后给出了一个综合的配置示例

消息可靠性问题

在消息队列,任何一个环节出问题都会导致消息的不可靠(消息丢失),如何确保消息的可靠性呢,需要考虑到其中的每个角色,生产者可靠性、MQ可靠性、消费者可靠性。

生产者可靠性

生产者重试

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
        max-attempts: 3 # 总共尝试次数

耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息

生产者确认

1.Publisher Return

消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。

2.Publisher Confirm

消息投递成功返回ack,投递失败返回nack。

注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。

默认两种机制都是关闭状态,需要通过配置文件来开启。

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

MQ可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

在配置的时候默认都会持久化

消费者可靠性

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。

即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ消息处理状态。

回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP帮我们实现了消息确认,并可以通过配置文件设置消息确认的处理方式,有三种模式:

none:不处理。即消息投递给消费者后消息会立刻从MQ删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

auto:自动模式。当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

  • 如果是业务异常,会自动返回nack
  • 如果是消息处理或校验异常,自动返回reject,返回的异常包括:MessageConversionException、MethodArgumentTypeMismatchException等

通过下面的配置可以修改消息确认的处理方式为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

auto模式就是平常的写法

manual模式需要手写

  • Message:是spring AMQP封装的底层消息对象。
  • Channel:是消费端与MQ基于通道的操作对象。
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg, Channel channel, Message message) throws InterruptedException, IOException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        //返回nack
        //每个参数的意义:1.消息的标记 2.是否确认之前所有未确认的消息 3.是否重新入队
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//        log.info("消息处理完成");
//        //返回ack,每个参数的意义:1.消息的标记 2.是否确认之前所有消息
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

失败重试机制

本地重试

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息投递就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的投递到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
          max-attempts: 3 # 最大重试次数
  • 开启本地重试时,消息处理过程中抛出异常,不会请求到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

失败消息入队

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个固定交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。

//在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//定义一个RepublishMessageRecoverer,指定失败消息投递交换机的名称及routingkey
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

监听失败消息队列将失败消息写入数据库中,由人工定期处理

业务幂等性

幂等性:在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。

在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。

例如:

  • 根据id删除数据
  • 查询数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行,然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况。

例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

因此,我们必须想办法保证消息处理的幂等性。

这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID思路非常简单:

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库或Redis
  • 如果下次又收到相同消息,去数据库或Redis查询判断是否存在,存在则为重复消息放弃处理。

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求,不同的业务场景判断的思路也不一样。

例如在支付通知案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行更新时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,使用唯一消息ID的方案需要操作数据库或Redis保存消息ID,所以更推荐使用业务判断的方案。

1.创建交换机,队列,消息进行持久化

2.生产者:

  • 开启消息发送失败的重试策略,设置重试次数和间隔比例,耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行错误补偿.如使用定时任务去扫描这个表,重新发送消息
  • 开启confirm机制,保证消息正确到达交换机,到达返回ack,没有到达返回nack,写入数据库,后期重试
  • 开启return机制,保证消息正确到达队列,没有到达调用ReturnCallback,写入数据库,后期重试

3.消费者:

  • 开启手动ack,让消费者消费成功后,手动提交.使用Redis来记录消费失败的次数,如果到达阈值,则记录到数据库,后期使用人工干预
  • 自动ack + 重试耗尽的失败策略,定义错误交换机队列,后期通过人工进行干预

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Mybatis 简介与原理

    Mybatis 简介与原理

    MyBatis 是支持定制化 SQL、存储过程以及高级映射的优秀的持久层框架。MyBatis 避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集
    2017-05-05
  • SpringBoot 二维码生成base64并上传OSS的实现示例

    SpringBoot 二维码生成base64并上传OSS的实现示例

    本文主要介绍了SpringBoot 二维码生成base64并上传OSS的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • JAVA编程实现随机生成指定长度的密码功能【大小写和数字组合】

    JAVA编程实现随机生成指定长度的密码功能【大小写和数字组合】

    这篇文章主要介绍了JAVA编程实现随机生成指定长度的密码功能,可生成带有大小写和数字组合的随机字符串,需要的朋友可以参考下
    2017-07-07
  • Java实现单例设计模式方法解析

    Java实现单例设计模式方法解析

    这篇文章主要介绍了Java实现单例设计模式方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • java去除空格、标点符号的方法实例

    java去除空格、标点符号的方法实例

    这篇文章主要给大家介绍了关于java去除空格、标点符号的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Java并发编程之Lock锁机制从使用到源码实现

    Java并发编程之Lock锁机制从使用到源码实现

    这篇文章主要介绍了Java并发编程之Lock锁机制从使用到源码实现,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2025-10-10
  • 5分钟教你使用java搞定网站登录验证码

    5分钟教你使用java搞定网站登录验证码

    这篇文章主要为大家介绍了使用java搞定网站登录验证码的快速实现方法示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • Java异常详解_动力节点Java学院整理

    Java异常详解_动力节点Java学院整理

    异常是Java语言中的一部分,它代表程序中由各种原因引起的“不正常”因素。下面通过本文给大家介绍java异常的相关知识,感兴趣的朋友一起看看吧
    2017-06-06
  • 详解SpringBoot 应用如何提高服务吞吐量

    详解SpringBoot 应用如何提高服务吞吐量

    这篇文章主要介绍了Spring Boot 应用如何提高服务吞吐量,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • 详解重试框架Spring retry实践

    详解重试框架Spring retry实践

    spring retry是从spring batch独立出来的一个能功能,主要实现了重试和熔断。这篇文章主要介绍了详解重试框架Spring retry实践,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-05-05

最新评论