Springboot使用RabbitMq延迟队列和死信队列详解

 更新时间:2025年10月24日 10:55:38   作者:满小超的代码世界  
文章介绍了在项目中实现文件分片上传功能时遇到的问题,以及使用Quartz定时器、Redis定时器和RabbitMQ延迟队列和死信队列三种方法来解决定时任务和删除临时桶的问题,文章详细描述了RabbitMQ延迟队列的配置和使用,包括延迟时间、死信交换机和监听器的设置

前言

在最近的项目中,结合minio文件服务器的一些特性。

需要做一个分片上传的功能:用户上传文件到md5的桶下,合并文件后删除这个临时桶。

会出现这样一种情况,用户上传文件传到一半就不再上传了,那么如何去删除,什么时候去删除时需要解决问题。

一、业务解决方案

1.quartz定时器

如果是单体项目,可以考虑使用quartz定时器。在创建桶的时候加入到定时任务里。

2.redis定时器

redis定时器需要修改配置文件,并且对redis进行监听,在创建桶时,设置过时时间,一旦时间超时,可以对key进行捕捉,最好对名字进行规范设计以便于业务

3.mq消息队列

使用延迟队列和死信队列进行定时任务

这篇主要讲解mq的方式解决问题

二、RabbitMq延迟队列

1.延迟队列

延迟队列也是一个普通的队列,和普通的队列相比,他多了几个属性,比如:

1)延迟的时间:表示队列中消息的生命周期,在指定时间后,要么抛弃这个消息,要么投递到死信队列中

2)指定死信交换机:如果不希望丢弃这个消息,那么可以将这个过期的消息丢到死信队列中

定义一个延迟队列

    //桶延迟队列
    @Bean(BUCKET_TTL_QUEUE)
    public Queue bucketTtlQueue(){
        Map<String,Object> deadParamsMap = new HashMap<>();
        // 设置死信队列的Exchange
        deadParamsMap.put("x-dead-letter-exchange",BUCKET_DEAD_EXCHANGE);
        //设置死信队列的RouteKey
        deadParamsMap.put("x-dead-letter-routing-key",BUCKET_DEAD_QUEUE);
        // 设置对接过期时间"x-message-ttl"
        deadParamsMap.put("x-message-ttl",60000*5);//5分钟
        // 设置对接可以存储的最大消息数量
        //deadParamsMap.put("x-max-length",10);
        return new Queue(BUCKET_TTL_QUEUE,true,false,false,deadParamsMap);
    }

延迟队列交换机

如上所说,延迟队列本就是一个普通的队列,如果你想更细粒的对他进行控制,那么需要绑定交换机,如果不绑定交换机,会绑定到默认交换机,在发送消息时,交换机写""就行,默认交换机为直连交换机

我这里指定了延迟队列的交换机,因为没有做消息幂等性,所以采用直连交换机应对在集群下消息只被消费一次

    //桶延迟交换机
    @Bean(BUCKET_TTL_EXCHANGE)
    public DirectExchange bucketTtlExchange() {
        return new DirectExchange(BUCKET_TTL_EXCHANGE,true,false);
    }

    // 绑定
    @Bean
    public Binding bucketTtlBinding() {
        return BindingBuilder.bind(bucketTtlQueue())
                .to(bucketTtlExchange())
                .with(BUCKET_TTL_QUEUE);
    }

2.死信交换机

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列

死信交换机也是普通交换机,他只是你指定接收过期消息的交换机而已

    /**
     * 死信队列
     *
     * @return
     */
    @Bean(BUCKET_DEAD_QUEUE)
    public Queue bucketDeadQueue() {
        //属性参数 队列名称 是否持久化
        return new Queue(BUCKET_DEAD_QUEUE, true);
    }

    /**
     * 死信队列交换机
     *
     * @return
     */
    @Bean(BUCKET_DEAD_EXCHANGE)
    public DirectExchange bucketDeadExchange() {
        return new DirectExchange(BUCKET_DEAD_EXCHANGE,true,false);
    }

    /**
     * 给死信队列绑定交换机
     *
     * @return
     */
    @Bean
    public Binding bucketDeadBinding() {
        return BindingBuilder.bind(bucketDeadQueue()).to(bucketDeadExchange()).with(BUCKET_DEAD_QUEUE);
    }

3.监听器

