Nacos源码学习计划-Day23-Nacos2.x-服务实例信息变化如何同步到集群节点
ZealSinger 发布于 阅读:266 技术文档
在看Nacos服务端处理服务注册的时候,我们最后是探索到了AbstractClient类下的addServiceInstance()方法
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
// 把 instance 信息放入到 publishers 中
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
// 同步集群节点数据
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
在该方法的最后那个pushlishEvent()方法中,发布了一个ClientChangedEvent,从命名和业务逻辑来看,这一步肯定是发布客户端改变事件用于同步别的集群节点,所以我们这一篇的内容就是来了解一下这个
处理ClientChangedEvent
还是老样子,我们对于Event事件类型的处理,都是查找对应的处理Event的地方,利用Idea的全局搜索轻松实现
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
// 临时实例是走 distro 协议,持久化实例走 raft 协议
// isResponsibleClient 这里判断了只有该 client 的责任节点有权利进行集群数据同步
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
// 客户端注销集群节点同步事件
if (event instanceof ClientEvent.ClientDisconnectEvent) {
// 这里有常量默认值TYPE,其数值为 Nacos:Naming:v2:ClientData
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
// 客户端改变集群节点同步事件
} else if (event instanceof ClientEvent.ClientChangedEvent) {
// 这里有常量默认值TYPE,其数值为 Nacos:Naming:v2:ClientData
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
通过上面这一段代码我们可以得知,不管是客户端注销了,还是客户端改变了,都会来同步集群节点,它们两个都是调用了 distroProtocol.sync 来进行同步,唯一的区别就是参数一个是 DELETE、一个是CHANGE。那么我们接下来一个看下同步的具体逻辑吧,代码如下:
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// 遍历集群其他节点,除开自身节点
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
// 创建了 DistroDelayTask 任务,并且把 targetServer(即集群节点的address) 包装成 DistroKey 当作参数传入了进去
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// 往执行引擎中添加任务
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
可以看到,这里最后利用distroTaskEngineHolder.getDelayTaskExecuteEngine()去执行distroDelayTask任务,这个地方的逻辑我们在上面就说过,实际上执行的就需要看对应的processor,而DistroTaskEngineHolder中的执行器就是DistroDelayTaskProcessor,我们看看对应的处理方法
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction()) {
case DELETE:
// 删除任务
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD:
// 改变、新增都会走这一段逻辑,封装为DistroSyncChangeTask
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
这一段逻辑,不管是删除,还是改变和新增的操作,都是创建了一个任务,添加到了 distro 任务执行引擎,这个引擎和之前看的延时任务执行实现有点不一样。

从逻辑上找下去,我们首先需要看到DistroExecuteTaskExecuteEngine中的addTask()的逻辑
可以发现,就是将task加入到TaskExecuteWorker类型的数组中,然后调用TaskExecuteWorker.process方法执行task

然后我们来看看process中的逻辑

可以看到这里面的主要逻辑就是将task放入到队列中。
在延时任务执行引擎实现原理是有一个 Map tasks 任务池,很多对应的任务处理器,然后定时从任务池获取任务,执行任务。而这个 distroTaskEngineHolder实现原理使用阻塞队列 + 异步任务的方式来实现的,所以最终这个任务肯定会被拿取并且执行
DistroSyncChangeTask
我们回到DistroDelayTaskProcessor中的process逻辑,可以知道无论是ADD还是CHANGE对应的都是封装为了DistroSyncChangeTask任务类型,我们查看其设计,可以发现继承自AbstractDistroExecuteTask,继续往上看其实就是实现了Runnable接口,所以我们来看看对应的run逻辑,因为我们是CHANGE类型,所以我们这里也肯定去看DistroSyncChangeTask下的doExecuteWithCallback的逻辑

protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
// 获取请求数据
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return;
}
// syncData 同步集群节点
getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);
}
此时我们应该去看syncData的逻辑,该方法也是接口中的方法,需要在对应的实现类中查找对应的实现,这里有两个实现类

怎么知道是哪个呢?我们可以回头来看一下,调用syncData的是findTransportAgent(type)的返回指,这个返回值其实在之前的run的逻辑中也有被调用,其返回类型是DistroTransportAgent,而DistroClientTransportAgent是DistroTransportAgent的子类

