一文讲透RabbitMQ 消息队列中的拒绝机制

 更新时间:2025年07月21日 10:18:53   作者:逸水心  
在消息队列系统中,如果消费者由于某些原因无法处理当前接收到的消息,可以通过以下机制拒绝消息,下面就来介绍了RabbitMQ 消息队列中的拒绝机制,感兴趣的可以了解一下

在消息队列系统中,如果消费者由于某些原因无法处理当前接收到的消息,可以通过以下机制拒绝消息,并控制消息的后续处理方式(如重新入队或丢弃)。以下是具体实现和注意事项:

一. RabbitMQ 中的拒绝机制

1. RabbitMQ 中的拒绝机制

RabbitMQ 中,消费者可以通过以下两种方式拒绝消息:

(1)basic.reject(拒绝单条消息)

功能:拒绝单条消息,并指定是否将消息重新放回队列。

参数

  • deliveryTag:消息的唯一标识符(由 RabbitMQ 分配)。
  • requeue:布尔值,决定消息是否重新入队(true 表示重新入队,false 表示丢弃)。

示例代码(Python):

import pika

def callback(ch, method, properties, body):
    print(f"Received message: {body}")
    # 模拟处理失败
    if some_condition:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='my_queue', on_message_callback=callback)

(2)basic.nack(批量拒绝消息)

功能:拒绝多条消息,支持批量操作。

参数

  • deliveryTag:消息的唯一标识符。
  • multiple:布尔值,决定是否拒绝 deliveryTag 之前的所有未确认消息。
  • requeue:布尔值,决定消息是否重新入队。

示例代码(Python):

def callback(ch, method, properties, body):
    print(f"Received message: {body}")
    # 模拟处理失败
    if some_condition:
        ch.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=True)
    else:
        ch.basic_ack(delivery_tag=method.delivery_tag)

2. 拒绝消息后的处理方式

拒绝消息后,消息的处理方式取决于 requeue 参数的值:

requeue 值消息处理方式
True消息重新入队,可能被其他消费者或当前消费者再次消费(需确保队列未满)。
False消息直接丢弃,或进入 死信队列(需提前配置死信队列规则)。

3. 死信队列(Dead Letter Queue, DLQ)

如果消息被拒绝且 requeue=False,消息可能被丢弃。为了避免消息丢失,可以通过配置 死信队列 将消息转发到另一个队列,供后续分析或重试。

配置死信队列的步骤(RabbitMQ):

声明队列时绑定死信交换器

channel.queue_declare(
    queue='my_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',  # 死信交换器名称
        'x-message-ttl': 60000  # 可选:消息过期时间(毫秒)
    }
)

声明死信交换器和队列

channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dlx_queue')
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx_key')

消费者处理失败时拒绝消息并进入死信队列

def callback(ch, method, properties, body):
    if some_condition:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        ch.basic_ack(delivery_tag=method.delivery_tag)

二. Spring AMQP 中的拒绝机制

4. Spring AMQP 中的拒绝机制

Spring AMQP 中,可以通过以下方式实现消息拒绝:

(1)手动确认模式(AcknowledgeMode.MANUAL)

代码示例

