Nacos源码学习计划-Day20-Nacos2.x-服务端处理客户端gRPC注册请求
ZealSinger 发布于 阅读:133 技术文档
上一章节我们分析了Nacos2.X的客户端中是如何利用gRPC发起注册请求的,那么接下来自然就是找服务端这边对于该注册请求的处理。
我们可以看到,最终发出请求的方法doRegisterService,其代码回顾一下如下
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
可以看到,这个请求体request的数据类型是InstanceRequest,那么自然,服务端这边找到对应的处理入口,肯定也是需要处理同样的数据类型,利用Idea的全局搜索,我们很快找到了这个Request的对应的Handler处理器Bean

很明显,我们现在要看handle()方法中的逻辑,可以看到,这个方法的逻辑首先根据request中的一些命名空间,groupName组名称和serviceName服务名称构建了一个Service对象,且通过这个条链路注册的实例都是默认ephemeral为true,即临时实例
然后就是就是根据request中的type字段进行switch-case判断,我们回过头看一下,就知道我们这里应该进入到第一个case的逻辑中

registerInstance()的逻辑如下
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
// 调用了注册方法,在这里我们要注意这些参数:
// service 就是我们刚刚在 swtich 之前创建的一个新的 service 对象,里面有命名空间、分组名、服务名
// request.getInstance() 这个就是注册到对应客户端的Instance实例对象(微服务对象),里面肯定有 服务ip、端口等信息
// meta.getConnectionId() 这个很关键,连接Id,meta里面是请求客户端的元数据,客户端的一些IP信息
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
可以看到,实际上的逻辑还是clientOperationService.registerInstance()方法,所以我们来分析这个方法
registerInstance方法分析
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
getSingleton()方法
可以看到第一行,就是通过getSingleton()方法获取到Service实例
Service singleton = ServiceManager.getInstance().getSingleton(service);
其方法实现如下
private final ConcurrentHashMap<Service, Service> singletonRepository;
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
public Service getSingleton(Service service) {
// 添加一个 service,如果存在就不添加
singletonRepository.putIfAbsent(service, service);
// 然后从这个 Map 中把 Service 取出来
Service result = singletonRepository.get(service);
// 以 命名空间 为 key,把相同命名空间的 service 加入到 namespaceSingletonMaps 当中,这个map中保存的也是同一个命名空间下的所有的Service
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
可以看到,在Nacos 2.x版本中,把 service 和命名空间拆分成了两个 Map,回想一下在Nacos 1.4.X版本中,是不是进行的Map的嵌套,Service是嵌套在Group所在的Map中的,这里也是Nacos 2.X和1.4之间的内部注册表的结构上的一个明显的差异

获取到对应的Service之后,判断这个Service是否是一个持久化的实例,如果是持久化的实例,则会抛出异常从而不准注册
这个其实在我们第一篇章说到Nacos2.x的变化的时候也提到过,Nacos 2.X版本不会允许同一个服务同时存在持久化实例和非持久化实例
Nacos在1.x版本中允许一个服务同时存在持久化和非持久化实例,持久化属性只是作为实例的一个元数据进行存储和识别,这就导致了实际情况下运维人员很苦恼且从系统架构层面看来存在矛盾。所以在Nacos 2.x中简化了Nacos的服务数据模型,是否持久化的数据抽象至服务级别且不再允许一个服务同时存在持久化和非持久化实例,实例的是否持久化属性配置继承服务的是否持久化属性配置
所以这里的判断就是,如果该服务已经注册了一个持久化实例,本请求链路是注册的非持久化实例,自然是不允许同时注册的,所以抛出异常。
getClient()方法
分支逻辑判断完成之后获取Client对象
Client client = clientManager.getClient(clientId);
方法逻辑如下
private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
public Client getClient(String clientId) {
return clients.get(clientId);
}
这个地方稍微需要一点Netty的知识,所以我们简单概况一下,当我们Nacos底层的gRPC实际上也是Netty,当客户端和服务端之间使用Netty建立连接的时候,服务端内部会创建一个socketChannel对象用于管理这条连接,每和一个独特的客户端的连接都会对应唯一一个socketChannel,在 socketChannel 对象的基础之上,Nacos 又封装了一层 Client 对象,并且会生成一个 connectionId 把它们关联起来

所以这一步就是利用这个id进行获取对应的客户端
然后接下来的逻辑如下
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
clientIsLegal方法的逻辑如下,其实也没什么很好说的,就是一些对于client的判空和是否持久化的判断
private boolean clientIsLegal(Client client, String clientId) {
if (client == null) {
Loggers.SRV_LOG.warn("Client connection {} already disconnect", clientId);
return false;
}
if (!client.isEphemeral()) {
Loggers.SRV_LOG.warn("Client connection {} type is not ephemeral", clientId);
return false;
}
return true;
}
getPublishInfo(instance)方法的逻辑,也没什么很好说的,不要看代码好像还不少,其实就是将入参Instance中的对象,经过一些判断和处理,封装到了InstancePublishInfo对象中
default InstancePublishInfo getPublishInfo(Instance instance) {
InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
Map<String, Object> extendDatum = result.getExtendDatum();
if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
extendDatum.putAll(instance.getMetadata());
}
if (StringUtils.isNotEmpty(instance.getInstanceId())) {
extendDatum.put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
}
if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
extendDatum.put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
}
if (!instance.isEnabled()) {
extendDatum.put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
}
String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
: instance.getClusterName();
result.setHealthy(instance.isHealthy());
result.setCluster(clusterName);
return result;
}
addServiceInstance()方法
接下来就是addServiceInstance(singleton, instanceInfo);方法,这行的逻辑比较重要,可以来分析一下
client.addServiceInstance(singleton, instanceInfo);
同样的,这里是调用的client对象中的方法,这里得看不同的实现类从而有不同的逻辑,好在的是addServiceInstance只有两个对应的实现方法