看到这里我们知道了,接下来要看的内容是DistroClientTransportAgent的syncData()的逻辑
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
return;
}
// 创建请求对象
DistroDataRequest request = new DistroDataRequest(data, data.getType());
// 找到集群节点
Member member = memberManager.find(targetServer);
try {
// 发送 rpc 异步请求
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
所以看到这里我们差不多知道了整个某个节点上的服务实例修改后,如果通知到其余节点的过程,捋一捋,一开始我们是从 ClientChangedEvent 事件开始分析,我们找到了处理这个事件的逻辑,调用了 sync 方法,在这个方法中会遍历其他集群节点进行数据同步,同步的逻辑如下。
-
先是创建了 DistroDelayTask 延迟任务,放入到了延迟任务执行引擎,由
DistroDelayTaskProcessor处理器来处理。 -
在
DistroDelayTaskProcessor处理器,处理的逻辑中又创建了DistroSyncChangeTask线程任务,执行引擎是先调用了AbstractDistroExecuteTask父类中的run方法,在run方法中又调用了子类的doExecuteWithCallback方法。 -
doExecuteWithCallback方法中会去获取最新的微服务实例列表,通过 rpc 发起异步请求进行数据同步。
集群其余节点处理DistroDataRequest
上述内容中,我们已经清楚了集群节点发其同步请求,那么现在我们来看看别的集群节点如何处理这个同步请求的。我们知道,对应的请求类型是DistroDataRequest,老规矩,利用IDEA全局搜索找到对应的Handler处理器

可以发现,无论是ADD 还是 CHANGE 还是 DELETE 的逻辑,最终都是执行的handlerSyncData(...)的逻辑,跟下去整个逻辑链路大概是handleSyncData()->onReceive()->processData->handlerClientSyncData()
private DistroDataResponse handleSyncData(DistroData distroData) {
// 创建response对象
DistroDataResponse result = new DistroDataResponse();
// onReceive方法处理请求数据distroData
if (!distroProtocol.onReceive(distroData)) {
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("[DISTRO-FAILED] distro data handle failed");
}
return result;
}
/**
* Receive synced distro data, find processor to process.
*
* @param distroData Received data
* @return true if handle receive data successfully, otherwise false
*/
public boolean onReceive(DistroData distroData) {
Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
distroData.getDistroKey());
String resourceType = distroData.getDistroKey().getResourceType();
// 利用type找到对应的processor处理器
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
// 调用处理器的processData方法处理请求数据
return dataProcessor.processData(distroData);
}
processData也是接口中的方法,一样的有两个实现类,这里我们需要根据resourceType来进行确认,这个resourceType的值大家可以回过头看这一章的最开始那个地方,在syncToAllServer中,也就是我们Event创建和传入的地方,我们当前这个resourceType和DistroKey其实一开始就绑定了,那么其实这个resourceType就是我们上面提到的常量TYPE
然后这个地方通过findDataProcessor查找DistroComponentHolder中的Map成员dataProcessorMap,以TYPE为key的对应的value
private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();
public DistroDataProcessor findDataProcessor(String processType) {
return dataProcessorMap.get(processType);
}
到这里我们知道了存储在哪里以及对应的key,但是对应的value该如何查找呢?大家可以继续看到DistroComponentHolder中,可以发现这个Map是private私有的,那么就说明外部不能随便进行赋值,有且只能通过registerDataProcessor方法进行赋值
那么我们查找一下这个方法使用的地方,看看是否能查找到部分线索,可以发现就两个地方

我们可以分别看一下这两个地方中,因为我们需要其processType为TYPE,那么也就是对应的processType()方法返回值为TYPE,可以看到,符合条件的是DistroClientComponentRegistry,另外一个的返回不满足条件(另外一个的就不贴图了,大家自己可以去看一下,他的Key是另外一个常量)

