详解RocketMQ 消费端如何监听消息

 更新时间:2022年12月15日 14:51:30   作者:小郭的技术笔记  
这篇文章主要为大家介绍了RocketMQ 消费端如何监听消息示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

上一篇文章中我们主要来看RocketMQ消息消费者是如何启动的,

那他有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,那具体怎么做,让我们我们跟着源码来学习一下~

流程地图

源码跟踪

这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK

核心模块(消息拉取)

入口:this.pullMessageService.start();

  • 执行线程池run方法,轮流从pullRequestQueue中获取PullRequest

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

声明一个阻塞队列用来存放 PullRequest 对象

PullRequest 用于消息拉取任务,如果 pullRequestQueue 为空则会阻塞,直到拉取任务被放入

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

将 stopped 用volatile来修饰,每次执行的时候都检测stopped的状态,线程只要修改了这个状态,其余线程就会马上知道

protected volatile boolean stopped = false;
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // 判断启动状态
    while (!this.isStopped()) {
        try {
            // 取出一个PullRequest对象
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
  • 获取消费队列快照,判断状态是否正常,同时更新最后一次拉取时间

PullMessageService 从消息服务器默认拉取32条消息,按消息的偏移量顺序存放在 ProcessQueue 队列

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// 获取消费队列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// 设置最后一次拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
  • 校验客户端运行状态
// 校验状态
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The consumer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

如果消费者状态不正确,则抛出异常,启动定时线程池过段时间回收 PullRequest 对象,以便pullMessageService能及时唤醒并再次执行消息拉取,这个逻辑在多个地方使用到了

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        // 最后将pullRequest放入pullRequestQueue中
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
  • 校验消费队列中的消息数量和大小是否符合设置

如果触发流量控制,则延迟拉取消息,先将 PullRequest 对象进行回收,以便pullMessageService能及时唤醒并再次执行消息拉取

// 缓存消息条数
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存消息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 当队列中的消息跳过,超过设置 则延迟拉取消息
if (cachedMessageCount &gt; this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
  • 根据主题获取配置的订阅关系

这里通过查询 subscriptionInner Map容器,利用主题来获取对应的订阅关系,如果没有找到对应的订阅关系,则延迟拉取消息,先将 PullRequest 对象进行回收以便 pullMessageService 能及时唤醒并再次执行消息拉取

protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
  • 如果为集群模式,则从内存中读取位置

通过消费者启动的模块中,我们知道RocketMQ是根据不同模式,将消息进度存储在不同的地方

广播模式:消息进度存储在本地文件

集群模式:消息进度存储在Broker 服务器上

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    // 从内存中读取位置
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}
  • 内核中拉取消息(最重要的模块)

入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

我们看到他有非常多的参数

拉取流程

  • 通过BrokerName找到对应的Broker
// step 1 通过BrokerName找到对应的Broker
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
  • 如果没有找到对应的,则更新路由信息
// step 2 如果没有找到对应的,则更新路由信息
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
  • 检查Broker版本和Tag信息
// check version
if (!ExpressionType.isTagType(expressionType)
    &amp;&amp; findBrokerResult.getBrokerVersion() &lt; MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
  • 设置PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
  • 调用pullMessage方法拉取消息,返回拉取结果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);

因为 CommunicationMode 传递的是ASYNC,我们着重来看一下这个方法

入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

调用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()

这里我们就先不细看了

拉取消息处理

  • 如果PullCallback回调成功,则对结果进行处理
// 处理pullResult数据
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

主要做了三件事,转换消息格式、设置消息信息、放入msgFoundList

将pullResult 转成 PullResultExt,转换消息格式为List

PullResultExt pullResultExt = (PullResultExt) pullResult;
// 转换消息格式为List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

执行消息过滤,匹配符合的tag

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

设置消息的transactionId、扩展属性、BrokerName名称,放入List中

for (MessageExt msg : msgListFilterAgain) {
    String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(traFlag)) {
        msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    }
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
        Long.toString(pullResult.getMinOffset()));
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
        Long.toString(pullResult.getMaxOffset()));
    msg.setBrokerName(mq.getBrokerName());
}
pullResultExt.setMsgFoundList(msgListFilterAgain);

