Springboot使用RabbitMq延迟队列和死信队列详解

 更新时间:2025年10月24日 10:55:38   作者:满小超的代码世界  
文章介绍了在项目中实现文件分片上传功能时遇到的问题,以及使用Quartz定时器、Redis定时器和RabbitMQ延迟队列和死信队列三种方法来解决定时任务和删除临时桶的问题,文章详细描述了RabbitMQ延迟队列的配置和使用,包括延迟时间、死信交换机和监听器的设置

前言

在最近的项目中,结合minio文件服务器的一些特性。

需要做一个分片上传的功能:用户上传文件到md5的桶下,合并文件后删除这个临时桶。

会出现这样一种情况,用户上传文件传到一半就不再上传了,那么如何去删除,什么时候去删除时需要解决问题。

一、业务解决方案

1.quartz定时器

如果是单体项目,可以考虑使用quartz定时器。在创建桶的时候加入到定时任务里。

2.redis定时器

redis定时器需要修改配置文件,并且对redis进行监听,在创建桶时,设置过时时间,一旦时间超时,可以对key进行捕捉,最好对名字进行规范设计以便于业务

3.mq消息队列

使用延迟队列和死信队列进行定时任务

这篇主要讲解mq的方式解决问题

二、RabbitMq延迟队列

1.延迟队列

延迟队列也是一个普通的队列,和普通的队列相比,他多了几个属性,比如:

1)延迟的时间:表示队列中消息的生命周期,在指定时间后,要么抛弃这个消息,要么投递到死信队列中

2)指定死信交换机:如果不希望丢弃这个消息,那么可以将这个过期的消息丢到死信队列中

定义一个延迟队列

    //桶延迟队列
    @Bean(BUCKET_TTL_QUEUE)
    public Queue bucketTtlQueue(){
        Map<String,Object> deadParamsMap = new HashMap<>();
        // 设置死信队列的Exchange
        deadParamsMap.put("x-dead-letter-exchange",BUCKET_DEAD_EXCHANGE);
        //设置死信队列的RouteKey
        deadParamsMap.put("x-dead-letter-routing-key",BUCKET_DEAD_QUEUE);
        // 设置对接过期时间"x-message-ttl"
        deadParamsMap.put("x-message-ttl",60000*5);//5分钟
        // 设置对接可以存储的最大消息数量
        //deadParamsMap.put("x-max-length",10);
        return new Queue(BUCKET_TTL_QUEUE,true,false,false,deadParamsMap);
    }

延迟队列交换机

如上所说,延迟队列本就是一个普通的队列,如果你想更细粒的对他进行控制,那么需要绑定交换机,如果不绑定交换机,会绑定到默认交换机,在发送消息时,交换机写""就行,默认交换机为直连交换机

我这里指定了延迟队列的交换机,因为没有做消息幂等性,所以采用直连交换机应对在集群下消息只被消费一次

    //桶延迟交换机
    @Bean(BUCKET_TTL_EXCHANGE)
    public DirectExchange bucketTtlExchange() {
        return new DirectExchange(BUCKET_TTL_EXCHANGE,true,false);
    }

    // 绑定
    @Bean
    public Binding bucketTtlBinding() {
        return BindingBuilder.bind(bucketTtlQueue())
                .to(bucketTtlExchange())
                .with(BUCKET_TTL_QUEUE);
    }

2.死信交换机

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列

死信交换机也是普通交换机,他只是你指定接收过期消息的交换机而已

    /**
     * 死信队列
     *
     * @return
     */
    @Bean(BUCKET_DEAD_QUEUE)
    public Queue bucketDeadQueue() {
        //属性参数 队列名称 是否持久化
        return new Queue(BUCKET_DEAD_QUEUE, true);
    }

    /**
     * 死信队列交换机
     *
     * @return
     */
    @Bean(BUCKET_DEAD_EXCHANGE)
    public DirectExchange bucketDeadExchange() {
        return new DirectExchange(BUCKET_DEAD_EXCHANGE,true,false);
    }

    /**
     * 给死信队列绑定交换机
     *
     * @return
     */
    @Bean
    public Binding bucketDeadBinding() {
        return BindingBuilder.bind(bucketDeadQueue()).to(bucketDeadExchange()).with(BUCKET_DEAD_QUEUE);
    }

3.监听器

消息处理的逻辑,在消息过期后,送到死信交换机里,监听器监听到死信交换机的消息进行删除桶以及文件的业务逻辑处理

