Java的RocketMq队列之消息可靠性详解

 更新时间:2024年01月08日 09:18:24   作者:猎户星座。  
这篇文章主要介绍了Java的RocketMq队列之消息可靠性详解,生产者通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer,所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失,需要的朋友可以参考下

1. 消息的发送流程

一条消息从生产到被消费,将会经历三个阶段:

  • 生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker
  • 存储阶段,消息将会存储在 Broker 端磁盘中
  • 消息阶段, Consumer 将会从 Broker 拉取消息

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

2. 生产阶段

生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

发送模式

可靠同步发送

  • 原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
  • 场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
  • 类似推拉的形式 发送 ->同步返回 ->发送 ->同步返回

可靠异步发送

  • 原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
  • 场景:异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。 耗时比较长的 可以不需要同步返回给用户的

单向(Oneway)发送

  • 原理:单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
  • 场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

RocketMQ 发送消息示例代码如下:

DefaultMQProducer mqProducer=new DefaultMQProducer("test");
// 设置 nameSpace 地址
mqProducer.setNamesrvAddr("namesrvAddr");
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
try {
    SendResult sendResult = mqProducer.send(msg);
} catch (RemotingException e) {
    e.printStackTrace();
} catch (MQBrokerException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

send 方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。

消息发送成功仅代表消息已经到了 Broker 端,Broker 在不同配置下,可能会返回不同响应状态:

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

引用官方状态说明:

image-20200319220927210

另外 RocketMQ 还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。

DefaultMQProducer mqProducer = new DefaultMQProducer("test");
// 设置 nameSpace 地址
mqProducer.setNamesrvAddr("127.0.0.1:9876");
mqProducer.setRetryTimesWhenSendFailed(5);
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
try {
    // 异步发送消息到,主线程不会被阻塞,立刻会返回
    mqProducer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 消息发送成功,
        }
        @Override
        public void onException(Throwable e) {
            // 消息发送失败,可以持久化这条数据,后续进行补偿处理
        }
    });
} catch (RemotingException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。

不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如下:

// 同步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 异步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);

总结

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,producer同步等待broker响应消息的发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。 

3. Broker 存储阶段

默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

修改 Broker 端配置如下:

## 默认情况为 ASYNC_FLUSH 
flushDiskType = SYNC_FLUSH

若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者。

集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。

默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

注:master 配置:flushDiskType = SYNC_FLUSH

此时若 master 突然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

异步复制与同步复制区别:

  • Sync Broker:生产者发送的每一条消息都至少同步复制到一个slave后才返回告诉生产者成功,即“同步双写”。
  • Async Broker:生产者发送的每一条消息只要写入master就返回告诉生产者成功。然后再“异步复制”到slave。

Broker master 节点 同步复制配置如下:

## 默认为 ASYNC_MASTER 
brokerRole=SYNC_MASTER

如果 slave 节点未在指定时间内同步返回响应,生产者将会收到 SendStatus.FLUSH_SLAVE_TIMEOUT 返回状态。

总结

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。 RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。

结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker 需要采用如下配置:

## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
 
## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

同时这个过程我们还需要生产者配合,判断返回状态是否是 SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。

虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。

4. 消费阶段

从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。

何为At least Once?

Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。

通常消费消息的ack机制一般分为两种思路:

1、先提交后消费;

2、先消费,消费成功后再提交;

思路一可以解决重复消费的问题但是会丢失消息,因此Rocket默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。

如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

消息消费的代码如下:

// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
 
// 设置NameServer的地址
consumer.setNamesrvAddr("namesrvAddr");
 
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("test_topic", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 执行业务逻辑
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动消费者实例
consumer.start();

以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则我们需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

5. 总结

最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性

到此这篇关于Java的RocketMq队列之消息可靠性详解的文章就介绍到这了,更多相关RocketMq消息可靠性内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java异常处理try catch的基本使用

    Java异常处理try catch的基本使用

    大家好,本篇文章主要讲的是Java异常处理try catch的基本使用,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-02-02
  • 深入理解 Java注解及实例

    深入理解 Java注解及实例

    这篇文章主要介绍了深入理解 Java注解及实例的相关资料,希望通过本文大家能够掌握java注解的知识,需要的朋友可以参考下
    2017-09-09
  • java 将方法作为传参--多态的实例

    java 将方法作为传参--多态的实例

    下面小编就为大家带来一篇java 将方法作为传参--多态的实例。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-09-09
  • Java 堆排序实例(大顶堆、小顶堆)

    Java 堆排序实例(大顶堆、小顶堆)

    下面小编就为大家分享一篇Java 堆排序实例(大顶堆、小顶堆),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2017-12-12
  • 实例解析如何正确使用Java数组

    实例解析如何正确使用Java数组

    同一种类型数据的集合。其实数组就是一个容器。运算的时候有很多数据参与运算,那么首先需要做的是什么下面我们就一起来看看。
    2016-07-07
  • 详解Springboot分布式限流实践

    详解Springboot分布式限流实践

    这篇文章主要介绍了详解Springboot分布式限流实践 ,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-06-06
  • Mybatis-plus自动填充不生效或自动填充数据为null原因及解决方案

    Mybatis-plus自动填充不生效或自动填充数据为null原因及解决方案

    本文主要介绍了Mybatis-plus自动填充不生效或自动填充数据为null原因及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • 详解Java如何进行Base64的编码(Encode)与解码(Decode)

    详解Java如何进行Base64的编码(Encode)与解码(Decode)

    这篇文章主要介绍了详解Java如何进行Base64的编码(Encode)与解码(Decode),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-03-03
  • SpringCloud使用Zookeeper作为配置中心的示例

    SpringCloud使用Zookeeper作为配置中心的示例

    这篇文章主要介绍了SpringCloud使用Zookeeper作为配置中心的示例,帮助大家更好的理解和学习使用SpringCloud,感兴趣的朋友可以了解下
    2021-04-04
  • Java实现线性表的顺序存储

    Java实现线性表的顺序存储

    这篇文章主要为大家详细介绍了Java实现线性表的顺序存储,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-10-10

最新评论