«

Nacos源码学习计划-Day23-Nacos2.x-服务实例信息变化如何同步到集群节点

ZealSinger 发布于 阅读:266 技术文档


前面我们了解了Nacos2.X版本下以gRpc为客户端和服务端之间的通讯下,客户端如何注册和查询实例,以及服务端如何处理注册请求,如何将服务变更通知到订阅的客户端,也在这几个过程中加深了对于Nacos2.X版本下内存注册表的结构。

在看Nacos服务端处理服务注册的时候,我们最后是探索到了AbstractClient类下的addServiceInstance()方法

@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
   // 把 instance 信息放入到 publishers 中
   if (null == publishers.put(service, instancePublishInfo)) {
       MetricsMonitor.incrementInstanceCount();
  }
   // 同步集群节点数据
   NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
   Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
   return true;
}

在该方法的最后那个pushlishEvent()方法中,发布了一个ClientChangedEvent,从命名和业务逻辑来看,这一步肯定是发布客户端改变事件用于同步别的集群节点,所以我们这一篇的内容就是来了解一下这个

处理ClientChangedEvent

还是老样子,我们对于Event事件类型的处理,都是查找对应的处理Event的地方,利用Idea的全局搜索轻松实现

private void syncToAllServer(ClientEvent event) {
   Client client = event.getClient();
   // Only ephemeral data sync by Distro, persist client should sync by raft.
   // 临时实例是走 distro 协议,持久化实例走 raft 协议
   // isResponsibleClient 这里判断了只有该 client 的责任节点有权利进行集群数据同步
   if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
       return;
  }
   // 客户端注销集群节点同步事件
   if (event instanceof ClientEvent.ClientDisconnectEvent) {
       // 这里有常量默认值TYPE,其数值为 Nacos:Naming:v2:ClientData
       DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
       distroProtocol.sync(distroKey, DataOperation.DELETE);
       
   // 客户端改变集群节点同步事件
  } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // 这里有常量默认值TYPE,其数值为 Nacos:Naming:v2:ClientData
       DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
       distroProtocol.sync(distroKey, DataOperation.CHANGE);
  }
}

通过上面这一段代码我们可以得知,不管是客户端注销了,还是客户端改变了,都会来同步集群节点,它们两个都是调用了 distroProtocol.sync 来进行同步,唯一的区别就是参数一个是 DELETE、一个是CHANGE。那么我们接下来一个看下同步的具体逻辑吧,代码如下:

public void sync(DistroKey distroKey, DataOperation action) {
   sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}

public void sync(DistroKey distroKey, DataOperation action, long delay) {
   // 遍历集群其他节点,除开自身节点
   for (Member each : memberManager.allMembersWithoutSelf()) {
       syncToTarget(distroKey, action, each.getAddress(), delay);
  }
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
   DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
           targetServer);
   // 创建了 DistroDelayTask 任务,并且把 targetServer(即集群节点的address) 包装成 DistroKey 当作参数传入了进去
   DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
  // 往执行引擎中添加任务
  distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
   if (Loggers.DISTRO.isDebugEnabled()) {
       Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
  }
}

可以看到,这里最后利用distroTaskEngineHolder.getDelayTaskExecuteEngine()去执行distroDelayTask任务,这个地方的逻辑我们在上面就说过,实际上执行的就需要看对应的processor,而DistroTaskEngineHolder中的执行器就是DistroDelayTaskProcessor,我们看看对应的处理方法

@Override
public boolean process(NacosTask task) {
   if (!(task instanceof DistroDelayTask)) {
       return true;
  }
   DistroDelayTask distroDelayTask = (DistroDelayTask) task;
   DistroKey distroKey = distroDelayTask.getDistroKey();
   switch (distroDelayTask.getAction()) {
       case DELETE:
           // 删除任务
           DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
           distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
           return true;
       case CHANGE:
       case ADD:
           // 改变、新增都会走这一段逻辑,封装为DistroSyncChangeTask
           DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
           distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
           return true;
       default:
           return false;
  }
}

这一段逻辑,不管是删除,还是改变和新增的操作,都是创建了一个任务,添加到了 distro 任务执行引擎,这个引擎和之前看的延时任务执行实现有点不一样。

