RocketMQ中消费者概念和消费流程详解

 更新时间:2023年10月11日 09:22:55   作者:金甲虫Scarb  
这篇文章主要介绍了RocketMQ中消费者概念和消费流程详解,RocketMQ是一款高性能、高可靠性的分布式消息中间件,消费者是RocketMQ中的重要组成部分,消费者负责从消息队列中获取消息并进行处理,需要的朋友可以参考下

1. 背景

RocketMQ 的消费可以算是 RocketMQ 的业务逻辑中最复杂的一块。这里面涉及到许多消费模式和特性。本想一篇文章写完,写到后面发现消费涉及到的内容太多,于是决定分多篇来写。本文作为消费系列的第一篇,主要讲述 RocketMQ 消费涉及到的模式和特性,也会概括性地讲一下消费流程。

我将 RocketMQ 的消费流程大致分成 4 个步骤

重平衡消费者拉取消息Broker 接收拉取请求后从存储中查询消息并返回消费者消费消息

每个步骤都会用一篇文章来讲解。

先了解一下 RocketMQ 消费涉及到地概念

2. 概念简述

2.1 消费组概念与消费模式

和大多数消息队列一样,RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。在了解它们之前,需要先引入消费组的概念。

2.1.1 消费组

一个消费者实例即是一个消费者进程,负责消费消息。单个消费者速度有限,在实际使用中通常会采用多个消费者共同消费同样的 Topic 以加快消费速度。这多个消费同样 Topic 的消费者组成了消费者组。

消费组是一个逻辑概念,它包含了多个同一类的消费者实例,通常这些消费者都消费同一类消息(都消费相同的 Topic)且消费逻辑一致。

消费组的引入是用来在消费消息时更好地进行负载均衡和容错。

2.1.2 广播消费模式(BROADCASTING)

广播消费模式即全部的消息会广播分发到所有的消费者实例,每个消费者实例会收到全量的消息(即便消费组中有多个消费者都订阅同一 Topic)。

如下图所示,生产者发送了 5 条消息,每个消费组中的消费者都收到全部的 5 条消息。

广播模式使用较少,适合各个消费者都需要通知的场景,如刷新应用中的缓存。

广播消费模式

注意事项:

  1. 广播消费模式下不支持 顺序消息。
  2. 广播消费模式下不支持 重置消费位点。
  3. 每条消息都需要被相同订阅逻辑的多台机器处理。
  4. 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。如果消费进度文件丢失,存在消息丢失的可能。
  5. 广播模式下,消息队列 RocketMQ 版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
  6. 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  7. 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  8. 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

2.1.3 集群消费模式(CLUSTERING)

集群消费模式下,同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

更具体一点,在同一消费组中的不同消费者会根据负载机制来平均地订阅 Topic 中的每个 Queue。(默认 AVG 负载方式)

广播消费模式

RocketMQ 默认使用集群消费模式,这也是大部分场景下会使用到的消费模式。

2.2 消费者拉取消息模式

2.2.1 Pull

指消费者主动拉取消息进行消费,主动从 Broker 拉取消息,主动权由消费者应用控制。

2.2.2 Push

Broker 主动将消息 Push 给消费者,Broker 收到消息就会主动推送到消费者端。该模式的消费实时性较高,也是主流场景中普遍采用的消费形式。

消费者组中的消费者实例会根据预设的负载均衡算法对 Topic 中的 Queue 进行均匀的订阅,每个 Queue 最多只能被一个消费者订阅。

在 RocketMQ 中,Push 消费其实也是由 Pull 消费(拉取)实现。Push 消费只是通过客户端 API 层面的封装让用户感觉像是 Broker 在推送消息给消费者。

2.2.3 POP

RocketMQ 5.0 引入的新消费形式,是 Pull 拉取的另一种实现。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分别消费重试 Topic 和普通 Topic)。

POP 与 Pull 可以通过一个开关实时进行切换。POP 模式下,Broker 来控制每个消费者消费的队列和拉取的消息,把重平衡逻辑从客户端移到了服务端。

主要解决了原来 Push 模式消费的以下痛点:

富客户端:客户端逻辑比较重,多语言支持不友好队列独占:Topic 中的一个 Queue 最多只能被 1 个 Push 消费者消费,消费者数量无法无限扩展。且消费者 hang 住时该队列的消息会堆积。消费后更新 offset:本地消费成功才会提交 offset

RocketMQ 5.0 的轻量化 gRPC 客户端就是基于 POP 消费模式开发

2.3 队列负载机制与重平衡