那么确定了对应的返回值类型是DistroClientComponentRegistry,那么我们就可以看其中的processData()方法的实现逻辑
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
// 添加和改变走这个逻辑
// 这一步是将请求数据利用序列化器进行对应的反序列化的操作
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncData.class);
handlerClientSyncData(clientSyncData);
return true;
case DELETE:
// 删除走这个逻辑
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId);
return true;
default:
return false;
}
}
可以看到主要的逻辑就是handlerClientSyncData方法,而在这个方法内的主要逻辑是upgradeClient方法
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
upgradeClient(client, clientSyncData);
}
// 传入client即客户端ID对应的客户端对象 和 clientSyncData为对应的客户端数据
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
// 同步数据集合
Set<Service> syncedService = new HashSet<>();
// 遍历传入参数中的所有服务信息
for (int i = 0; i < namespaces.size(); i++) {
// 创建对应的Service对象
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
// 获取内存中保存的对应的service单例对象
Service singleton = ServiceManager.getInstance().getSingleton(service);
// 加入到集合中,方便后续比较
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
// 如果从client中获取的和同步数据中的内容不一样,则发布新增事件,client需要同步新数据
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
}
}
// 如果某个服务不在同步数据中(即不在 syncedService 集合中),则从client中移除该服务实例并发布注销事件
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
}
在这里会解析传过来的参数,最后对于新增的数据,还是发布了 ClientRegisterServiceEvent 事件,这个事件我们在讲解注册源码的时候已经分析过了,这一块注册逻辑就结束了。
我们会过来看看processData中对于删除操作的逻辑处理,可以发现核心在于clientManager.clientDisconnected(deleteClientId);一样得需要找对应的实现类
这个实现类怎么找到的呢,这个有点难度。
从代码的角度分析的话,DistroClientComponentRegistry中的这个clientManager,本质上是来源于DistroClientComponentRegistry中的clientManager,而DistroClientComponentRegistry是个Bean对象
那么我们来看看clientManager,他对应的接口类型是ClientManagerDelegate,这个ClientManagerDelegate中也同样实现了clientDisconnected这个方法
ClientManagerDelegate 持有三种不同的 ClientManager 实现:
ConnectionBasedClientManager:基于连接的客户端管理器
EphemeralIpPortClientManager:临时IP端口客户端管理器
PersistentIpPortClientManager:持久IP端口客户端管理器
而ClientManagerDelegate中对于clientDisconnected的实现如下
@Override
public boolean clientDisconnected(String clientId) {
return getClientManagerById(clientId).clientDisconnected(clientId);
}
与这个方法相关联的方法如下
private ClientManager getClientManagerById(String clientId) {
if (isConnectionBasedClient(clientId)) {
return connectionBasedClientManager;
}
return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
}
private boolean isConnectionBasedClient(String clientId) {
return !clientId.contains(IpPortBasedClient.ID_DELIMITER);
}
也就是说,我们确定这个clientDisconnected的实现类,最终是要依靠clientId,而本质上这个clientId是客户端注册的时候生成的,比较难追踪,而且你会发现很多的相关的信息都会回到这个clientId上来
所以我们这里其实最好的方法是debug的形式,或者我们也可以从理解的角度来看,一般的客户端都是临时IP端口,所以默认为EphemeralIpPortClientManager的实现
这个方法我们看 EphemeralIpPortClientManager 临时实例的实现类
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
// 移除客户端信息
IpPortBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
// 发布客户端注销事件
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
client.release();
return true;
}
ClientDisconnectEvent 发布客户端注销事件,会在多个地方处理该事件。
第一个是DistroClientDataProcessor,客户端注销后会同步集群节点。第二个是ClientServiceIndexesManager,移除本身节点该 client 的数据。
客户端注销同步集群节点没什么好说的了,和注册同步逻辑类似,我们来看下 ClientServiceIndexesManager 类处理注销事件,代码如下:
@Override
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
// 处理客户端注销
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
// 移除订阅者信息 其实就是操作subscriberIndexes
removeSubscriberIndexes(each, client.getClientId());
}
for (Service each : client.getAllPublishedService()) {
// 移除注册表信息 其实就是操作publisherIndexes
removePublisherIndexes(each, client.getClientId());
}
}
文章标题:Nacos源码学习计划-Day23-Nacos2.x-服务实例信息变化如何同步到集群节点
文章链接:https://zealsinger.xyz/?post=52
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!
微信扫一扫
支付宝扫一扫