image-20251227211423702

从逻辑上找下去,我们首先需要看到DistroExecuteTaskExecuteEngine中的addTask()的逻辑

可以发现,就是task加入到TaskExecuteWorker类型的数组中,然后调用TaskExecuteWorker.process方法执行task

image-20251227212310543

然后我们来看看process中的逻辑

image-20251227214537141

可以看到这里面的主要逻辑就是将task放入到队列中。

在延时任务执行引擎实现原理是有一个 Map tasks 任务池,很多对应的任务处理器,然后定时从任务池获取任务,执行任务。而这个 distroTaskEngineHolder实现原理使用阻塞队列 + 异步任务的方式来实现的,所以最终这个任务肯定会被拿取并且执行

DistroSyncChangeTask

我们回到DistroDelayTaskProcessor中的process逻辑,可以知道无论是ADD还是CHANGE对应的都是封装为了DistroSyncChangeTask任务类型,我们查看其设计,可以发现继承自AbstractDistroExecuteTask,继续往上看其实就是实现了Runnable接口,所以我们来看看对应的run逻辑,因为我们是CHANGE类型,所以我们这里也肯定去看DistroSyncChangeTask下的doExecuteWithCallback的逻辑

image-20251227220202242

@Override
protected void doExecuteWithCallback(DistroCallback callback) {
   
   String type = getDistroKey().getResourceType();
   // 获取请求数据
   DistroData distroData = getDistroData(type);
   if (null == distroData) {
       Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
       return;
  }
   // syncData 同步集群节点
   getDistroComponentHolder().findTransportAgent(type)
          .syncData(distroData, getDistroKey().getTargetServer(), callback);
}

此时我们应该去看syncData的逻辑,该方法也是接口中的方法,需要在对应的实现类中查找对应的实现,这里有两个实现类

image-20251227224407283

怎么知道是哪个呢?我们可以回头来看一下,调用syncData的是findTransportAgent(type)的返回指,这个返回值其实在之前的run的逻辑中也有被调用,其返回类型是DistroTransportAgent,而DistroClientTransportAgentDistroTransportAgent的子类

image-20251227224351093

看到这里我们知道了,接下来要看的内容是DistroClientTransportAgent的syncData()的逻辑

@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
   if (isNoExistTarget(targetServer)) {
       callback.onSuccess();
       return;
  }
   // 创建请求对象
   DistroDataRequest request = new DistroDataRequest(data, data.getType());
   
   // 找到集群节点
   Member member = memberManager.find(targetServer);
   try {
       // 发送 rpc 异步请求
       clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
  } catch (NacosException nacosException) {
       callback.onFailed(nacosException);
  }
}

所以看到这里我们差不多知道了整个某个节点上的服务实例修改后,如果通知到其余节点的过程,捋一捋,一开始我们是从 ClientChangedEvent 事件开始分析,我们找到了处理这个事件的逻辑,调用了 sync 方法,在这个方法中会遍历其他集群节点进行数据同步,同步的逻辑如下。

  1. 先是创建了 DistroDelayTask 延迟任务,放入到了延迟任务执行引擎,由 DistroDelayTaskProcessor 处理器来处理。

  2. DistroDelayTaskProcessor 处理器,处理的逻辑中又创建了 DistroSyncChangeTask 线程任务,执行引擎是先调用了 AbstractDistroExecuteTask 父类中的 run 方法,在 run 方法中又调用了子类的doExecuteWithCallback 方法。

  3. doExecuteWithCallback 方法中会去获取最新的微服务实例列表,通过 rpc 发起异步请求进行数据同步。

集群其余节点处理DistroDataRequest

上述内容中,我们已经清楚了集群节点发其同步请求,那么现在我们来看看别的集群节点如何处理这个同步请求的。我们知道,对应的请求类型是DistroDataRequest,老规矩,利用IDEA全局搜索找到对应的Handler处理器

image-20251228102952496

可以发现,无论是ADD 还是 CHANGE 还是 DELETE 的逻辑,最终都是执行的handlerSyncData(...)的逻辑,跟下去整个逻辑链路大概是handleSyncData()->onReceive()->processData->handlerClientSyncData()