@RabbitListener(queues = "my_queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
    try {
        // 处理消息
        if (someCondition) {
            // 拒绝消息并重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } else {
            // 确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    } catch (Exception e) {
        // 异常处理
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

(2)自动确认模式(AcknowledgeMode.AUTO)

Spring 会根据方法是否抛出异常自动决定是否发送 basic.nackbasic.ack

配置示例(YAML):

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

三. RocketMQ 中的拒绝机制

5. RocketMQ 中的拒绝机制

RocketMQ 中,消费者无法直接“拒绝”消息,但可以通过以下方式模拟:

(1)消费失败时返回ConsumeConcurrentlyStatus.RECONSUME_LATER

功能:消息会重新投递(默认延迟10秒)。

代码示例(Java):

public class MyConsumer implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            // 处理消息
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 消费失败,重新投递
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

(2)限制重试次数

通过 maxReconsumeTimes 配置最大重试次数,避免无限循环:

consumer.setMaxReconsumeTimes(3); // 最大重试3次

四 、常见场景&注意事项

6.常见场景与处理建议

场景 1:消息格式错误或业务逻辑异常

  • 处理方式
    • 拒绝消息(requeue=False)并记录日志。
    • 配置死信队列,将消息转发到专门的错误队列供人工处理。

场景 2:资源不足或临时故障

  • 处理方式
    • 拒绝消息并重新入队(requeue=True),等待资源恢复后重新消费。
    • 在死信队列中设置重试逻辑(如定时任务重新投递)。

场景 3:消息已过期或无效

  • 处理方式
    • 拒绝消息(requeue=False)并丢弃。

    • 配置死信队列,记录过期消息用于分析。

7. 注意事项

  1. 避免无限循环
    • 如果消息多次被拒绝并重新入队,可能导致无限消费循环。需结合 死信队列重试次数限制 解决。
  2. 资源占用
    • 频繁拒绝消息并重新入队可能增加系统负载,需合理配置 requeueprefetchCount
  3. 消息可靠性
    • 使用 手动确认死信队列 保障消息不丢失。

总结

消费者拒绝消息的核心在于通过 basic.reject/basic.nack(RabbitMQ)或 RECONSUME_LATER(RocketMQ)控制消息的后续处理。结合 死信队列重试机制,可以有效处理异常场景,确保消息的可靠性和系统的健壮性。

到此这篇关于一文讲透RabbitMQ 消息队列中的拒绝机制的文章就介绍到这了,更多相关RabbitMQ 拒绝机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:

相关文章

  • Hive 判断某个字段长度的示例代码

    Hive 判断某个字段长度的示例代码

    hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制,这篇文章主要介绍了Hive 判断某个字段长度,需要的朋友可以参考下
    2024-08-08
  • ant design vue 图片预览组件自定义样式

    ant design vue 图片预览组件自定义样式

    这篇文章主要为大家介绍了ant design vue 图片预览组件自定义样式方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-03
  • 一个假冒的序列号被用来注册Internet Download Manager,IDM正在退出的解决办法

    一个假冒的序列号被用来注册Internet Download Manager,IDM正在退出的解决办法

    这篇文章主要介绍了一个假冒的序列号被用来注册Internet Download Manager IDM正在退出的解决办法,在文章末尾给大家分享了序列号和绿色软件,大家根据自身情况选择,需要的朋友可以参考下
    2023-01-01
  • 2018年最值得一读的互联网书单

    2018年最值得一读的互联网书单

    2018年已经过去了近三分之二了,是不是感觉时光匆匆、一去不复返,过去的时间我们追不回,但是我们可以更加珍惜以后的时间!下面这篇文章主要给大家分享了2018年最值得一读的互联网书单,希望以后的日子里大家可以多读书,读好书
    2018-08-08
  • MATLAB plot函数功能及用法详解

    MATLAB plot函数功能及用法详解

    plot 函数语法使用plot绘制二维线图,这篇文章主要介绍了MATLAB plot函数详解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-01-01
  • 解决使用IDE Run运行出错package pack/test is not in GOROOT (/usr/local/go/src/pack/test)

    解决使用IDE Run运行出错package pack/test is not in GOROOT (/usr/loca

    这篇文章主要介绍了解决使用IDE Run运行出错package pack/test is not in GOROOT (/usr/local/go/src/pack/test),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • UltraEdit编辑器免费激活方法

    UltraEdit编辑器免费激活方法

    UltraEdit 是一套功能强大的文本编辑器,可以编辑文本、十六进制、ASCII 码,完全可以取代记事本,下面小编把这款UltraEdit编辑器免费激活方法分享给大家,需要的朋友参考下
    2021-08-08
  • git之reflog命令的使用

    git之reflog命令的使用

    git reflog 命令可以用来查看本地仓库的所有操作记录,包括所有分支的提交、合并、重置等操作,本文就详细的介绍一下如何使用,感兴趣的可以了解一下
    2023-08-08
  • 超实用Internet Download Manager(IDM)破解注册码,全版本通用

    超实用Internet Download Manager(IDM)破解注册码,全版本通用

    IDM下载器是一个十分好用的文件下载工具。IDM下载器它能够帮助你提升5倍的下载速度,强大的续传功能,让你不再担心因网络问题、计算机宕机、停电等原因所造成的数据不全问题,下面小编给大家带来了Internet Download Manager(IDM)破解注册码,感兴趣的朋友参考下吧
    2023-01-01
  • DeepSeek本机部署详细步骤(基于Ollama和Docker管理)

    DeepSeek本机部署详细步骤(基于Ollama和Docker管理)

    这篇文章主要介绍了如何利用ollama和docker在本机部署DeepSeek大语言模型,提供了一种高效、便捷且稳定的部署方式,步骤包括硬件和软件安装、模型获取、容器创建和启动等,通过图文介绍的非常详细,需要的朋友可以参考下
    2025-02-02

最新评论