我们先来看看AbstractClient中的实现
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
// 将其放入到一个内部的Maap中,根据put的返回值判断是否第一次放入该Service,如果为null则说明第一次放入
if (null == publishers.put(service, instancePublishInfo)) {
// 第一次放入的就需要内部计数器递增一下进行记录,这个方法底层就是个原子Int增加
MetricsMonitor.incrementInstanceCount();
}
// 发布事件 事件类型ClientChangedEvent 这个和集群同步相关 暂且先不进行详细解释
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
以至于另外一个实现类IpPortBaseClient中的addServiceInstance,其实也是直接用的父类中实现,没有什么需要额外分析的
最后几行代码,可以很直观的知道,其实也是和事件发布有关,我们接下来就是分析这些发布的事件
client.setLastUpdatedTime(); //更新时间
// 发布ClientRegisterServiceEvent事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
// 发布InstanceMetadataEvent事件
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
ClientRegisterServiceEvent事件
既然我们是和服务注册相关,自然首先看ClientRegisterServiceEvent事件,(我们这里不会介绍InstanceMetadataEvent事件的处理,感兴趣的可以自己点进去看看,和实例的元数据保存有关的,会记录元数据的过期与否,底层也就是对于一个Set进行操作。没过期则加入反之则remove)分析发布事件的相关逻辑,其实就是看两个点
-
哪里接受和处理这个事件
-
这个事件的逻辑是啥
那我们先找到在哪里接受和处理这个事件类型,依旧可以通过Idea的全局搜索进行查找
可以看到,很多都是在new创建对象的时候涉及到,这些地方肯定不是;除此之外就只有如下图所示的地方,出现了event instanceof,这个地方很像对于接收到的event事件进行类型判断然后分类型进行处理,所以我们可以去看看这里的逻辑

可以从这里的代码结构看出来,就是一种类似事件中心的处理结构,Event都进入到这个方法,然后根据不同的事件类型进行处理
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
// 处理客户端注册事件
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
// 处理客户端注销事件
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
// 处理客户端订阅事件
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);
// 处理客户端取消订阅事件
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
可以看到,其逻辑下调用的是addPublisherIndexes()方法,而addPublisherIndexes()方法
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private void addPublisherIndexes(Service service, String clientId) {
// 这里可以看到 publisherIndexes 存储了 <Service,Set<clientId>>的对应结构,通过clientId自然能找到client,client中对应的有Instance的信息
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
// 发布ServiceChangedEvent 服务改变事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
继续往下看就是publishEvent的逻辑,但是我们通过事件的类型就可以判断,下面的内容属于服务改变才会触发的事件,而我们当前要追踪的是服务注册事件,自然而言下面的其实不属于我们今天的主线任务
总结
到这里,我们今天的分析其实就结束了,可能大家会比较恍惚啊,可能只记得出现了很多个Map,但是没能理清楚各种Map之间的关系,因为Nacos2.x版本中的设计,不是简单的Map嵌套了,而是采用扁平化的设计,减少层级开销,我们再来总结一下这个流程中涉及到的map以及之间的关系
-
singletonRepository-
类型:
ConcurrentHashMap<Service, Service> -
作用:确保每个服务(Service)在全局唯一。
当注册请求到来时,通过
getSingleton(service)方法,先尝试将服务对象放入该 Map(putIfAbsent),若已存在则直接返回现有实例,避免重复创建。 -
意义:Service 的唯一性是 Nacos2.x 的重要设计(例如不允许同一服务同时存在持久化和非持久化实例),此 Map 是这一规则的底层保障。
-
-
namespaceSingletonMaps-
类型:
ConcurrentHashMap<String, Set<Service>> -
作用:按命名空间(namespace)对服务进行分组管理。
Key 是命名空间 ID,Value 是该命名空间下所有 Service 的集合(ConcurrentHashSet)。每次获取或创建 Service 时,会将其加入对应命名空间的集合中。
-
意义:实现服务的命名空间隔离,方便按命名空间快速查询所有服务(例如跨命名空间的服务不可见)。
-
-
clients(ClientManager 中)-
类型:
ConcurrentMap<String, IpPortBasedClient> -
作用:管理客户端连接与 Client 对象的映射。
Key 是客户端连接 ID(connectionId,由 gRPC/Netty 连接生成),Value 是封装了客户端信息的IpPortBasedClient对象。每个客户端连接对应唯一 Client 对象。
-
-
publishers(Client 对象中)-
类型:
ConcurrentHashMap<Service, InstancePublishInfo> -
作用:记录当前客户端发布的所有实例信息。
Key 是服务(Service),Value 是该服务下的实例详情(InstancePublishInfo,包含 IP、端口、元数据等)。当客户端注册实例时,会将实例信息存入此 Map。
-
-
publisherIndexes-
类型:
ConcurrentMap<Service, Set<String>> -
作用:反向索引服务与客户端的关联。
Key 是服务(Service),Value 是发布该服务的所有客户端 ID(clientId)的集合。通过该结构可以快速找到某个服务对应的所有客户端,进而通过客户端 ID 从clients中获取实例信息。
-
整体结构可以归纳为如下流程关系

文章标题:Nacos源码学习计划-Day20-Nacos2.x-服务端处理客户端gRPC注册请求
文章链接:https://zealsinger.xyz/?post=47
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!
微信扫一扫
支付宝扫一扫