/**
 * @description:死信队列监听器,用来删除过期的桶
 * @author manchao
 * @date 2022/2/17 9:52
 */
@Configuration
public class BucketDeadConsumer {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    private MinioTemplate minioTemplate;

    @Bean
    public SimpleMessageListenerContainer BucketDeadListenerContainer() {
        SimpleMessageListenerContainer container = new  SimpleMessageListenerContainer(cachingConnectionFactory);
        // 监听队列名
        container.setQueueNames(MyMqConfig.BUCKET_DEAD_QUEUE);
        // 当前消费者数量 开启几个线程去处理数据 支持运行时动态修改
        container.setConcurrentConsumers(5);
        // 最大消费者数量  ,  消息堵塞太多的时候,会帮我自动扩展到我的最大消费者数量
        container.setMaxConcurrentConsumers(10);
        // 是否重回队列
        container.setDefaultRequeueRejected(true);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置监听器
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, Channel channel) throws IOException {
                // 消息的唯一性ID deliveryTag:该消息的index 自增长
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                byte[] messageBody = message.getBody();
                String s = new String(messageBody);
                System.out.println("消息: " + s);
                System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue());
                System.out.println("交换机: "+message.getMessageProperties().getReceivedExchange());
                //删除桶
                try {
                    minioTemplate.removeBuckets(s, "");
                    channel.basicAck(deliveryTag, false);
                } catch (IOException e) {
                    e.printStackTrace();
                    channel.basicReject(deliveryTag, false);
                }
            }
        });
        return container;
    }
}

注:我设置了消息发送和确认的回调函数,为什么没有触发这个函数?因为我是从后台管理页面发的消息,没有通过rabbitteplate进行发送,不会是这个原因吧!

总结

业务的解决方法有太多种了,找到一个高可用以及简便的方法才是解决问题的关键

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Spring MVC各种参数进行封装的方法实例

    Spring MVC各种参数进行封装的方法实例

    这篇文章主要给大家介绍了关于Spring MVC各种参数进行封装的相关资料,SpringMVC内置多种数据类型转换器,可以根据请求中的参数与后端控制器方法的参数的关系为我们实现简单的数据封装,需要的朋友可以参考下
    2023-06-06
  • java二维码生成的方法

    java二维码生成的方法

    这篇文章主要为大家详细介绍了java二维码生成的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • Mybatis-Plus如何配置分页对象

    Mybatis-Plus如何配置分页对象

    本文主要介绍了Mybatis-Plus如何配置分页对象,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • 解决Java项目中request流只能获取一次的问题

    解决Java项目中request流只能获取一次的问题

    Java项目开发中可能存在以下几种情况,你需要在拦截器中统一拦截请求和你项目里可能需要搞一个统一的异常处理器,这两种情况是比较常见的,本文将给大家介绍如何解决Java项目中request流只能获取一次的问题,需要的朋友可以参考下
    2024-02-02
  • java swing框架实现贪吃蛇游戏

    java swing框架实现贪吃蛇游戏

    这篇文章主要为大家详细介绍了java swing框架实现贪吃蛇游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-12-12
  • SpringBoot如何读取配置文件中的数据到map和list

    SpringBoot如何读取配置文件中的数据到map和list

    这篇文章主要介绍了SpringBoot如何读取配置文件中的数据到map和list,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • 深入了解与实例应用pinyin4j库

    深入了解与实例应用pinyin4j库

    在Java开发中,pinyin4j库是处理汉字与拼音转换的重要工具,提供将汉字转换为全拼音和提取汉字首字母的核心功能,本文详细探讨了pinyin4j库的使用方法、核心功能和实例应用,感兴趣的朋友跟随小编一起看看吧
    2025-08-08
  • 使用1招搞定maven打包空间不足的问题

    使用1招搞定maven打包空间不足的问题

    这篇文章主要介绍了使用1招搞定maven打包空间不足的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-09-09
  • Java Socket编程服务器响应客户端实例代码

    Java Socket编程服务器响应客户端实例代码

    这篇文章主要介绍了Java Socket编程服务器响应客户端实例代码,具有一定借鉴价值,需要的朋友可以参考下
    2017-12-12
  • SpringBoot与Quartz集成实现分布式定时任务集群的代码实例

    SpringBoot与Quartz集成实现分布式定时任务集群的代码实例

    今天小编就为大家分享一篇关于SpringBoot与Quartz集成实现分布式定时任务集群的代码实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03

最新评论