RocketMQ NameServer保障数据一致性实现方法讲解

 更新时间:2022年12月12日 08:55:39   作者:小王曾是少年  
这篇文章主要介绍了RocketMQ NameServer保障数据一致性实现方法,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

路由注册角度

对于ZooKeeper这样的强一致性组件,使用主从分离的架构,数据只写到主节点,主从之间的数据同步通过内部机制来进行数据复制。

对于RocketMQ来说,NameServer节点之间是互相不进行通信的,这样也就无法进行数据复制。RocketMQ采用的机制是:在Broker节点启动的时候,轮询所有的NameServer节点,并与每个NameServer节点建立长连接,发送注册请求。

相应的,NameServer节点内部也会维护一个Broker列表,用来动态存储Broker的信息,做服务发现。

与此同时,Broker使用心跳机制来向所有NameServer节点证明自己是存活的,即定期发送心跳包;收到心跳包之后,NameServer节点会更新这个Broker的最新存活时间。

注意: NameServer节点在处理心跳包时,存在多个请求同时处理同一张表的情况,为了保证并发安全性,RocketMQ引入了读写锁(ReadWriteLock),保证了多个Producer并发读取路由信息不受影响,但同一时刻只能处理一个Broker发来的心跳包,这也符合读多写少的经典场景。

路由剔除

正常情况下:

如果Broker下线,则会与NameServer断开长连接,底层基于Netty的通道关闭监听器会监听到连接断开事件,然后将这个Broker信息剔除。

异常情况下:

NameServer有一个周期为10s的定时任务,定期扫描Broker表,如果超过120s没有收到某个Broker的心跳包,则会判定其失效并移除。

对于日常运维的需求,RocketMQ提供了优雅剔除路由信息的方式,即可以先禁止Broker的写权限,这样发送到这个Broker的请求都会收到一个NO_PERMISSION的响应,客户端自动重试其他的Broker

路由发现

生产者视角:

一般是在发送第一条消息时,才会根据TopicNameServer获取路由信息

消费者视角:

订阅的Topic一般是固定的,所以在启动时就会拉取

针对路由信息可能变化的场景,RocketMQ提供了定时拉取Topic最新路由信息的机制,以应对Broker集群发生变化的场景。

DefaultMQProducerDefaultMQConsumer有一个pollNameServerInterval的配置项,用于指定从NameServer获取路由信息的周期,其底层依赖MQClientInstance类,MQClientInstance类中的updateTopicRouteInfoFromNameServer方法,可以根据指定的时间间隔,周期性地从NameServer里拉取路由信息。在拉取时,会将当前启动的ProducerConsumer需要用到的Topic列表放到一个集合里,逐个进行更新,源码如下:

/**
* 更新单个Topic路由信息
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}
/**
* 更新单个Topic路由信息
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                	// 使用默认TopicKey获取TopicRouteData
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }
                    if (changed) {
                    	// 克隆出一个实例cloneTopicRouteData : topicRouteData会被设置到下面的publishInfo/subscribeInfo 
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
						// 更新Broker地址相关信息,当某个Broker心跳超时后,会被从brokerAddrTable中移除
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }
                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }
                        // Update sub info
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                }
            } catch (MQClientException e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } catch (RemotingException e) {
                log.error("updateTopicRouteInfoFromNameServer Exception", e);
                throw new IllegalStateException(e);
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }
    return false;
}

Broker宕机时,还可以通过客户端的重试机制来解决,避免因为定时更新路由信息不及时导致的服务宕机~~

到此这篇关于RocketMQ NameServer保障数据一致性实现方法讲解的文章就介绍到这了,更多相关RocketMQ NameServer内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中List转Map List实现的几种姿势

    Java中List转Map List实现的几种姿势

    本文主要介绍了Java中List转Map List实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • 教你用Springboot实现拦截器获取header内容

    教你用Springboot实现拦截器获取header内容

    项目中遇到一个需求,对接上游系统是涉及到需要增加请求头,请求头的信息是动态获取的,需要动态从下游拿到之后转给上游,文中非常详细的介绍了该需求的实现,需要的朋友可以参考下
    2021-05-05
  • springcloud集成zookeeper的方法示例

    springcloud集成zookeeper的方法示例

    这篇文章主要介绍了springcloud集成zookeeper的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • Aspectj与Spring AOP的对比分析

    Aspectj与Spring AOP的对比分析

    这篇文章主要介绍了Aspectj与Spring AOP的对比分析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • SpringBoot设置首页(默认页)跳转功能的实现方案

    SpringBoot设置首页(默认页)跳转功能的实现方案

    这篇文章主要介绍了SpringBoot设置首页(默认页)跳转功能,本文通过两种方案,给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-07-07
  • JavaWeb导出Excel文件并弹出下载框

    JavaWeb导出Excel文件并弹出下载框

    这篇文章主要为大家详细介绍了JavaWeb导出Excel文件并弹出下载框的相关资料,感兴趣的小伙伴们可以参考一下
    2016-06-06
  • Java Map遍历2种实现方法代码实例

    Java Map遍历2种实现方法代码实例

    这篇文章主要介绍了Java Map遍历2种实现方法代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10
  • java 线程池如何执行策略又拒绝哪些策略

    java 线程池如何执行策略又拒绝哪些策略

    这篇文章主要介绍了java 线程池如何执行策略又拒绝哪些策略,文章通过线程池的执行方法 execute() 展开全篇内容,需要的小伙伴可以参考一下
    2022-05-05
  • 一个Java配置文件加密解密工具类分享

    一个Java配置文件加密解密工具类分享

    在 JavaEE 配置文件中,例如 XML 或者 properties 文件,由于某些敏感信息不希望普通人员看见,则可以采用加密的方式存储,程序读取后进行解密
    2014-04-04
  • SpringBoot3集成和使用Jasypt的代码详解

    SpringBoot3集成和使用Jasypt的代码详解

    随着信息安全的日益受到重视,加密敏感数据在应用程序中变得越来越重要,Jasypt作为一个简化Java应用程序中数据加密的工具,为开发者提供了一种便捷而灵活的加密解决方案,本文将深入解析Jasypt的工作原理,需要的朋友可以参考下
    2024-01-01

最新评论