«

Nacos源码学习计划-Day06-服务端检测不健康实例

ZealSinger 发布于 阅读:33 技术文档


在Nacos的管理界面中,我们可以看到会展示健康实例的个数,这个数据是怎么获取的?Nacos服务端是如何知道对应的客户端状态是否可用的呢?

image-20251009101057110

其实我们在《02-客户端如何发其注册》这一章节中,我们提到和分析过客户端发送心跳的逻辑,也就是通过这个心跳,让服务端知道客户端的状态,这个BeatInfo最终也还是落到了一个HTTP请求,转门发送到了Nacos服务端的心跳检测接口。

image-20251009101352184

所以我们接下来可以去Nacos源码中看看服务端实例心跳接口源码

服务端实例心跳接口源码分析

之前有分析过客户端的发送心跳包的逻辑,所以这里我们不过多的再说了,通过分析可以得知最终发送的URL和请求体结构如下图

image-20251009101835315

找到服务端中对应的接口,删除一些和我们主要逻辑没有关系的代码后,大致的内容就是如下

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

   ObjectNode result = JacksonUtils.createEmptyJsonNode();
   result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
   String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);

   // 从request中获取请求参数、 namespaceId 、 serviceName
   RsInfo clientBeat = null;
   if (StringUtils.isNotBlank(beat)) {
       clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
  }
   String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
   String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
   NamingUtils.checkServiceNameFormat(serviceName);
   Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
   
   
   // 通过 命名空间、服务名等信息,从内存注册表中获取 instance 实例对象
   Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

   // 如果获取实例为空,会重新调用注册的方法
   if (instance == null) {
       if (clientBeat == null) {
           result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
           return result;
      }

       Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
           + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

       instance = new Instance();
       instance.setPort(clientBeat.getPort());
       instance.setIp(clientBeat.getIp());
       instance.setWeight(clientBeat.getWeight());
       instance.setMetadata(clientBeat.getMetadata());
       instance.setClusterName(clusterName);
       instance.setServiceName(serviceName);
       instance.setInstanceId(instance.getInstanceId());
       instance.setEphemeral(clientBeat.isEphemeral());

       // 重新注册
       serviceManager.registerInstance(namespaceId, serviceName, instance);
  }

   // 获取 service
   Service service = serviceManager.getService(namespaceId, serviceName);

   if (service == null) {
       throw new NacosException(NacosException.SERVER_ERROR,
           "service not found: " + serviceName + "@" + namespaceId);
  }
   if (clientBeat == null) {
       clientBeat = new RsInfo();
       clientBeat.setIp(ip);
       clientBeat.setPort(port);
       clientBeat.setCluster(clusterName);
  }
   
   // 提交客户端心跳操作任务,更改 lastBeat 属性
   service.processClientBeat(clientBeat);

   result.put(CommonParams.CODE, NamingResponseCode.OK);
   if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
       result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
  }
   result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
   return result;
}

在后半段逻辑中我们可以看到有个processClientBeat方法,其逻辑如下,主要任务是对于客户端心跳的一个异步响应任务

实例心跳接口总的来说,先通过请求参数,在我们内存注册表中找 Instance,如果不找到会重新进行注册。然后提交了一个ClientBeatProcessor异步任务,在异步任务中,会找到相同集群下的所有临时实例列表,然后进行 for 循环,根据 ip + port 来进行判断,如果找到相同 ip + port 的 instance 会立马修改 lastBeat 属性,并且最后也会判断实例是否健康,如果不健康,会重新标记为健康的状态

 // 先创建定义了一个ClientBeatProcessor异步任务对象 然后通过HealthCheckReactor执行这个异步任务
public void processClientBeat(final RsInfo rsInfo) {
       ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
       clientBeatProcessor.setService(this);
       clientBeatProcessor.setRsInfo(rsInfo);
       HealthCheckReactor.scheduleNow(clientBeatProcessor);
  }

// ClientBeatProcessor是一个Runable的实现类 主要逻辑如下
@Override
public void run() {
   Service service = this.service;
   if (Loggers.EVT_LOG.isDebugEnabled()) {
       Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
  }

   // 获取当前心跳包对应的 ip、clusterName
   String ip = rsInfo.getIp();
   String clusterName = rsInfo.getCluster();
   int port = rsInfo.getPort();
   Cluster cluster = service.getClusterMap().get(clusterName);
   // 获取 当前 cluster 下所有临时实例
   List<Instance> instances = cluster.allIPs(true);

   // 遍历临时实例
   for (Instance instance : instances) {
       // 判断 ip 、 port ,只操作当前发送心跳检查的 instance 实例
       if (instance.getIp().equals(ip) && instance.getPort() == port) {
           if (Loggers.EVT_LOG.isDebugEnabled()) {
               Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
          }
           // 把 最后心跳时间 修改为当前时间
           instance.setLastBeat(System.currentTimeMillis());
           if (!instance.isMarked()) {
               // 如果之前为不健康的状态
               if (!instance.isHealthy()) {
                   // 更改为健康
                   instance.setHealthy(true);
                   Loggers.EVT_LOG
                      .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                           cluster.getService().getName(), ip, port, cluster.getName(),
                           UtilsAndCommons.LOCALHOST_SITE);
                   getPushService().serviceChanged(service);
              }
          }
      }
  }
}