当pullStatus为FOUND,消息进行提交消费的请求

  • 获取第一条消息的offset(偏移量)
// 获取第一条消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
  • 将读取消息List,更新到processQueue的TreeMap里面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

主要做了两件事,循环读取消息list,存入msgTreeMap和计算此次读取信息偏移量

public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        // 上锁
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            // 循环读取消息list,存入msgTreeMap
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
            if (!msgs.isEmpty()) {
                // 获取最后一条消息
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                // 获取最大偏移量
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                ...
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    }
    ...
}
  • 提交消费请求,消息提交到内部的线程池
// 提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest

获取 ConsumeRequest对象,拿到当前主题的监听器

这里拿到的监听器,就是我们在启动消费者的时候所注册的,监听到消息后执行相关的业务逻辑

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
               ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

在这里触发我们在一开始重写的consumeMessage方法,这里msgs用Collections.unmodifiableList进行包装,意思就是不可以修改的,是一个只读的List

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  • ProcessQueue中移除已经处理的消息,同时更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            ...
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            ...
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                // 如果存在失败消息,则过5秒在定时执行
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
                ...
        }
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新Offset位置  
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
  • 最后pullRequest放入pullRequestQueue中

入口:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

消息消费进度提交

  • 成功消费一条消息后,更新本地缓存表
  • 每5s向Broker提交消息消费进度
  • Broker每5s将进度持久化到consumerOffset.json

总结

目前只是将整体的一个消费端监听消息的流程了解清楚,里面还有许多细节需要去推敲~

以上就是详解RocketMQ 消费端如何监听消息的详细内容,更多关于RocketMQ 消费端监听消息的资料请关注脚本之家其它相关文章!

相关文章

  • Java并发编程变量可见性避免指令重排使用详解

    Java并发编程变量可见性避免指令重排使用详解

    这篇文章主要为大家介绍了Java并发编程变量可见性避免指令重排使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • spring boot实现自动输出word文档功能的实例代码

    spring boot实现自动输出word文档功能的实例代码

    这篇文章主要介绍了spring boot实现自动输出word文档功能的实例代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • Java面向对象的封装特征深度解析

    Java面向对象的封装特征深度解析

    在面向对象程式设计方法中,封装(英语:Encapsulation)是指一种将抽象性函式接口的实现细节部分包装、隐藏起来的方法。封装可以被认为是一个保护屏障,防止该类的代码和数据被外部类定义的代码随机访问
    2021-10-10
  • java线性表排序示例分享

    java线性表排序示例分享

    这篇文章主要介绍了java线性表排序示例,需要的朋友可以参考下
    2014-03-03
  • 你要知道IDEA的这些必备插件

    你要知道IDEA的这些必备插件

    这篇文章主要介绍了你要知道IDEA的这些必备插件,文中有非常详细的图文示例及代码,对正在使用IDEA的小伙伴们有很好的帮助哟,需要的朋友可以参考下
    2021-05-05
  • MybatisPlus多表查询及分页查询完整代码

    MybatisPlus多表查询及分页查询完整代码

    这篇文章主要介绍了MybatisPlus多表查询及分页查询完整代码,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-08-08
  • Mybatis日志模块的适配器模式详解

    Mybatis日志模块的适配器模式详解

    这篇文章主要介绍了Mybatis日志模块的适配器模式详解,,mybatis用了适配器模式来兼容这些框架,适配器模式就是通过组合的方式,将需要适配的类转为使用者能够使用的接口
    2022-08-08
  • Maven仓库无用文件和文件夹清理的方法实现

    Maven仓库无用文件和文件夹清理的方法实现

    这篇文章主要介绍了Maven仓库无用文件和文件夹清理的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • java Springboot对接开发微信支付详细流程

    java Springboot对接开发微信支付详细流程

    最近要做一个微信小程序,需要微信支付,所以研究了下怎么在java上集成微信支付功能,下面这篇文章主要给大家介绍了关于java Springboot对接开发微信支付的相关资料,需要的朋友可以参考下
    2024-08-08
  • Spring Data环境搭建实现过程解析

    Spring Data环境搭建实现过程解析

    这篇文章主要介绍了Spring Data环境搭建实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08

最新评论