在集群消费模式下,消费组中的消费者共同消费订阅的 Topic 中的所有消息,这里就存在 Topic 中的队列如何分配给消费者的问题。

2.3.1 队列负载机制

RocketMQ Broker 中的队列负载机制将一个 Topic 的不同队列按照算法尽可能平均地分配给消费者组中的所有消费者。RocketMQ 预设了多种负载算法供不同场景下的消费。

AVG:将队列按数量平均分配给多个消费者,按 Broker 顺序先分配第一个 Broker 的所有队列给第一个消费者,然后给第二个。

AVG_BY_CIRCLE:将 Broker 上的队列轮流分给不同消费者,更适用于 Topic 在不同 Broker 之间分布不均匀的情况。

默认采用 AVG 负载方式。

2.3.2 重平衡(Rebalance)

为消费者分配队列消费的这一个负载过程并不是一劳永逸的,比如当消费者数量变化、Broker 掉线等情况发生后,原先的负载就变得不再均衡,此时就需要重新进行负载均衡,这一过程被称为重平衡机制。

每隔 20s,RocketMQ 会进行一次检查,检查队列数量、消费者数量是否发生变化,如果变化则触发消费队列重平衡,重新执行上述负载算法。

2.4 消费端高可靠

2.4.1 重试-死信机制

在实际使用中,消息的消费可能出现失败。RocketMQ 拥有重试机制和死信机制来保证消息消费的可靠性。

正常消费:消费成功则提交消费位点

重试机制:如果正常消费失败,消息会被消费者发回 Broker,放入重试 Topic: %RETRY%消费者组 。最多重试消费 16 次,重试的时间间隔逐渐变长。(消费者组会自动订阅重试 Topic)。

这里地延迟重试采用了 RocketMQ 的延迟消息,重试的 16 次时间间隔为延迟消息配置的每个延迟等级的时间(从第三个等级开始)。如果修改延迟等级时间的配置,重试的时间间隔也会相应发生变化。但即便延迟等级时间间隔配置不足 16 个,仍会重试 16 次,后面按照最大的时间间隔来重试。

死信机制:如果正常消费和重试 16 次均失败,消息会保存到死信 Topic %DLQ%消费者组 中,此时需人工介入处理

2.4.2 队列负载机制与重平衡

当发生 Broker 挂掉或者消费者挂掉时,会引发重平衡,可以自动感知有组件挂掉的情况并重新调整消费者的订阅关系。

2.5 并发消费与顺序消费

在消费者客户端消费时,有两种订阅消息的方式,分别是并发消费和顺序消费。广播模式不支持顺序消费,仅有集群模式能使用顺序消费。

需要注意的是,这里所说的顺序消费指的是队列维度的顺序,即在消费一个队列时,消费消息的顺序和消息发送的顺序一致。如果一个 Topic 有多个队列, 是不可能达成 Topic 级别的顺序消费的,因为无法控制哪个队列的消息被先消费。Topic 只有一个队列的情况下能够实现 Topic 级别的顺序消费。

具体顺序生产和消费代码见 官方文档。

顺序生产的方式为串行生产,并在生产时指定队列。

并发消费的方式是调用消费者的指定 MessageListenerConcurrently 作为消费的回调类,顺序消费则使用 MessageListenerOrderly 类进行回调。处理这两种消费方式的消费服务也不同,分别是 ConsumeMessageConcurrentlyService ConsumeMessageOrderlyService

顺序消费的大致原理是依靠两组锁,一组在 Broker 端(Broker 锁),锁定队列和消费者的关系,保证同一时间只有一个消费者在消费;在消费者端也有一组锁(消费队列锁)以保证消费的顺序性。

2.6 消费进度保存和提交

消费者消费一批消息完成之后,需要保存消费进度。如果是集群消费模式,还需要将消费进度让其他消费者知道,所以需要提交消费进度。这样在消费者重启或队列重平衡时可以根据消费进度继续消费。

不同模式下消费进度保存方式的不同:

  • 广播模式:保存在消费者本地。因为每个消费者都需要消费全量消息消息。在 LocalfileOffsetStore 当中。
  • 集群模式:保存在 Broker,同时消费者端缓存。因为一个 Topic 的消息只要被消费者组中的一个消费者消费即可,所以消息的消费进度需要统一保存。通过 RemoteBrokerOffsetStore 存储。