/*
HealthCheckReactor类是 Nacos 命名服务中用于管理健康检查任务调度的核心组件。它的主要作用是通过线程池调度健康检查任务,支持定时执行和取消任务。

以下是该类的主要逻辑:

1. 健康检查任务调度
  - scheduleCheck(HealthCheckTask task)方法用于调度健康检查任务,根据任务的检查间隔时间来安排执行
  - 使用 GlobalExecutor.scheduleNamingHealth()方法来安排任务执行

2. 客户端心跳检查任务管理
  - scheduleCheck(ClientBeatCheckTask task)方法用于调度客户端心跳检查任务,固定延迟5秒执行
  - 使用 futureMap来存储任务的调度Future,避免重复调度
  - cancelCheck(ClientBeatCheckTask task)方法用于取消已调度的心跳检查任务,并从 futureMap中移除

3. **即时任务调度**
  - scheduleNow(Runnable task)方法用于立即调度任务执行(无延迟)

该类通过 GlobalExecutor来实际执行任务调度,使用 `ConcurrentHashMap` 来管理已调度任务的引用,确保任务可以被正确取消。
*/

服务端感知和处理不健康实例

上述我们分析了客户端的心跳包到了服务端后如何处理的,但是心跳包是健康客户端5s自己发送包所以能被服务端检测到,那么对于已经不健康的包肯定是不会发送心跳包的,那么Nacos服务端是如何检测和感知的呢?

其实,早在我们之前看 服务端注册接口 的逻辑中,我们没看的分支代码中,有个 createEmptyService 方法

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

   // 创建一个空的Service
   createEmptyService(namespaceId, serviceName, instance.isEphemeral());

   // 通过命名空间、服务名获取一个Service,为空则抛出异常
   Service service = getService(namespaceId, serviceName);
   if (service == null) {
       throw new NacosException(NacosException.INVALID_PARAM,
           "service not found, namespace: " + namespaceId + ", service: " + serviceName);
  }

   // 添加实例
   addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

createEmptyService方法中,又会开启一个异步任务,这个异步任务的作用就是:检查有哪些客户端是不健康的状态,如果不健康就需要对它进行处理。我这里把开启异步任务调用链路列举出来,代码就不一一贴了:createEmptyService() -> createServiceIfAbsent() -> putServiceAndInit(service) -> service.init()service.init() 在这个方法中,开启了健康检查异步任务,代码如下:

public void init() {
   // 开启 clientBeatCheckTask 延时任务,每 5s 执行一次
   HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
   for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
       entry.getValue().setService(this);
       entry.getValue().init();
  }
}

可以看到这里提交的是一个clientBeatCheckTask的任务,所以我们还是得去看看他的run方法中的逻辑,分析代码可以知道主要逻辑是如下

@Override
public void run() {
   try {
       // 获取全部的临时实例
       List<Instance> instances = service.allIPs(true);

       // 遍历每一个临时实例
       for (Instance instance : instances) {
           // 判断 当前时间 - 实例最后心跳时间 > 心跳超时时间
           // instance.getInstanceHeartBeatTimeOut() 取常量 15s
           if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
               // marked 默认为 false,所以这个 if 成立
               if (!instance.isMarked()) {
                   // 如果这个 instance 还是健康的状态
                   if (instance.isHealthy()) {
                       // 最终就改成 不健康 状态
                       instance.setHealthy(false);
                       Loggers.EVT_LOG
                          .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                               instance.getIp(), instance.getPort(), instance.getClusterName(),
                               service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                               instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                       // 事件发布监听事件, 通过 upd 协议来发送通知
                       getPushService().serviceChanged(service);
                       ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                  }
              }
          }
      }


       // 又一次遍历全部的临时实例
       for (Instance instance : instances) {

           if (instance.isMarked()) {
               continue;
          }

           // 如果 当前时间 - 最后一次心跳时间 > 心跳删除时间
           // instance.getIpDeleteTimeout() 取常量 30s
           if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
               // delete instance
               Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                   JacksonUtils.toJson(instance));
               // 直接把对应的 instance 从注册表中删除
               deleteIp(instance);
          }
      }
       
  } catch (Exception e) {
       Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
  }

}

编程 Java 项目