Nacos服务注册与发现原理解读
Nacos是阿里巴巴开源的服务注册与发现组件,同时也提供配置管理功能。它支持基于DNS和RPC的服务发现,致力于帮助开发人员发现、配置和管理微服务。
下面将深入解析其服务注册与发现的核心原理,并提供关键源码示例。
核心原理架构
Nacos的服务注册与发现架构主要包含三个核心组件:
- 服务提供者:负责将自身服务实例注册到Nacos Server
- 服务消费者:负责从Nacos Server获取服务列表并进行服务调用
- Nacos Server:负责维护服务实例信息,处理注册、发现、健康检查等请求
服务注册原理
服务注册是指服务提供者将自身服务实例的信息注册到Nacos Server的过程。
其核心流程如下:
- 1. 服务实例启动时,通过SDK向Nacos Server发送注册请求
- 2. Nacos Server接收请求并验证信息,将服务实例信息存储到内存和持久化存储中
- 3. 服务实例定期向Nacos Server发送心跳包维持注册状态
服务注册核心源码
下面是Nacos服务注册的核心源码示例:
// ServiceRegistration接口定义服务注册的基本方法
public interface ServiceRegistration<T> {
void register();
void deregister();
T getRegistration();
}
// NacosServiceRegistry实现了服务注册逻辑
@Service
public class NacosServiceRegistry implements ServiceRegistry<NacosRegistration> {
private final NacosServiceManager nacosServiceManager;
private final NacosRegistrationProperties registrationProperties;
public NacosServiceRegistry(NacosServiceManager nacosServiceManager,
NacosRegistrationProperties registrationProperties) {
this.nacosServiceManager = nacosServiceManager;
this.registrationProperties = registrationProperties;
}
@Override
public void register(NacosRegistration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// 构建服务实例注册信息
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 调用Nacos客户端进行服务注册
namingService().registerInstance(
registration.getServiceId(),
registration.getGroupName(),
instance);
log.info("nacos registry, {} {}:{} register finished",
registration.getServiceId(),
instance.getIp(), instance.getPort());
} catch (Exception e) {
log.error("nacos registry error", e);
}
}
// 获取Nacos命名服务客户端
private NamingService namingService() throws NacosException {
return nacosServiceManager.getNamingService(registrationProperties);
}
// 从注册信息构建Nacos实例对象
private Instance getNacosInstanceFromRegistration(NacosRegistration registration) {
Instance instance = new Instance();
// 设置实例基本信息
instance.setIp(registration.getIp());
instance.setPort(registration.getPort());
instance.setWeight(registration.getWeight() == null ? 1.0F : registration.getWeight());
instance.setClusterName(registration.getClusterName());
instance.setHealthy(registration.isHealthy());
instance.setServiceName(registration.getServiceId());
instance.setInstanceId(registration.getInstanceId());
instance.setEphemeral(registration.isEphemeral());
// 设置元数据
if (registration.getMetadata() != null) {
instance.setMetadata(registration.getMetadata());
}
return instance;
}
}
// NacosNamingService是Nacos命名服务的核心实现类
public class NacosNamingService implements NamingService {
private final NacosServiceFactory serviceFactory;
private final NamingProxy namingProxy;
private final ClientWorker clientWorker;
public NacosNamingService(Properties properties) throws NacosException {
// 初始化相关组件
this.serviceFactory = new NacosServiceFactory(properties);
this.namingProxy = serviceFactory.createNamingProxy();
this.clientWorker = new ClientWorker(namingProxy, properties);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 检查服务名是否合法
if (StringUtils.isEmpty(groupName)) {
groupName = Constants.DEFAULT_GROUP;
}
// 调用客户端工作类处理注册
clientWorker.registerInstance(serviceName, groupName, instance);
}
// 客户端工作类处理注册逻辑
public class ClientWorker {
private final NamingProxy namingProxy;
private final ServiceInfoHolder serviceInfoHolder;
private final ScheduledExecutorService executorService;
public ClientWorker(NamingProxy namingProxy, Properties properties) {
this.namingProxy = namingProxy;
this.serviceInfoHolder = new ServiceInfoHolder();
this.executorService = Executors.newScheduledThreadPool(1,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.naming.client.Worker");
t.setDaemon(true);
return t;
}
});
// 启动心跳任务
scheduleHeartbeat();
}
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 构建注册请求参数
Map<String, String> params = new HashMap<>(16);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("clusterName", instance.getClusterName());
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("serviceId", instance.getServiceId());
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// 发送注册请求到Nacos Server
namingProxy.registerInstance(params);
// 加入到服务信息持有者中
serviceInfoHolder.processServiceJson(
serviceName, groupName,
JSON.toJSONString(Collections.singletonList(instance))
);
}
// 启动心跳任务,定期发送心跳维持注册状态
private void scheduleHeartbeat() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 发送心跳包
clientWorker.sendHeartbeat();
} catch (Exception e) {
log.error("Exception when sending heartbeat", e);
}
}
}, 5000, 5000, TimeUnit.MILLISECONDS);
}
// 发送心跳方法
public void sendHeartbeat() throws NacosException {
for (Map.Entry<String, List<Instance>> entry : serviceInfoHolder.getServices().entrySet()) {
String serviceName = entry.getKey();
List<Instance> instances = entry.getValue();
for (Instance instance : instances) {
if (!instance.isHealthy() || !instance.isEphemeral()) {
continue;
}
// 构建心跳请求参数
Map<String, String> params = new HashMap<>(16);
params.put("serviceName", serviceName);
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("clusterName", instance.getClusterName());
params.put("serviceId", instance.getServiceId());
// 发送心跳请求
namingProxy.sendHeartbeat(params);
}
}
}
}
}服务发现原理
服务发现是指服务消费者从Nacos Server获取服务列表,并根据一定的负载均衡策略选择具体服务实例进行调用的过程。
核心流程如下:
- 1. 服务消费者启动时,向Nacos Server订阅所需服务
- 2. Nacos Server推送服务列表给消费者
- 3. 消费者本地缓存服务列表,并定期更新
- 4. 服务调用时,根据负载均衡策略选择具体实例
服务发现核心源码
下面是Nacos服务发现的核心源码示例:
// NacosServiceDiscovery实现了服务发现逻辑
@Service
public class NacosServiceDiscovery implements ServiceDiscovery<ServiceInstance> {
private final NacosServiceManager nacosServiceManager;
private final NacosDiscoveryProperties discoveryProperties;
public NacosServiceDiscovery(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties discoveryProperties) {
this.nacosServiceManager = nacosServiceManager;
this.discoveryProperties = discoveryProperties;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
return getInstances(serviceId, "");
}
@Override
public List<ServiceInstance> getInstances(String serviceId, String group) throws NacosException {
if (StringUtils.isEmpty(serviceId)) {
throw new NacosException(NacosException.INVALID_PARAM, "serviceId is empty");
}
if (StringUtils.isEmpty(group)) {
group = discoveryProperties.getGroup();
}
// 调用Nacos命名服务获取实例列表
List<Instance> instances = namingService().getInstances(serviceId, group);
// 转换为标准ServiceInstance格式
return instances.stream()
.map(instance -> new NacosServiceInstance(instance, serviceId))
.collect(Collectors.toList());
}
@Override
public List<String> getServices() throws NacosException {
// 获取所有服务列表
return namingService().getServicesOfServer(1000, 0).stream()
.map(serviceInfo -> serviceInfo.getName())
.collect(Collectors.toList());
}
// 获取Nacos命名服务客户端
private NamingService namingService() throws NacosException {
return nacosServiceManager.getNamingService(discoveryProperties.getNacosProperties());
}
}
// NacosNamingService中的服务发现相关方法
public class NacosNamingService implements NamingService {
// 获取服务实例列表
@Override
public List<Instance> getInstances(String serviceName, String groupName, List<String> clusters)
throws NacosException {
if (StringUtils.isEmpty(groupName)) {
groupName = Constants.DEFAULT_GROUP;
}
// 调用客户端工作类获取实例
return clientWorker.getInstances(serviceName, groupName, clusters);
}
// 客户端工作类处理服务发现逻辑
public class ClientWorker {
// 服务信息持有者,缓存服务列表
private final ServiceInfoHolder serviceInfoHolder;
public List<Instance> getInstances(String serviceName, String groupName, List<String> clusters)
throws NacosException {
// 构建服务标识
String serviceId = ServiceIdBuilder.buildServiceId(groupName, serviceName);
// 从服务信息持有者获取服务信息
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceId);
// 如果服务信息为空或已过期,主动拉取
if (serviceInfo == null || serviceInfo.isExpired()) {
serviceInfo = refreshServiceInfo(serviceName, groupName, clusters);
}
// 返回可用实例列表
return serviceInfo == null ? Collections.emptyList() : serviceInfo.getHosts();
}
// 刷新服务信息
public ServiceInfo refreshServiceInfo(String serviceName, String groupName, List<String> clusters)
throws NacosException {
String serviceId = ServiceIdBuilder.buildServiceId(groupName, serviceName);
// 构建请求参数
Map<String, String> params = new HashMap<>(16);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
if (clusters != null && !clusters.isEmpty()) {
params.put("clusters", StringUtils.join(clusters, ","));
}
// 调用Nacos Server获取服务信息
String result = namingProxy.queryList(serviceId, params);
// 处理服务信息
return serviceInfoHolder.processServiceJson(serviceId, result);
}
// 服务信息持有者类,负责缓存和管理服务信息
public class ServiceInfoHolder {
// 服务信息缓存
private final Map<String, ServiceInfo> services = new ConcurrentHashMap<>();
// 上次更新时间
private final Map<String, Long> lastRefTime = new ConcurrentHashMap<>();
public ServiceInfo processServiceJson(String serviceId, String json) {
if (StringUtils.isEmpty(json)) {
return null;
}
ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
if (serviceInfo != null) {
// 更新服务信息
services.put(serviceId, serviceInfo);
lastRefTime.put(serviceId, System.currentTimeMillis());
// 注册监听器,当服务信息变化时通知
if (null != listeners.get(serviceId)) {
for (EventListener listener : listeners.get(serviceId)) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
listener.onEvent(new NamingEvent(serviceId, serviceInfo));
} catch (Exception e) {
log.error("EventListener execute error.", e);
}
}
});
}
}
}
return serviceInfo;
}
// 获取服务信息
public ServiceInfo getServiceInfo(String serviceId) {
ServiceInfo serviceInfo = services.get(serviceId);
if (serviceInfo != null) {
return serviceInfo;
}
// 如果服务信息不存在,主动拉取
try {
return refreshServiceInfo(serviceId.split(ServiceIdBuilder.SERVICE_ID_SEPARATOR)[0],
serviceId.split(ServiceIdBuilder.SERVICE_ID_SEPARATOR)[1], null);
} catch (NacosException e) {
log.error("getServiceInfo error, serviceId: {}", serviceId, e);
}
return null;
}
// 服务信息是否过期
public boolean isExpired(String serviceId) {
Long lastRef = lastRefTime.get(serviceId);
if (lastRef == null) {
return true;
}
// 默认15秒更新一次
return (System.currentTimeMillis() - lastRef) > 15 * 1000;
}
}
}
}服务健康检查机制
Nacos的服务健康检查是保证服务可用性的关键机制,主要包含两种检查方式:
1. 客户端主动上报:服务实例定期向Nacos Server发送心跳包
2. 服务端主动检查:Nacos Server定期向服务实例发送健康检查请求
健康检查的核心源码涉及到ClientWorker类中的心跳机制和Server端的检查逻辑,上述源码中已包含客户端心跳相关部分。
服务配置与同步机制
Nacos采用了长轮询和推送相结合的方式实现服务配置的实时同步:
1. 客户端发起长轮询请求到服务端
2. 服务端有变更时立即响应,无变更则等待一段时间后响应
3. 客户端收到响应后立即发起新的长轮询请求
4. 服务端也可以主动推送变更到客户端
这种机制保证了服务信息的实时性和一致性,同时减少了客户端与服务端的通信开销。
总结
Nacos的服务注册与发现机制通过简洁而高效的设计,实现了微服务的自动注册、发现和健康管理。其核心原理包括:
- 基于客户端的服务注册与心跳维持
- 基于长轮询和推送的服务信息同步
- 灵活的服务发现与负载均衡策略
- 可靠的服务健康检查机制
通过上述源码可以看到,Nacos通过NamingService接口封装了核心功能,ClientWorker处理具体的注册、发现和心跳逻辑,ServiceInfoHolder负责服务信息的缓存和管理,整体架构清晰且易于扩展。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
jar命令修改jar包中的application.yml配置文件
本文主要介绍了jar命令修改jar包中的application.yml配置文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2023-08-08
Mybatis-plus自动填充不生效或自动填充数据为null原因及解决方案
本文主要介绍了Mybatis-plus自动填充不生效或自动填充数据为null原因及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2022-05-05
解决Idea的选择文件后定位瞄准器"Select Opened File"的功能
使用IntelliJ IDEA时,可能会发现"SelectOpenedFile"功能不见了,这个功能允许用户快速定位到当前打开文件的位置,若要找回此功能,只需在IDEA的标题栏上右键,然后选择"Always Select Opened File",这样就可以重新启用这个便捷的功能2024-11-11
基于resty orm的ActiveRecord操作数据指南
这篇文章主要为大家介绍了基于resty orm的ActiveRecord操作数据指南,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步2022-03-03


最新评论