关于SpringBoot整合RabbitMQ实现死信队列

 更新时间:2023年05月20日 10:54:31   作者:叫我二蛋  
这篇文章主要介绍了关于SpringBoot整合RabbitMQ实现死信队列,死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已,需要的朋友可以参考下

概念介绍

什么是死信

死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:

  1. 消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。
  2. 消息到达生存时间还未被消费。
  3. 队列超过长度限制,消息被丢弃。

这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图

在这里插入图片描述

死信队列应用

  • 作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。
  • 可以实现延迟消费功能。比如,订单15分钟内未支付。

注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:

  • 维护多个队列,每个队列维护一个TTL时间。
  • 使用延迟交换机。这种方式需要下载插件支持

工程搭建

环境说明

  • RabbitMQ环境
  • Java版本:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

搭建步骤

1.创建SpringBoot项目。

2.pom.xml文件导入RabbitMQ依赖。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.application.yml文件添加RabbitMQ配置。

spring:
  # rabbitmq配置信息 RabbitProperties类
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启confirm机制
    publisher-confirm-type: correlated
    # 开启return机制
    publisher-returns: true
    #全局配置,局部配置存在就以局部为准
    listener:
      simple:
        acknowledge-mode: manual # 手动ACK

实现死信

准备Exchange&Queue

@Configuration
public class RabbitMQConfig {
    /**
     * 正常队列
     */
    public static final String EXCHANGE = "boot-exchange";
    public static final String QUEUE = "boot-queue";
    public static final String ROUTING_KEY = "boot-rout";
    /**
     * 死信队列
     */
    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead-rout";
    /**
     * 声明死信交换机
     *
     * @return
     */
    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
    }
    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }
    /**
     * 绑定死信的队列和交换机
     *
     * @param deadExchange
     * @param deadQueue
     * @return
     */
    @Bean
    public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
    /**
     * 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
     *
     * @return
     */
    @Bean
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE).build();
    }
    /**
     * 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
     * 绑定死信交换机及路由key
     *
     * @return
     */
    @Bean
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                //.ttl(10000)
                //队列最大长度
                .maxLength(1)
                .build();
    }
    /**
     * 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
     *
     * @param bootExchange
     * @param bootQueue
     * @return
     */
    @Bean
    public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }
}

监听死信队列

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
    public void listener_dead(String msg, Channel channel, Message message) throws IOException {
        System.out.println("死信接收到消息" + msg);
        System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
        System.out.println("messageID:" + message.getMessageProperties().getMessageId());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

方式一、消费者拒绝&否认

  • 拒绝消息
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void listener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息" + msg);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
    }
  • 否认消息
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void listener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息" + msg);
 		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }

方式二、超过消息TTL 发送消息时设置TTL

@SpringBootTest
public class Publisher {
    @Autowired
    private RabbitTemplate template;
        /**
     * 5秒未被消费会路由到死信队列
     */
    @Test
    public void publish_expir() {
        template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
            message.getMessageProperties().setExpiration("5000");
            return message;
        });
    }
}
  • 设置队列所有消息的TTL

更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改

    @Bean
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                .ttl(10000)
                .build();
    }

方式三、超过队列长度限制

设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。

    @Bean
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                .maxLength(1)
                .build();
    }

代码仓库 点我

到此这篇关于关于SpringBoot整合RabbitMQ实现死信队列的文章就介绍到这了,更多相关RabbitMQ实现死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 用JAVA 设计生成二维码详细教程

    用JAVA 设计生成二维码详细教程

    本文主要介绍用JAVA 设计生成二维码,这里一步一步详细介绍用 java 如何设计二维码,并附有代码示例以便参考,有需要的小伙伴可以参考下
    2016-08-08
  • 一文深入理解Java中的深拷贝机制

    一文深入理解Java中的深拷贝机制

    在Java编程中,我们经常需要处理对象的复制问题,深拷贝和浅拷贝是两种常见的复制方式,它们在内存管理和对象引用方面存在不同特点,本文将带大家深入探究Java中的深拷贝机制,需要的朋友可以参考下
    2023-09-09
  • MyBatis入门程序

    MyBatis入门程序

    MyBatis是支持普通SQL查询,存储过程和高级映射的优秀持久层框架。接下来本文给大家带来了MyBatis入门程序,感兴趣的朋友一起学习吧
    2016-08-08
  • SpringBoot集成EasyExcel的应用场景分析

    SpringBoot集成EasyExcel的应用场景分析

    这篇文章主要介绍了SpringBoot集成EasyExcel的应用场景,java领域解析、生成excel比较有名的框架有apache poi、jxl等,今天通过实例代码给大家详细介绍,需要的朋友可以参考下
    2021-07-07
  • Spring Boot应用程序中如何使用Keycloak详解

    Spring Boot应用程序中如何使用Keycloak详解

    这篇文章主要为大家介绍了Spring Boot应用程序中如何使用Keycloak详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-05-05
  • Java如何通过File类方法删除指定文件夹中的全部文件

    Java如何通过File类方法删除指定文件夹中的全部文件

    这篇文章主要给大家介绍了关于Java如何通过File类方法删除指定文件夹中的全部文件的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • spring中向一个单例bean中注入非单例bean的方法详解

    spring中向一个单例bean中注入非单例bean的方法详解

    Spring是先将Bean对象实例化之后,再设置对象属性,所以会先调用他的无参构造函数实例化,每个对象存在一个map中,当遇到依赖,就去map中调用对应的单例对象,这篇文章主要给大家介绍了关于spring中向一个单例bean中注入非单例bean的相关资料,需要的朋友可以参考下
    2021-07-07
  • java实现树形菜单对象

    java实现树形菜单对象

    这篇文章主要为大家详细介绍了java实现树形菜单对象,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • 浅谈一下数据库连接池Druid德鲁伊

    浅谈一下数据库连接池Druid德鲁伊

    数据库连接池就是一个容器持有多个数据库连接,当程序需要操作数据库的时候直接从池中取出连接,使用完之后再还回去,和线程池一个道理,需要的朋友可以参考下
    2023-05-05
  • Spring boot JPA实现分页和枚举转换代码示例

    Spring boot JPA实现分页和枚举转换代码示例

    这篇文章主要介绍了Spring boot JPA实现分页和枚举转换代码示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09

最新评论