消息处理的逻辑,在消息过期后,送到死信交换机里,监听器监听到死信交换机的消息进行删除桶以及文件的业务逻辑处理

/**
 * @description:死信队列监听器,用来删除过期的桶
 * @author manchao
 * @date 2022/2/17 9:52
 */
@Configuration
public class BucketDeadConsumer {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    private MinioTemplate minioTemplate;

    @Bean
    public SimpleMessageListenerContainer BucketDeadListenerContainer() {
        SimpleMessageListenerContainer container = new  SimpleMessageListenerContainer(cachingConnectionFactory);
        // 监听队列名
        container.setQueueNames(MyMqConfig.BUCKET_DEAD_QUEUE);
        // 当前消费者数量 开启几个线程去处理数据 支持运行时动态修改
        container.setConcurrentConsumers(5);
        // 最大消费者数量  ,  消息堵塞太多的时候,会帮我自动扩展到我的最大消费者数量
        container.setMaxConcurrentConsumers(10);
        // 是否重回队列
        container.setDefaultRequeueRejected(true);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置监听器
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, Channel channel) throws IOException {
                // 消息的唯一性ID deliveryTag:该消息的index 自增长
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                byte[] messageBody = message.getBody();
                String s = new String(messageBody);
                System.out.println("消息: " + s);
                System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue());
                System.out.println("交换机: "+message.getMessageProperties().getReceivedExchange());
                //删除桶
                try {
                    minioTemplate.removeBuckets(s, "");
                    channel.basicAck(deliveryTag, false);
                } catch (IOException e) {
                    e.printStackTrace();
                    channel.basicReject(deliveryTag, false);
                }
            }
        });
        return container;
    }
}

注:我设置了消息发送和确认的回调函数,为什么没有触发这个函数?因为我是从后台管理页面发的消息,没有通过rabbitteplate进行发送,不会是这个原因吧!

总结

业务的解决方法有太多种了,找到一个高可用以及简便的方法才是解决问题的关键

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java super关键字知识点详解

    java super关键字知识点详解

    在本篇文章里小编给大家整理的是一篇关于java super关键字知识点详解内容,有兴趣的朋友们可以参考下。
    2021-01-01
  • JavaWeb如何实现禁用浏览器缓存

    JavaWeb如何实现禁用浏览器缓存

    这篇文章主要介绍了JavaWeb如何实现禁用浏览器缓存,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • java基于控制台的学生学籍管理系统

    java基于控制台的学生学籍管理系统

    这篇文章主要为大家详细介绍了java基于控制台的学生学籍管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-07-07
  • 基于java实现websocket代码示例

    基于java实现websocket代码示例

    这篇文章主要介绍了基于java实现websocket代码示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-12-12
  • Java简易学生成绩系统写法实例

    Java简易学生成绩系统写法实例

    在本篇文章里小编给大家分享的是关于Java简易学生成绩系统写法实例以及相关知识点,有需要的朋友们可以学习下。
    2019-09-09
  • SpringBoot集成MQTT实现交互服务通信

    SpringBoot集成MQTT实现交互服务通信

    MQTT非常适用于物联网领域,本文主要介绍了SpringBoot集成MQTT实现交互服务通信,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-08-08
  • java实现堆排序以及时间复杂度的分析

    java实现堆排序以及时间复杂度的分析

    本文主要介绍了java实现堆排序以及时间复杂度,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • java在hashmap初始化时赋初值过程解析

    java在hashmap初始化时赋初值过程解析

    这篇文章主要介绍了java在hashmap初始化时赋初值过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • 使用Java实现KMZ和KML数据的直接解析

    使用Java实现KMZ和KML数据的直接解析

    本文主要讲解如何用JAVA语言,直接解析KMZ数据,文章首先介绍google地图中的KMZ和KML数据,然后使用代码的方式实现数据的解析,最后展示解析成果以及如何将数据转换成空间WKT数据,需要的朋友可以参考下
    2024-06-06
  • 图解Java经典算法快速排序的原理与实现

    图解Java经典算法快速排序的原理与实现

    快速排序是基于二分的思想,对冒泡排序的一种改进。主要思想是确立一个基数,将小于基数的数放到基数左边,大于基数的数字放到基数的右边,然后在对这两部分进一步排序,从而实现对数组的排序
    2022-09-09

最新评论