private DistroDataResponse handleSyncData(DistroData distroData) {
   // 创建response对象
   DistroDataResponse result = new DistroDataResponse();
   // onReceive方法处理请求数据distroData
   if (!distroProtocol.onReceive(distroData)) {
       result.setErrorCode(ResponseCode.FAIL.getCode());
       result.setMessage("[DISTRO-FAILED] distro data handle failed");
  }
   return result;
}


/**
* Receive synced distro data, find processor to process.
*
* @param distroData Received data
* @return true if handle receive data successfully, otherwise false
*/
public boolean onReceive(DistroData distroData) {
   Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
           distroData.getDistroKey());
   String resourceType = distroData.getDistroKey().getResourceType();
   // 利用type找到对应的processor处理器
   DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
   if (null == dataProcessor) {
       Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
       return false;
  }
   // 调用处理器的processData方法处理请求数据
   return dataProcessor.processData(distroData);
}

processData也是接口中的方法,一样的有两个实现类,这里我们需要根据resourceType来进行确认,这个resourceType的值大家可以回过头看这一章的最开始那个地方,在syncToAllServer中,也就是我们Event创建和传入的地方,我们当前这个resourceTypeDistroKey其实一开始就绑定了,那么其实这个resourceType就是我们上面提到的常量TYPE

然后这个地方通过findDataProcessor查找DistroComponentHolder中的Map成员dataProcessorMap,以TYPE为key的对应的value

private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();

public DistroDataProcessor findDataProcessor(String processType) {
   return dataProcessorMap.get(processType);
}

到这里我们知道了存储在哪里以及对应的key,但是对应的value该如何查找呢?大家可以继续看到DistroComponentHolder中,可以发现这个Map是private私有的,那么就说明外部不能随便进行赋值,有且只能通过registerDataProcessor方法进行赋值

那么我们查找一下这个方法使用的地方,看看是否能查找到部分线索,可以发现就两个地方

image-20251228204246865

我们可以分别看一下这两个地方中,因为我们需要其processTypeTYPE,那么也就是对应的processType()方法返回值为TYPE,可以看到,符合条件的是DistroClientComponentRegistry,另外一个的返回不满足条件(另外一个的就不贴图了,大家自己可以去看一下,他的Key是另外一个常量)

image-20251228204526868

那么确定了对应的返回值类型是DistroClientComponentRegistry,那么我们就可以看其中的processData()方法的实现逻辑

@Override
public boolean processData(DistroData distroData) {
   switch (distroData.getType()) {
       case ADD:
       case CHANGE:
           // 添加和改变走这个逻辑
           // 这一步是将请求数据利用序列化器进行对应的反序列化的操作
           ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                  .deserialize(distroData.getContent(), ClientSyncData.class);
           handlerClientSyncData(clientSyncData);
           return true;
       case DELETE:
           // 删除走这个逻辑
           String deleteClientId = distroData.getDistroKey().getResourceKey();
           Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
           clientManager.clientDisconnected(deleteClientId);
           return true;
       default:
           return false;
  }
}

可以看到主要的逻辑就是handlerClientSyncData方法,而在这个方法内的主要逻辑是upgradeClient方法

private void handlerClientSyncData(ClientSyncData clientSyncData) {
   Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
   clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
   Client client = clientManager.getClient(clientSyncData.getClientId());
   upgradeClient(client, clientSyncData);
}

// 传入client即客户端ID对应的客户端对象 和 clientSyncData为对应的客户端数据
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
   List<String> namespaces = clientSyncData.getNamespaces();
   List<String> groupNames = clientSyncData.getGroupNames();
   List<String> serviceNames = clientSyncData.getServiceNames();
   List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
   // 同步数据集合
   Set<Service> syncedService = new HashSet<>();
   // 遍历传入参数中的所有服务信息
   for (int i = 0; i < namespaces.size(); i++) {
       // 创建对应的Service对象
       Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
       // 获取内存中保存的对应的service单例对象
       Service singleton = ServiceManager.getInstance().getSingleton(service);
       // 加入到集合中,方便后续比较
       syncedService.add(singleton);
       InstancePublishInfo instancePublishInfo = instances.get(i);
       // 如果从client中获取的和同步数据中的内容不一样,则发布新增事件,client需要同步新数据
       if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
           client.addServiceInstance(singleton, instancePublishInfo);
           NotifyCenter.publishEvent(
                   new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
      }
  }
   // 如果某个服务不在同步数据中(即不在 syncedService 集合中),则从client中移除该服务实例并发布注销事件
   for (Service each : client.getAllPublishedService()) {
       if (!syncedService.contains(each)) {
           client.removeServiceInstance(each);
           NotifyCenter.publishEvent(
                   new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
      }
  }
}