集群模式下,消费者端有定时任务,定时将内存中的消费进度提交到 Broker,Broker 也有定时任务将内存中的消费偏移量持久化到磁盘。此外,消费者向 Broker 拉取消息时也会提交消费偏移量。注意,消费者线程池提交的偏移量是线程池消费的这一批消息中偏移量最小的消息的偏移量。

  1. 消费完一批消息后将消息消费进度存在本地内存
  2. 消费者中有一个定时线程,每 5s 将内存中所有队列的消费偏移量提交到 Broker
  3. Broker 收到消费进度先缓存到内存,有一个定时任务每隔 5s 将消息偏移量持久化到磁盘
  4. 消费者向 Broker 拉取消息时也会将队列的消息偏移量提交到 Broker

3. 消费流程

这张图是阿里云的文章讲解消费时用到的,能够清晰地表示客户端 Push 模式并发消费流程。

img

从左上角第一个方框开始看

  1. 消费者启动时唤醒重平衡服务 RebalanceService,重平衡服务是客户端开始消费的起点。
  2. 重平衡服务会周期性(每 20s)执行重平衡方法 doRebalance),查询所有注册的 Broker,根据注册的 Broker 数量为自身分配负载的队列 rebalanceByTopic()
  3. 分配完队列后,会为每个分配到的新队列创建一个消息拉取请求 pullRequest,这个拉取请求中保存一个处理队列 processQueue,即图中的红黑树(TreeMap),用来保存拉取到的消息。红黑树保存消息的顺序。
  4. 消息拉取线程应用生产-消费模式,用一个线程从拉取请求队列 pullRequestQueue 中弹出拉取请求,执行拉取任务,将拉取到的消息放入处理队列。
  5. 拉取请求在一次拉取消息完成之后会复用,重新被放入拉取请求队列 pullRequestQueue 中
  6. 拉取完成后,在 NettyClientPublicExecutorThreadPool 线程池异步处理结果,将拉取到的消息放入处理队列,然后调用 consumeMessageService.submitConsumeRequest,将处理队列和 多个消费任务提交到消
  7. 费线程池。每个消费任务消费 1 批消息(1 批默认为 1 条)
  8. 每个消费者都有一个消费线程池 consumeMessageThreadPool ,默认有 20 个消费线程。
  9. 消费线程池的每个消费线程会尝试从消费任务队列中获取消费请求,执行消费业务逻辑 listener.consumeMessage。
  10. 消费完成后,如果消费成功,则更新偏移量 updateOffset(先更新到内存 offsetTable,定时上报到 Broker。Broker 端也先放到内存,定时刷盘)。

到此这篇关于RocketMQ中消费者概念和消费流程详解的文章就介绍到这了,更多相关RocketMQ中的消费者内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅谈Spring Boot中如何干掉if else的方法

    浅谈Spring Boot中如何干掉if else的方法

    这篇文章主要介绍了Spring Boot中如何干掉if else的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • SpringBoot 文件或图片上传与下载功能的实现

    SpringBoot 文件或图片上传与下载功能的实现

    这篇文章主要介绍了SpringBoot 文件或图片上传与下载功能的实现,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • springboot配置redis过程详解

    springboot配置redis过程详解

    这篇文章主要介绍了springboot配置redis过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • Windows下使用IDEA搭建Hadoop开发环境的详细方法

    Windows下使用IDEA搭建Hadoop开发环境的详细方法

    这篇文章主要介绍了Windows下使用IDEA搭建Hadoop开发环境,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • mybatis if标签使用总结

    mybatis if标签使用总结

    这篇文章主要介绍了mybatis if标签使用总结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-12-12
  • Spring Cloud Gateway重试机制原理解析

    Spring Cloud Gateway重试机制原理解析

    这篇文章主要介绍了Spring Cloud Gateway重试机制原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • Java使用MulticastSocket实现群聊应用程序

    Java使用MulticastSocket实现群聊应用程序

    这篇文章主要为大家详细介绍了Java使用MulticastSocket实现群聊应用程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • 解决Java执行Cmd命令出现的死锁问题

    解决Java执行Cmd命令出现的死锁问题

    这篇文章主要介绍了关于Java执行Cmd命令出现的死锁问题解决,解决方法就是在waitfor()方法之前读出窗口的标准输出、输出、错误缓冲区中的内容,本文给大家介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • JavaFX实现界面跳转

    JavaFX实现界面跳转

    这篇文章主要为大家详细介绍了JavaFX实现界面跳转,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-06-06
  • springboot ehcache 配置使用方法代码详解

    springboot ehcache 配置使用方法代码详解

    EhCache是一个比较成熟的Java缓存框架,Springboot对ehcache的使用非常支持,所以在Springboot中只需做些配置就可使用,且使用方式也简易,今天给大家分享springboot ehcache 配置使用教程,一起看看吧
    2021-06-06

最新评论