RocketMQ中的NameServer详细解析
前言
NameServer是一个非常简单的Topic路由注册中心,支持Broker的动态注册与发现。
Producer和Conumser通过NameServer可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer各实例间相互不进行信息通讯,因此不能保证NameServer的一致性(Consistency),可以保证可用性(Availability)。
即选择了CAP中的AP。NameServer只能保证最终一致性,关于怎么保证最终一致性后文再讲。
现在先从NameServer的启动开始。
NameServer为namesrv模块

NamesrvStartup
public static NamesrvController main0(String[] args) {
//构造NamesrvController
NamesrvController controller = createNamesrvController(args);
start(controller);
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
boolean initResult = controller.initialize();
//启动
controller.start();
return controller;
}
NamesrvStartup作为NameServer的启动类,主要做了三件事:
- 构造NamesrvController(NameServer控制器)
- 加载初始化,由NamesrvController负责
- 启动remotingServer,开启Netty服务
NamesrvController
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//nameSrv的配置
private final NamesrvConfig namesrvConfig;
//netty的配置
private final NettyServerConfig nettyServerConfig;
//执行单线程的任务调度,自定义编程名称
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
//kv的配置管理
private final KVConfigManager kvConfigManager;
//路由管理器,有broker的ip和队列信息,producer发送的queue信息,consumer的pull的queue信息
private final RouteInfoManager routeInfoManager;
//namesrv的netty的服务端实现
private RemotingServer remotingServer;
//处理接受到请求事件的回调监听服务,主要处理netty的事件
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
//构造kv配置的管理
this.kvConfigManager = new KVConfigManager(this);
//构造路由信息管理
this.routeInfoManager = new RouteInfoManager();
//构造网络连接事件管理
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
//配置
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}NamesrvController作为一个控制器主要负责NettyServer的创建,注册requestProcessor,启动NettyServer及各种task
public boolean initialize() {
//加载kv配置
this.kvConfigManager.load();
//初始化NettyServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//执行器,用于接受请求并进行处理
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册Netty的处理器,即remotingExecutor
this.registerProcessor();
//进行扫描未活跃的Broker的任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//...省略其他代码
return true;
}
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
//注册request处理器
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
public void start() throws Exception {
//启动netty server
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}NettyRequestProcessor
NettyRequestProcessor为请求处理器,上一步注册的处理器,一般使用默认的处理器DefaultRequestProcessor,processRequest方法处理请求。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//...
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
//...省略其他代码
}
return null;
}到此这篇关于RocketMQ中的NameServer详细解析的文章就介绍到这了,更多相关RocketMQ中的NameServer内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
浅谈java中Math.random()与java.util.random()的区别
下面小编就为大家带来一篇浅谈java中Math.random()与java.util.random()的区别。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧2016-09-09
POI XSSFSheet shiftRows bug问题解决
这篇文章主要介绍了POI XSSFSheet shiftRows bug问题解决,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-07-07
通过代理类实现java连接数据库(使用dao层操作数据)实例分享
java通过代理类实现数据库DAO操作代码分享,大家参考使用吧2013-12-12
spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的声明详解
这篇文章主要给大家介绍了关于spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 声明的相关资料,需要的朋友可以参考下2020-08-08
Spring Boot mybatis-config 和 log4j 输出sql 日志的方式
这篇文章主要介绍了Spring Boot mybatis-config 和 log4j 输出sql 日志的方式,本文通过实例图文相结合给大家介绍的非常详细,需要的朋友可以参考下2021-07-07
RabbitMQ的Direct Exchange模式实现的消息发布案例(示例代码)
本文介绍了RabbitMQ的DirectExchange模式下的消息发布和消费的实现,详细说明了如何在DirectExchange模式中进行消息的发送和接收,以及消息处理的基本方法,感兴趣的朋友跟随小编一起看看吧2024-09-09


最新评论