RocketMQ之Consumer整体介绍启动源码分析

 更新时间:2023年05月09日 10:22:33   作者:林师傅  
这篇文章主要为大家介绍了RocketMQ源码分析之Consumer整体介绍启动分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

从本篇文章开始,我们将逐步开始分析Consumer的源码,首先我们将整体介绍Consumer的接口和相关实现类以及DefaultMQPushConsumer的主要API和关键属性,然后我们将分析Consumer的启动过程源码,通过对启动过程的分析,之前我们分析过Producer和Broker的启动源码,Consumer的启动源码与Producer还是有很多相似的地方。

Consumer整体介绍

Consumer实现类

RocketMQ给我们提供的Consumer实现类如下图所示,包括推送式的DefaultMQPushConsumer和拉取式的DefaultMQPullConsumerDefaultLitePullConsumer,从图中可以看到DefaultMQPullConsumer已经被标注为deprecated,如果需要使用拉取式的Consumer,官方推荐使用DefaultLitePullConsumer。

Consumer消费类型

  • 拉取式消费

Consumer主动从Broker拉去消息,消费消息的主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过这种方式实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。

  • 推送式消费

该模式下Broker收到数据后会主动推送给Consumer,这种方式一般实时性比较高。

RocketMQ官方更推荐我们在日常工作中使用DefaultMQPushConsumer,它已经能够满足我们大多数使用场景。从技术上讲,这个DefaultMQPushConsumer客户端实际上是底层拉取服务的包装器。当从代理中提取的消息到达时,它大致调用注册的回调处理程序来馈送消息。本篇文章,我们将介绍DefaultMQPushConsumer的启动流程

DefaultMQPushConsumer主要API

DefaultMQPushConsumer实现了MQConsumer和MQPushConsumer接口,DefaultMQPushConsumer的主要API都在这两个接口中定义了,如下所示

// org.apache.rocketmq.client.consumer.MQConsumer
public interface MQConsumer extends MQAdmin {
    // 如果消费失败,消息将被发送回代理,并延迟消耗一些时间
    void sendMessageBack(final MessageExt msg/*消息*/, final int delayLevel/*延迟级别*/, final String brokerName);  
    // 根据topic从使用者缓存中获取消息队列
    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
}
// org.apache.rocketmq.client.consumer.MQPushConsumer
public interface MQPushConsumer extends MQConsumer {
    // 启动Consumer
    void start() throws MQClientException;
    // 关闭Consumer
    void shutdown();
    // 注册并发消息Listener
    void registerMessageListener(final MessageListenerConcurrently messageListener);
    // 注册顺序消息Listener,将会有序地接收消息。一个队列一个线程
    void registerMessageListener(final MessageListenerOrderly messageListener);
    // 订阅Topic
    void subscribe(final String topic, final String subExpression) throws MQClientException;
    // 退订topic
    void unsubscribe(final String topic);
}

DefaultMQPushConsumer关键属性

DefaultMQPushConsumer的关键属性如下所示

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    // DefaultMQPushConsumer的默认实现,DefaultMQPushConsumer中大部分功能都是对它的代理
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    // 相同角色的消费者需要具有完全相同的subscriptions和consumerGroup才能正确实现负载平衡,它需要全局唯一
    private String consumerGroup;
    // 消息模型定义了如何将消息传递到每个消费者客户端的方式,默认是集群模式
    private MessageModel messageModel = MessageModel.CLUSTERING;
  	// 第一次消费时指定的消费策略,默认是CONSUME_FROM_LAST_OFFSET
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
  	// 队列分配算法,指定如何将消息队列分配给每个使用者客户端。
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    // 订阅关系
    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
    // 消息监听器
    private MessageListener messageListener;
    // 消息消费进度存储器
    private OffsetStore offsetStore;
  	// 最小消费线程数
    private int consumeThreadMin = 20;
    // 最大消费线程数
    private int consumeThreadMax = 20;
    // 推送模式下拉去消息的间隔时间,默认一次拉取消息完成后立刻继续拉取
    private long pullInterval = 0;
    // 批量消费数量
    private int consumeMessageBatchMaxSize = 1;
    // 批量拉取的数量
    private int pullBatchSize = 32;
  	// 每次拉取时是否更新订阅关系,默认是false
    private boolean postSubscriptionWhenPull = false;
    // 消息最大重试次数,如果消息消费最大次数超过maxReconsumeTimes还未成功,则消息将被转移到一个失败队列
    private int maxReconsumeTimes = -1;
    //延迟将该队列的消息提交到消费者线程的等待时间,默认延迟1s
    private long suspendCurrentQueueTimeMillis = 1000;
    // 消息阻塞消费线程的最大超时时间,默认15分钟
    private long consumeTimeout = 15;
    // 关闭使用者时等待消息的最长时间,0表示没有等待。
    private long awaitTerminationMillisWhenShutdown = 0;
}

