Java RabbitMQ消息队列详解常见问题

 更新时间:2022年07月28日 11:52:05   作者:kaico2018  
消息队列是最古老的中间件之一,从系统之间有通信需求开始,就自然产生了消息队列。本文告诉什么是消息队列,为什么需要消息队列,常见的消息队列有哪些,RabbitMQ的部署和使用

消息堆积

消息堆积的产生场景:

  • 生产者产生的消息速度大于消费者消费的速度。解决:增加消费者的数量或速度。
  • 没有消费者进行消费的时候。解决:死信队列、设置消息有效期。相当于对我们的消息设置有效期,在规定的时间内如果没有消费的话,自动过期,过期的时候会执行客户端回调监听的方法将消息存放到数据库表记录,后期实现补偿。

保证消息不丢失

1、生产者使用消息确认机制保证消息百分之百能够将消息投递到MQ成功。

2、MQ服务器端应该将消息持久化到硬盘

3、消费者使用手动ack机制确认消息消费成功

如果MQ服务器容量满了怎么办?

使用死信队列将消息存到数据库中去,后期补偿消费。

死信队列

RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生背景:

  • 消息投递到MQ中存放 消息已经过期
  • 队列达到最大的长度 (队列容器已经满了)生产者拒绝接收消息
  • 消费者消费多次消息失败,就会转移存放到死信队列中

代码案例:

maven依赖

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml配置

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

队列配置类

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信队列消费者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

普通队列消费者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

后台队列管理页面如下:

部署方式:死信队列不能够和正常队列存在同一个服务器中,应该分服务器存放。

延迟队列

订单30分钟未支付,系统自动超时关闭的实现方案。

基于任务调度实现,效率是非常低。

基于redis过期key实现,key失效时会回调客户端一个方法。

用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;缺点:非常冗余,会在表中存放一个冗余字段。

基于mq的延迟队列(最佳方案)rabbitmq情况下。

原理:在我们下单的时候,往mq投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否已经支付。

实现逻辑:

主要使用死信队列来实现。

想要的代码:就是正常的消费者不消费消息,或者没有正常的消费者,在设置的时间后进入死信队列中,然后死信消费者实现相应的业务逻辑。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制

当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)

应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿。因为重试失败次数之后,队列会自动删除这个消息。

消息重试原理: 在重试的过程中,使用aop拦截我们的消费监听方法,也不会打印这个错误日志。如果重试多次还是失败,达到最大失败次数的时候才会打印错误日志。

如果消费多次还是失败的情况下:

1、自动删除该消息;(消息可能丢失)

解决办法:

如果充实多次还是失败的情况下,最终存放到死信队列;

采用表日志记,消费失败错误日志的日志记录,后期人工自动对该消息实现补偿。

合理的选择重试机制

消费者获取消息后,调用第三方接口(HTTP请求),但是调用第三方接口失败呢?是否需要重试 ?

答:有时是因为网络异常调用失败,应该需要重试几次。

消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

答:不需要重试,代码异常需要重新修改代码发布项目。

消费者开启手动ack模式

第一步、springboot项目配置需要开启ack模式

acknowledge-mode: manual

第二步、消费者Java代码

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解决消息幂等问题

什么是消息幂等性?MQ消费者如何保证幂等性?

产生的原因:就是因为消费者可能会开启自动重试,重试过程中可能会导致消费者业务逻辑代码重复执行。此刻消息已经消费了,因为业务报错导致消息重新消费,这时会出现

解决方案:采用消息全局id根据业务来定,根据业务id(全局唯一id)消费者可以判断这条消息已经消费了。

消费者代码逻辑:

RabbitMQ解决分布式事务问题

分布式事务:在分布式系统中,因为跨服务调用接口,存在多个不同的事务,每个事务都互不影响。就存在分布式事务的问题。

解决分布式事务核心思想:数据最终一致性。

分布式领域中名词:

强一致性 :要么同步速度非常快或者采用锁的机制 不允许出现脏读;

强一致性解决方案:要么数据库A非常迅速的将数据同步给数据B,或者数据库A没有同步完成之前数据库B不能够读取数据。

弱一致性: 允许读取的数据为原来的脏数据,允许读取的结果不一致性。

最终一致性: 在我们的分布式系统中,因为数据之间同步通过网络实现通讯,短暂的数据延迟是允许的,但是最终数据必须要一致性。

基于RabbitMQ解决分布式事务的思路

基于RabbitMQ解决分布式事务的思路:(采用最终一致性的方案)

  • 确认我们的生产者消息一定要投递到MQ中(消息确认机制)投递失败 就继续重试
  • 消费者采用手动ack的形式确认消息实现消费 注意幂等性问题,消费失败的情况下,mq自动帮消费者重试。
  • 保证我们的生产者第一事务先执行,如果执行失败采用补单队列(给生产者自己事务补充,确保生产者第一事务执行完成【数据最终一致性】)。

解决思路图:核心是利用mq发送消息给其他系统将数据修改回来。

到此这篇关于Java RabbitMQ详解常见问题的解决的文章就介绍到这了,更多相关Java RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java数据结构之二分查找法 binarySearch的实例

    java数据结构之二分查找法 binarySearch的实例

    这篇文章主要介绍了java数据结构之二分查找法 binarySearch的实例的相关资料,希望通过本文能帮助到大家,让大家理解掌握这部分内容,需要的朋友可以参考下
    2017-10-10
  • Java实现简单台球游戏

    Java实现简单台球游戏

    这篇文章主要为大家详细介绍了Java实现简单台球游戏,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-07-07
  • java彩色瓷砖编程题分析

    java彩色瓷砖编程题分析

    这篇文章主要介绍了java彩色瓷砖编程题的详细解题思路以及解决方法分享,对此有兴趣的参考下。
    2018-02-02
  • SpringBoot自定义Banner使用详解

    SpringBoot自定义Banner使用详解

    这篇文章主要介绍了SpringBoot自定义Banner使用详解,启动 Spring Boot 时,几乎总是能在控制台上方看到如下横幅,这个也叫字符画、英文ASCII艺术字,这就是banner,我们来看一下如何使用,需要的朋友可以参考下
    2024-01-01
  • Java中创建线程的两种方式详细说明

    Java中创建线程的两种方式详细说明

    这篇文章主要介绍了Java中创建线程的两种方式详细说明,Java使用java.lang.Thread类代表线程,所有的线程对象都必须是Thread类或其子类的实例,每个线程的作用是完成一定的任务,实际上就是执行一段程序流即一段顺序执行的代码,需要的朋友可以参考下
    2023-11-11
  • Java创建类模式_动力节点Java学院整理

    Java创建类模式_动力节点Java学院整理

    这篇文章主要为大家详细介绍了Java创建类模式的相关方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-08-08
  • 完美解决gson将Integer默认转换成Double的问题

    完美解决gson将Integer默认转换成Double的问题

    下面小编就为大家带来一篇完美解决gson将Integer默认转换成Double的问题。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-03-03
  • java高效打印一个二维数组的实例(不用递归,不用两个for循环)

    java高效打印一个二维数组的实例(不用递归,不用两个for循环)

    下面小编就为大家带来一篇java高效打印一个二维数组的实例(不用递归,不用两个for循环)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-03-03
  • 详解在Spring3中使用注解(@Scheduled)创建计划任务

    详解在Spring3中使用注解(@Scheduled)创建计划任务

    本篇文章主要介绍了详解在Spring3中使用注解(@Scheduled)创建计划任务,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-03-03
  • 最小树形图模板朱刘算法分享

    最小树形图模板朱刘算法分享

    这篇文章主要介绍了最小树形图模板朱刘算法,有需要的朋友可以参考一下
    2014-01-01

最新评论