在这里会解析传过来的参数,最后对于新增的数据,还是发布了 ClientRegisterServiceEvent 事件,这个事件我们在讲解注册源码的时候已经分析过了,这一块注册逻辑就结束了。

我们会过来看看processData中对于删除操作的逻辑处理,可以发现核心在于clientManager.clientDisconnected(deleteClientId);一样得需要找对应的实现类

这个实现类怎么找到的呢,这个有点难度。
从代码的角度分析的话,DistroClientComponentRegistry中的这个clientManager,本质上是来源于DistroClientComponentRegistry中的clientManager,而DistroClientComponentRegistry是个Bean对象
那么我们来看看clientManager,他对应的接口类型是ClientManagerDelegate,这个ClientManagerDelegate中也同样实现了clientDisconnected这个方法
ClientManagerDelegate 持有三种不同的 ClientManager 实现:
ConnectionBasedClientManager:基于连接的客户端管理器
EphemeralIpPortClientManager:临时IP端口客户端管理器
PersistentIpPortClientManager:持久IP端口客户端管理器

而ClientManagerDelegate中对于clientDisconnected的实现如下
@Override
public boolean clientDisconnected(String clientId) {
  return getClientManagerById(clientId).clientDisconnected(clientId);
}
与这个方法相关联的方法如下
private ClientManager getClientManagerById(String clientId) {
  if (isConnectionBasedClient(clientId)) {
      return connectionBasedClientManager;
  }
  return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
}

private boolean isConnectionBasedClient(String clientId) {
  return !clientId.contains(IpPortBasedClient.ID_DELIMITER);
}

也就是说,我们确定这个clientDisconnected的实现类,最终是要依靠clientId,而本质上这个clientId是客户端注册的时候生成的,比较难追踪,而且你会发现很多的相关的信息都会回到这个clientId上来

所以我们这里其实最好的方法是debug的形式,或者我们也可以从理解的角度来看,一般的客户端都是临时IP端口,所以默认为EphemeralIpPortClientManager的实现

这个方法我们看 EphemeralIpPortClientManager 临时实例的实现类

@Override
public boolean clientDisconnected(String clientId) {
  Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
  // 移除客户端信息
  IpPortBasedClient client = clients.remove(clientId);
  if (null == client) {
      return true;
  }
  // 发布客户端注销事件
  NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
  client.release();
  return true;
}

ClientDisconnectEvent 发布客户端注销事件,会在多个地方处理该事件。

第一个是DistroClientDataProcessor,客户端注销后会同步集群节点。第二个是ClientServiceIndexesManager,移除本身节点该 client 的数据。

客户端注销同步集群节点没什么好说的了,和注册同步逻辑类似,我们来看下 ClientServiceIndexesManager 类处理注销事件,代码如下:


@Override
public void onEvent(Event event) {
  if (event instanceof ClientEvent.ClientDisconnectEvent) {
      // 处理客户端注销
      handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
  } else if (event instanceof ClientOperationEvent) {
      handleClientOperation((ClientOperationEvent) event);
  }
}

private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
  Client client = event.getClient();
  for (Service each : client.getAllSubscribeService()) {
      // 移除订阅者信息 其实就是操作subscriberIndexes
      removeSubscriberIndexes(each, client.getClientId());
  }
  for (Service each : client.getAllPublishedService()) {
      // 移除注册表信息 其实就是操作publisherIndexes
      removePublisherIndexes(each, client.getClientId());
  }
}

编程 Java 项目