Consumer消费模式

Consumer提供下面两种消费模式,由上面DefaultMQPushConsumer的messageModel定义

  • 广播模式(BROADCASTING)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息会被相同Consumer Group中的所有Consumer消费

  • 集群模式(CLUSTERING)

集群模式是Consumer默认的消费模式,集群消费模式下,相同Consumer Group的每个Consumer按照负载均衡策略分摊同一个Topic消息,即每条消息只会被相同Consumer Group中的一个Consumer消费

Consumer消费策略

Consumer主要提了下面三种消费策略

  • CONSUME_FROM_LAST_OFFSET

这是Consumer默认的消费策略,它分为两种情况,如果Broker的磁盘消息未过期且未被删除,则从最小偏移量开始消费。如果磁盘已过期,并被删除,则从最大偏移量开始消费。

  • CONSUME_FROM_FIRST_OFFSET

从最早可用的消息开始消费

  • CONSUME_FROM_TIMESTAMP

从指定的时间戳开始消费,这意味着在consumeTimestamp之前生成的消息将被忽略

Consumer使用

要使用Consumer开始消费消息,至少需要下面5个步骤

public static void main(String[] args) throws MQClientException {
    // 1. 传入CONSUMER_GROUP,创建DefaultMQPushConsumer
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  	// 2. 设置namesrvAddr
  	consumer.setNamesrvAddr("127.0.0.1:9876");
    // 3. 订阅Topic 
    consumer.subscribe(TOPIC, "*");
    // 4.注册消息Listener
    consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -&gt; {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // 5.启动Consumer
    consumer.start();
}

DefaultMQPushConsumer源码分析

启动源码分析

DefaultMQPushConsumer只是设置属性,Consumer的初始化实际是在DefaultMQPushConsumer#start中执行的,DefaultMQPushConsumer#start实际调用了DefaultMQPushConsumerImpl#start执行初始化。

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
public void start() throws MQClientException {
    // consumerGroup封装namespace
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // DefaultMQPushConsumerImpl启动
    this.defaultMQPushConsumerImpl.start();
    // 消息轨迹跟踪服务,默认null
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

下面我们来分步骤分析DefaultMQPushConsumerImpl#start代码

第一步:

  • 先将Consumer的状态更新为START_FAILED
  • 校验Consumer的配置。主要校验ConsumerGroup,
  • 消费模式校验(MessageModel),消费开始位置(ConsumeFromWhere),消费时间戳(默认是半小时之前),队列分配策略(默认是AllocateMessageQueueAveragely),订阅Topic和Subscription关系校验,消息监听器(MessageListener)校验等。
  • 将Consumer中的订阅关系拷贝到RebalanceImpl中,Consumer中订阅关系的来源主要包括DefaultMQPushConsumerImpl#subscribe方法获取,也会订阅重试topic,其主题名为%RETRY%+消费者组名,消费者启动时会自动订阅该主题
  • 如果是集群模式,则修改消费者名称为PID#时间戳
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
  //...
  // 状态先设置为启动失败
  this.serviceState = ServiceState.START_FAILED;
  // 校验配置,ConsumerGroup校验,
  this.checkConfig();
  // 订阅关系copy到RebalanceImpl中
  this.copySubscription();
  // 如果是集群模式,消费者名称如果是DEFAULT,则会改成:PID#时间戳
  if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
      this.defaultMQPushConsumer.changeInstanceNameToPID();
  }
  //...
}

第二步:

主要是初始化MQClientInstance、RebalanceImpl和pullAPIWrapper。

**MQClientInstance:**是消息拉取服务,主要用于拉取消息,同一个进程内的所有Consumer会使用同一个MQClientInstance

**RebalanceImpl:**是消费者负载均衡服务,用于确定消费者消费的消息队列以及负载均衡。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 生成一个MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 设置消费者组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 消息消费模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 设置消息消费模式
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置MQClientInstance
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 构建拉消息包装器
this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,
    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

第三步:

根据消息消费模式的不同设置不同的消息消费进度存储器(OffsetStore),如果是广播模式,则使用LocalFileOffsetStore作为消息进度存储器,如果是集群模式则使用RemoteBrokerOffsetStore作为消息进度存储器。创建完成之后调用load()方法加载偏移量,如果是LocalFileOffsetStore将会从本地加载。

