RocketMQ中的NameServer详细解析

 更新时间:2024年01月03日 10:39:22   作者:潜水路人甲  
这篇文章主要介绍了RocketMQ中的NameServer详细解析,NameServer是一个非常简单的Topic路由注册中心,支持Broker的动态注册与发现,因此不能保证NameServer的一致性,需要的朋友可以参考下

前言

NameServer是一个非常简单的Topic路由注册中心,支持Broker的动态注册与发现。

Producer和Conumser通过NameServer可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer各实例间相互不进行信息通讯,因此不能保证NameServer的一致性(Consistency),可以保证可用性(Availability)。

即选择了CAP中的AP。NameServer只能保证最终一致性,关于怎么保证最终一致性后文再讲。

现在先从NameServer的启动开始。

NameServer为namesrv模块

NamesrvStartup

public static NamesrvController main0(String[] args) {
//构造NamesrvController 
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            return controller;
}
    public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
        boolean initResult = controller.initialize();
//启动
        controller.start();
        return controller;
    }
 

NamesrvStartup作为NameServer的启动类,主要做了三件事:

  • 构造NamesrvController(NameServer控制器)
  • 加载初始化,由NamesrvController负责
  • 启动remotingServer,开启Netty服务

NamesrvController

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
    //nameSrv的配置
    private final NamesrvConfig namesrvConfig;
    //netty的配置
    private final NettyServerConfig nettyServerConfig;
 
    //执行单线程的任务调度,自定义编程名称
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
 
    //kv的配置管理
    private final KVConfigManager kvConfigManager;
 
    //路由管理器,有broker的ip和队列信息,producer发送的queue信息,consumer的pull的queue信息
    private final RouteInfoManager routeInfoManager;
 
    //namesrv的netty的服务端实现
    private RemotingServer remotingServer;
 
    //处理接受到请求事件的回调监听服务,主要处理netty的事件
    private BrokerHousekeepingService brokerHousekeepingService;
 
    private ExecutorService remotingExecutor;
 
    private Configuration configuration;
    private FileWatchService fileWatchService;
 
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        //构造kv配置的管理
        this.kvConfigManager = new KVConfigManager(this);
        //构造路由信息管理
        this.routeInfoManager = new RouteInfoManager();
        //构造网络连接事件管理
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        //配置
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

NamesrvController作为一个控制器主要负责NettyServer的创建,注册requestProcessor,启动NettyServer及各种task

    public boolean initialize() {
 
        //加载kv配置
        this.kvConfigManager.load();
        //初始化NettyServer
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //执行器,用于接受请求并进行处理
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //注册Netty的处理器,即remotingExecutor
        this.registerProcessor();
        //进行扫描未活跃的Broker的任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        //...省略其他代码
        return true;
    }
 
   
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            //注册request处理器
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }  
  
 
    public void start() throws Exception {
        //启动netty server
        this.remotingServer.start();
 
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

NettyRequestProcessor

NettyRequestProcessor为请求处理器,上一步注册的处理器,一般使用默认的处理器DefaultRequestProcessor,processRequest方法处理请求。

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        //...
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            //...省略其他代码
        }
        return null;
    }

到此这篇关于RocketMQ中的NameServer详细解析的文章就介绍到这了,更多相关RocketMQ中的NameServer内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 深入理解@component与@Configuration注解

    深入理解@component与@Configuration注解

    这篇文章主要介绍了深入理解@component与@Configuration注解,从Spring3.0,@Configuration用于定义配置类,可替换xml配置文件,被注解的类内部包含有一个或多个被@Bean注解的方法,这些方法将会被扫描,并用于构建bean定义,初始化Spring容器,需要的朋友可以参考下
    2023-11-11
  • Java编程实现游戏中的简单碰撞检测功能示例

    Java编程实现游戏中的简单碰撞检测功能示例

    这篇文章主要介绍了Java编程中的简单碰撞检测功能,涉及java针对坐标点的相关数学运算操作技巧,需要的朋友可以参考下
    2017-10-10
  • Java Maven依赖传递,可选依赖,排除依赖详解

    Java Maven依赖传递,可选依赖,排除依赖详解

    这篇文章主要介绍了Java Maven依赖传递,可选依赖,排除依赖详解,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • 详解SpringBoot读取Yml配置文件的3种方法

    详解SpringBoot读取Yml配置文件的3种方法

    本文主要介绍了详解SpringBoot读取Yml配置文件的3种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • hadoop是什么语言

    hadoop是什么语言

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储
    2017-09-09
  • 剖析SpringCloud Feign中所隐藏的坑

    剖析SpringCloud Feign中所隐藏的坑

    这篇文章主要为大家介绍了剖析SpringCloud Feign中所隐藏的坑示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • JAVA遍历Map集合的几种方法汇总

    JAVA遍历Map集合的几种方法汇总

    这篇文章主要给大家介绍了关于JAVA遍历Map集合的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • 简单记事本java源码实例

    简单记事本java源码实例

    这篇文章主要介绍了简单记事本java源码,以一个完整的实例形式分析了记事本的Java实现方法,对于Java应用程序的开发有一定的参考借鉴价值,需要的朋友可以参考下
    2014-11-11
  • 基于Java 数组内存分配的相关问题

    基于Java 数组内存分配的相关问题

    本篇文章是对Java中数组内存分配进行了详细的分析介绍,需要的朋友参考下
    2013-05-05
  • ScheduledExecutorService任务定时代码示例

    ScheduledExecutorService任务定时代码示例

    这篇文章主要介绍了ScheduledExecutorService任务定时代码示例,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01

最新评论