Springboot结合rabbitmq实现的死信队列

 更新时间:2023年09月05日 09:16:04   作者:西安未央  
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,本文主要介绍了Springboot结合rabbitmq实现的死信队列,具有一定的参考价值,感兴趣的可以了解一下

概述

RabbitMQ是流行的开源消息队列系统,使用erlang语言开发。为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但由于对死信队列的概念及配置不熟悉,导致曾一度陷入百度的汪洋大海,无法自拔,很多文章都看起来可行,但是实际上却并不能帮我解决实际问题。最终,在官网文档中找到了我想要的答案,通过官网文档的学习,才发现对于死信队列存在一些误解,导致配置死信队列之路困难重重。

详细

一、运行效果

image.png

image.png

二、实现过程

①、先创建一个Springboot项目。然后在pom文件中添加  spring-boot-starter-amqp  和  spring-boot-starter-web 的依赖,接下来创建一个Config类,这里是关键:

package com.zyf.rabbitmqdeadletterdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA";
    public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB";
    public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA";
    public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB";
    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }
    // 声明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 声明业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }
    // 声明业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }
    // 声明死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }
    // 声明死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }
    // 声明业务队列A绑定关系
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
    // 声明业务队列B绑定关系
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
    // 声明死信队列A绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
    // 声明死信队列B绑定关系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

②、接下来,是业务队列的消费代码:

@Slf4j@Componentpublic class BusinessMessageReceiver {    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息A:{}", msg);        boolean ack = true;
        Exception exception = null;        try {            if (msg.contains("deadletter")){                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }        if (!ack){
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到业务消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

③、然后配置死信队列的消费者:

@Componentpublic class DeadLetterMessageReceiver {    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

④、为了方便测试,写一个简单的消息生产者,并通过controller层来生产消息。

@Componentpublic class BusinessMessageSender {    @Autowired
    private RabbitTemplate rabbitTemplate;    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController {    @Autowired
    private BusinessMessageSender sender;    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

三、项目结构图

image.png

四、补充总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

总结一下死信消息的生命周期:

  • 业务消息被投入业务队列
  • 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  • 被nck或reject的消息由RabbitMQ投递到死信交换机中
  • 死信交换机将消息投入相应的死信队列
  • 死信队列的消费者消费死信消息

死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

到此这篇关于Springboot结合rabbitmq实现的死信队列的文章就介绍到这了,更多相关Springboot rabbitmq死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring更简单的存储方式与获取方式详解

    Spring更简单的存储方式与获取方式详解

    Spring是一个轻量级的IoC和AOP容器框架,是为Java应用程序提供基础性服务的一套框架,目的是用于简化企业应用程序的开发,它使得开发者只需要关心业务需求,下面这篇文章主要给大家介绍了关于Spring更简单的存储方式与获取方式的相关资料,需要的朋友可以参考下
    2022-06-06
  • Java之Spring注解开发案例详解

    Java之Spring注解开发案例详解

    这篇文章主要介绍了Java之Spring注解开发案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-07-07
  • springboot动态加载jar包动态配置实例详解

    springboot动态加载jar包动态配置实例详解

    这篇文章主要给大家介绍了关于springboot动态加载jar包动态配置的相关资料,在项目开发的过程中,有时候需要动态灵活的加载某个jar包并执行其里面的方法的时候,需要的朋友可以参考下
    2023-11-11
  • Java使用Junit4.jar进行单元测试的方法

    Java使用Junit4.jar进行单元测试的方法

    今天通过本文给大家介绍Java使用Junit4.jar进行单元测试的方法,本文通过图文实例相结合给大家介绍的非常详细,需要的朋友参考下吧
    2021-11-11
  • Kotlin 基础教程之异常

    Kotlin 基础教程之异常

    这篇文章主要介绍了Kotlin 基础教程之异常的相关资料,需要的朋友可以参考下
    2017-06-06
  • Java中ScheduledExecutorService介绍和使用案例(推荐)

    Java中ScheduledExecutorService介绍和使用案例(推荐)

    ScheduledExecutorService是Java并发包中的接口,用于安排任务在给定延迟后运行或定期执行,它继承自ExecutorService,具有线程池特性,可复用线程,提高效率,本文主要介绍java中的ScheduledExecutorService介绍和使用案例,感兴趣的朋友一起看看吧
    2024-10-10
  • 代码详解Java猴子选王问题(约瑟夫环)

    代码详解Java猴子选王问题(约瑟夫环)

    本篇文章通过实例给大家分析了java约瑟夫环这个经典内容,有兴趣的跟着小编一起学习下吧。
    2018-02-02
  • springboot整合Mybatis-plus的实现

    springboot整合Mybatis-plus的实现

    这篇文章主要介绍了springboot整合Mybatis-plus的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • SpringBoot跨域问题的解决方法实例

    SpringBoot跨域问题的解决方法实例

    这篇文章主要给大家介绍了关于SpringBoot跨域问题的解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • Java多输入框查询需求实现方法详解

    Java多输入框查询需求实现方法详解

    这篇文章主要给大家介绍了Java多输入框查询需求实现的相关资料,文中通过代码以及图文介绍的非常详细,对大家学习或者使用Java具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-10-10

最新评论