广播模式下:LocalFileOffsetStore将消费进度存储在Consumer本地的${user.home}/.rocketmq_offsets/clientId/consumerGroup/offsets.json文件中

集群模式下:RemoteBrokerOffsetStore将消费进度存储在Broker

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // 如果是广播模式,则使用LocalFileOffsetStore存储偏移量
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            // 如果是集群模式,则使用RemoteBrokerOffsetStore存储偏移量
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 如果是广播模式,则从本地文件load偏移量,如果是集群模式则是一个空实现
this.offsetStore.load();

第四步:

根据消息监听器的类型不同创建不同的消息消费服务(并发/顺序消息消费服务),并启动。然后注册消费者组和消费者信息到MQClientInstance中的consumerTable中,注册成功后启动MQClientInstance客户端通信实例。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 如果是顺序消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
// 如果是并发消费
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 将自身注册到MQClientInstance
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// ...
mQClientFactory.start();

第五步:

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 向Namesrv拉取并更新当前消费者订阅topic路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确
this.mQClientFactory.checkClientInBroker();
// 给所有Broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒负载均衡服务rebalanceService,并进行rebalance
this.mQClientFactory.rebalanceImmediately();

总结

本篇文章我们介绍了Consumer的API,属性,接口和实现类,通过对这几部分的了解,我们能够对Consumer有一个整体的认识。我们还分析了DefaultMQPushConsumer的启动的源码,通过对DefaultMQPushConsumer#start开始逐渐深入分析DefaultMQPushConsumer的启动过程,能够帮助我们对Consumer消费消息一些关键的类如MQClientInstance,OffsetStore,RebalanceImpl,ConsumeMessageService由一个初步的认识,由助于我们后续详细了解这些服务的工作原理。

以上就是RocketMQ 源码分析之Consumer整体介绍启动分析的详细内容,更多关于RocketMQ Consumer源码解析的资料请关注脚本之家其它相关文章!

相关文章

  • java事务回滚失败问题分析

    java事务回滚失败问题分析

    这篇文章主要介绍了java事务回滚失败问题分析,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • Spring中的事务隔离级别和传播行为

    Spring中的事务隔离级别和传播行为

    这篇文章主要介绍了Spring中的事务隔离级别和传播行为,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • Java实现WebSocket四个步骤

    Java实现WebSocket四个步骤

    这篇文章主要为大家介绍了Java实现WebSocket的方法实例,只需要简单四个步骤,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • SpringMVC前端和后端数据交互总结

    SpringMVC前端和后端数据交互总结

    本篇文章主要介绍了SpringMVC前端和后端数据交互总结,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-03-03
  • SpringBoot Event实现异步消费机制的示例代码

    SpringBoot Event实现异步消费机制的示例代码

    这篇文章主要介绍了SpringBoot Event实现异步消费机制,ApplicationEvent以及Listener是Spring为我们提供的一个事件监听、订阅的实现,内部实现原理是观察者设计模式,文中有详细的代码示例供大家参考,需要的朋友可以参考下
    2024-04-04
  • Springcloud Bus消息总线原理是实现详解

    Springcloud Bus消息总线原理是实现详解

    Spring Cloud Bus 使用轻量级的消息代理来连接微服务架构中的各个服务,可以将其用于广播状态更改(例如配置中心配置更改)或其他管理指令,本文将对其用法进行详细介绍
    2022-09-09
  • SpringBoot集成netty实现websocket通信功能

    SpringBoot集成netty实现websocket通信功能

    Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端,WebSocket 是一种网络通信协议,相比传统的HTTP协议,本文给大家介绍了SpringBoot集成netty实现websocket通信功能,需要的朋友可以参考下
    2024-03-03
  • SpringBoot整合SpringSecurity和JWT和Redis实现统一鉴权认证

    SpringBoot整合SpringSecurity和JWT和Redis实现统一鉴权认证

    Spring Security是一个可以为Java应用程序提供全面安全服务的框架,同时它也可以轻松扩展以满足自定义需求,本文主要介绍了SpringBoot整合SpringSecurity和JWT和Redis实现统一鉴权认证,感兴趣的可以了解一下
    2023-11-11
  • javaSystem.out.println()输出byte[]、char[]异常的问题详析

    javaSystem.out.println()输出byte[]、char[]异常的问题详析

    这篇文章主要给大家介绍了关于javaSystem.out.println()输出byte[]、char[]异常问题的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起看看啊
    2019-01-01
  • springBoot 项目排除数据库启动方式

    springBoot 项目排除数据库启动方式

    这篇文章主要介绍了springBoot 项目排除数据库启动方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09

最新评论