«

Nacos源码学习计划-Day08-集群-心跳健康检查

ZealSinger 发布于 阅读:126 技术文档


集群部分前言

我们前面已经就Nacos单机部署情况下,服务上线,服务下线,心跳健康检测三个最主要的方面,从服务端和客户端两个方面都进行了学习,接下来我们要学习Nacos集群相关的部分

我们首先来想想,Nacos集群和单机下,某些策略/逻辑/实现一定一样的么?会遇到哪些新的问题?例如

所以在集群环境下,我们还有很多问题需要去探索。

集群心跳健康检测分析

我们来看看刚刚上面提到的问题的第一个——集群下,每个Nacos节点都需要执行心跳健康检测的定时任务么?

策略

根据我们的集群相关储备,其实就是集群下的定时任务的处理,一般而言,我们对于心跳检测定时任务,目的是为了在某一时刻确定微服务的健康状态,那么如果每个Nacos节点都进行一次定时任务,也就是在一段时间内/一个时刻集群内的所有的节点都获得了微服务的健康状态,那么如果在这个过程中,每个节点获取到的健康状态不一样,我们该如何选择?以哪个为准呢?

所以从上述思考就能知道,全部进行询问会导致结果的多样性,从而造成决策困难,所以在Nacos中,我们是集群中某一个Nacos节点去做心跳检测任务,然后把检测结果同步给其余节点

需要注意,我们这里说的是分析的是非临时实例的心跳检测
Nacos中对于临时和非临时的心跳检测机制是不一样的
临时实例服务端是客户端无限循环的任务队列发送心跳
非临时实例是服务端对实例进行不健康和下线处理的主动检测定时任务

 

选举源码

我们之前在《06-服务都安检测不健康实例》接触过,HealthCheckReactor类是 Nacos 命名服务中用于管理健康检查任务调度的核心组件,他会接收ClientBeatCheckTask类型的定时任务从而负责健康状态的检测,既然要看健康检测的代码逻辑,自然就是看ClientBeatCheckTask的run方法的源码

@Override
   public void run() {
       try {

           // 集群下 判断自身节点,是否需要执行心跳健康检查任务,如果不需要,会直接 return 从集群节点的 "责任分工" 角度,判断当前节点是否需要负责该服务的心跳检查
           if (!getDistroMapper().responsible(service.getName())) {
               return;
          }

           // 是否开启健康检查任务,默认是 true 从 "功能开关" 角度,判断全局是否开启了健康检查功能
           if (!getSwitchDomain().isHealthCheckEnabled()) {
               return;
          }

           List<Instance> instances = service.allIPs(true);

           // first set health status of instances:
           for (Instance instance : instances) {
               if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                   if (!instance.isMarked()) {
                       if (instance.isHealthy()) {
                           instance.setHealthy(false);
                           Loggers.EVT_LOG
                                  .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                           instance.getIp(), instance.getPort(), instance.getClusterName(),
                                           service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                           instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                           getPushService().serviceChanged(service);
                           ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                      }
                  }
              }
          }

           if (!getGlobalConfig().isExpireInstance()) {
               return;
          }

           // then remove obsolete instances:
           for (Instance instance : instances) {

               if (instance.isMarked()) {
                   continue;
              }

               if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                   // delete instance
                   Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                           JacksonUtils.toJson(instance));
                   deleteIp(instance);
              }
          }

      } catch (Exception e) {
           Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
      }

  }

可以看到,首先是利用responsible方法判断当前节点是否需要执行心跳检测,源码如下

通过这个方法我们就能得知,在 Nacos 集群架构下,服务端心跳健康检查任务最终只有一台集群节点能够去执行心跳健康检查任务,而其他的集群节点,并不会去执行

public boolean responsible(String serviceName) {

   // 获取我们集群节点数量,我们这里假设的是三个集群节点
   final List<String> servers = healthyList;

   // 如果采用单机模式启动,直接返回true
   if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
       return true;
  }

   // 如果没有可用的健康集群节点,则直接返回 false
   if (CollectionUtils.isEmpty(servers)) {
       // means distro config is not ready yet
       return false;
  }

   int index = servers.indexOf(EnvUtil.getLocalAddress());
   int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());

   if (lastIndex < 0 || index < 0) {
       return true;
  }

   // 把 serviceName 进行 Hash 操作,然后跟 servers size 进行取模,最终只会有一个节点能够返回 true
   int target = distroHash(serviceName) % servers.size();
   return target >= index && target <= lastIndex;
}

同步信息源码

根据上面的内容,我们知道了Nacos内部通过hash和取模的方式确定了只会有某一个节点负责心跳检测。那么心跳检测之后,状态变更了,如果去通知别的节点呢?

我们来看一下ServiceManager这个类,有一个 init方法,该方法被 @PostConstruct 注解修饰了,在 Bean 创建的时候,会来调用这个init 方法,而在这个方法中,就会开启心跳健康检查结果同步的定时任务

@PostConstruct
public void init() {

   // 同步心跳健康检查结果异步任务
   GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

   // 处理 同步心跳健康检查结果异步任务 内存队列 + 异步任务
   GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
   
   // ......
}

我们来看一下ServiceReporter对应的run的逻辑

public void run() {
   try {
       // 获取所有的service实例
       Map<String, Set<String>> allServiceNames = getAllServiceNames();
       
       // 如果没有实例 直接返回 都没有实例同步个蛋对吧
       if (allServiceNames.size() <= 0) {
           //ignore
           return;
      }
       
       // 按照命名空间 遍历所有的命名空间
       for (String namespaceId : allServiceNames.keySet()) {
           // ServiceChecksum只有两个成员
           // 一个是String类型的namespaceId;
           // 还有一个是一个Map,key为String类型的serviceName,value是String类型的service.getChecksum() 这个是一个校验和字段 后面会提到
           ServiceChecksum checksum = new ServiceChecksum(namespaceId);
           
           // 遍历每个命名空间下的所有的实例
           for (String serviceName : allServiceNames.get(namespaceId)) {
               // 采用和 判断是否需要进行心跳检测 同样的算法,保证只有一台集群节点,同步心跳健康检查结果
               if (!distroMapper.responsible(serviceName)) {
                   continue;
              }
               
               Service service = getService(namespaceId, serviceName);
               
               if (service == null || service.isEmpty()) {
                   continue;
              }
/* 这个方法的作用是整合服务的信息为一个整体的特定格式的字符串作为校验和,也就是service.getChecksum()
收集服务的所有实例信息,包括IP、端口、权重、健康状态和集群名称,经过MD5加密后且保持UTF-8的编码
校验和格式:服务信息 + 实例1信息 + 实例2信息 + ... 单个实例信息格式为: IP:端口_权重_健康状态_集群名称,

校验和整体格式为:
服务信息节点1IP:节点1端口_节点1权重_节点1健康状态_节点1所在集群名称,节点2IP:节点2端口_节点2权重_节点2健康状态_节点2所在集群名称....
*/
               service.recalculateChecksum();
               
               checksum.addItem(serviceName, service.getChecksum());
          }
           // 创建消息对象
           Message msg = new Message();
           
           // 将整个ServiceChecksum对象JSON序列化设置为message的data 可以理解为在封装消息体
           msg.setData(JacksonUtils.toJson(checksum));
           
           // 获取集群中的所有成员
           Collection<Member> sameSiteServers = memberManager.allMembers();
           
           if (sameSiteServers == null || sameSiteServers.size() <= 0) {
               return;
          }
           
           // 遍历所有的成员节点 目的就是为了进行数据同步
           for (Member server : sameSiteServers) {
               // 判断是否为自身字节 跳过自己
               if (server.getAddress().equals(NetUtils.localServer())) {
                   continue;
              }
               // 发送消息给别的成员节点
               synchronizer.send(server.getAddress(), msg);
          }
      }
  } catch (Exception e) {
       Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
  } finally {
       GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
               TimeUnit.MILLISECONDS);
  }
}


// 补充 ServiceChecksum结构和recalculateChecksum方法
public static class ServiceChecksum {
       
       public String namespaceId;
       
   // 结合上面对于addItem的判断就能知道 这个里保存的就是 <服务名,服务实例校验和>
       public Map<String, String> serviceName2Checksum = new HashMap<String, String>();
       
       public ServiceChecksum() {
           this.namespaceId = Constants.DEFAULT_NAMESPACE_ID;
      }
       
       public ServiceChecksum(String namespaceId) {
           this.namespaceId = namespaceId;
      }
       
       /**
        * Add service checksum.
        *
        * @param serviceName service name
        * @param checksum   checksum of service
        */
       public void addItem(String serviceName, String checksum) {
           if (StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(checksum)) {
               Loggers.SRV_LOG.warn("[DOMAIN-CHECKSUM] serviceName or checksum is empty,serviceName: {}, checksum: {}",
                       serviceName, checksum);
               return;
          }
           serviceName2Checksum.put(serviceName, checksum);
      }
  }


/**
    * Re-calculate checksum of service.
    * 重新计算服务的校验和(checksum)。
    *
    * 该方法会收集服务的所有实例信息,包括IP、端口、权重、健康状态和集群名称,
    * 并结合服务的基本信息生成一个唯一的校验和。这个校验和主要用于判断服务
    * 实例列表是否发生了变化,以便进行数据同步和一致性检查。
    *
    * 具体步骤:
    * 1. 获取服务的所有实例列表
    * 2. 获取服务的基本信息字符串
    * 3. 对实例列表按自然顺序排序以确保一致性
    * 4. 将每个实例的核心属性拼接成特定格式的字符串
    * 5. 使用MD5算法对整个字符串进行哈希,生成校验和
    *
    * 校验和格式:服务信息 + 实例1信息 + 实例2信息 + ...
    * 其中每个实例信息格式为:IP:端口_权重_健康状态_集群名称,
    */
   public synchronized void recalculateChecksum() {
       List<Instance> ips = allIPs();

       StringBuilder ipsString = new StringBuilder();
       ipsString.append(getServiceString());

       if (Loggers.SRV_LOG.isDebugEnabled()) {
           Loggers.SRV_LOG.debug("service to json: " + getServiceString());
      }

       if (CollectionUtils.isNotEmpty(ips)) {
           Collections.sort(ips);
      }

       for (Instance ip : ips) {
           String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" + ip
                  .getClusterName();
           ipsString.append(string);
           ipsString.append(",");
      }

       checksum = MD5Utils.md5Hex(ipsString.toString(), Constants.ENCODE);
  }

可以看到,最后发送同步消息的核心方法为synchronizer.send(),我们来看看他的源码

该send方法有两个实现类,我们是从ServiceManager出发的,这里也是看ServiceStatusSynchronizer这个部分的实现

@Override
public void send(final String serverIP, Message msg) {
   if (serverIP == null) {
       return;
  }

   // 构建请求参数 msg的data内容即校验和作为了statuses的value进行保存 本节点的IP作为clientIP的value
   Map<String, String> params = new HashMap<String, String>(10);
   params.put("statuses", msg.getData());
   params.put("clientIP", NetUtils.localServer());

   // 拼接 url 地址
   String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
       + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

   if (IPUtil.containsPort(serverIP)) {
       url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
           + "/service/status";
  }

   try {
       // 异步发送 http 请求,url 地址就是:http://ip/v1/ns/service/status , 同步心跳健康检查结果
       HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
            // 代码省略
      });
  } catch (Exception e) {
       Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
  }
}

通过代码可以得知,最终也是通过 HTTP 的POST请求方式来进行数据同步的,也能够看出请求地址是v1/ns/service/status

我们顺着去看看,这个HTTP请求对应的接口的处理逻辑

image-20251027210832371

接口的代码比较长,我们简单过一下,不会进行过于详细的解析

@PostMapping("/status")
   public String serviceStatus(HttpServletRequest request) throws Exception {
       // 将整个request序列化为JSON格式
       String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
       String value = URLDecoder.decode(entity, "UTF-8");
       JsonNode json = JacksonUtils.toObj(value);
       
       //format: service1@@checksum@@@service2@@checksum
       String statuses = json.get("statuses").asText();  // 取出校验和 校验和也就是上面官方注解的format格式
       String serverIp = json.get("clientIP").asText();  // 取出发出该信息的nacos节点IP信息
       
      // 检测IP是否为集群中的节点  
       if (!memberManager.hasMember(serverIp)) {
           throw new NacosException(NacosException.INVALID_PARAM, "ip: " + serverIp + " is not in serverlist");
      }
       
       try {
           // 将当前statuses转化为ServiceChecksum对象。可以成功转化因为statuses本来就是ServiceChecksum转换来的
           ServiceManager.ServiceChecksum checksums = JacksonUtils
                  .toObj(statuses, ServiceManager.ServiceChecksum.class);
           if (checksums == null) {
               Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: null");
               return "fail";
          }
           // serviceName2Checksum就是一个map,结构为 <服务名,服务实例校验和>
           for (Map.Entry<String, String> entry : checksums.serviceName2Checksum.entrySet()) {
               if (entry == null || StringUtils.isEmpty(entry.getKey()) || StringUtils.isEmpty(entry.getValue())) {
                   continue;
              }
               String serviceName = entry.getKey();
               String checksum = entry.getValue();
               Service service = serviceManager.getService(checksums.namespaceId, serviceName);
               
               if (service == null) {
                   continue;
              }
               // 得到当前Nacos节点中的对应的服务的最新校验和
               service.recalculateChecksum();
               // 通过对比 别的Nacos节点发过来的校验和 与 当前Nacos节点中该服务的校验和 来判断是否发生了状态变更
               if (!checksum.equals(service.getChecksum())) {
                   if (Loggers.SRV_LOG.isDebugEnabled()) {
                       Loggers.SRV_LOG.debug("checksum of {} is not consistent, remote: {}, checksum: {}, local: {}",
                               serviceName, serverIp, checksum, service.getChecksum());
                  }
                   // 如果发生了变更 就需要更新 即同步客户端信息的操作
                   serviceManager.addUpdatedServiceToQueue(checksums.namespaceId, serviceName, serverIp, checksum);
              }
          }
      } catch (Exception e) {
           Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: " + statuses, e);
      }
       
       return "ok";
  }

我们进一步查看addUpdatedServiceToQueue方法,该方法会把我们传入的参数,包装成 ServiceKey 对象,最终放入到了toBeUpdatedServicesQueue阻塞队列当中,代码如下


// 阻塞队列
private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {
   lock.lock();
   try {
       // 包装成 ServiceKey 对象,放入到 toBeUpdatedServicesQueue 队列中
       toBeUpdatedServicesQueue
          .offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
  } catch (Exception e) {
       toBeUpdatedServicesQueue.poll();
       toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
       Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);
  } finally {
       lock.unlock();
  }
}

既然最后是放入到了阻塞队列,想必大家肯定也能猜到,后台肯定也会有一个异步任务,从这个阻塞队列中进行取任务。这个逻辑就和之前分析的实例注册一样,那么这个取任务的异步任务在哪?

这就需要我们回到最开始ServiceManager类了。还记得我们上面整个分析的起点是啥?ServiceManager第一行的逻辑——同步心跳健康检查结果异步任务 ,我们上面的分析都是对于同步心跳健康检查结果异步任务的逻辑的分析,然后第二行也是一个异步任务,而这个异步任务就是我们从阻塞队列中取出ServiceKey对象

@PostConstruct
public void init() {

   // 同步心跳健康检查结果异步任务
   GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

   // 处理 同步心跳健康检查结果异步任务 内存队列 + 异步任务
   GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
   
   // ......
}

第二个线程任务类是:UpdatedServiceProcessor,我们从run 方法中(代码如下),能够看出是一个 while 循环,并且是没有结束条件的。在循环的逻辑当中,会从toBeUpdatedServicesQueue阻塞队列中一直取任务,取到任务之后,又是提交了一个ServiceUpdaterl类型线程池任务

@Override
public void run() {
   ServiceKey serviceKey = null;
   try {
       // 无限循环
       while (true) {
           try {
               // 从阻塞任务中取任务
               serviceKey = toBeUpdatedServicesQueue.take();
          } catch (Exception e) {
               Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
          }

           if (serviceKey == null) {
               continue;
          }
           // 又是提交一个线程任务
           GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
      }
  } catch (Exception e) {
       Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
  }
}

我们再来看一下最后提交的这个ServiceUpdater的任务,可以看到里面的核心方法就是updatedHealthStatus方法,也看看他的逻辑

先解析我们的 msg.getData()参数,然后获取注册表中全部的 Instance 实例列,进行遍历,在健康状态有变动的情况下,会直接更改它的 healthy 属性。在方法的最后,如果有更新 healthy属性的情况下,最终也会发布服务改变事件来通知客户端进行更新。

public void run() {
   try {
       updatedHealthStatus(namespaceId, serviceName, serverIP);
  } catch (Exception e) {
       Loggers.SRV_LOG
              .warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
                       serverIP, e);
  }
}


public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
   // 组成Messaeg对象
   Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
   JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
   
   // 解析参数
   ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
   Map<String, String> ipsMap = new HashMap<>(ipList.size());
   for (int i = 0; i < ipList.size(); i++) {
       
       String ip = ipList.get(i).asText();
       String[] strings = ip.split("_");
       ipsMap.put(strings[0], strings[1]);
  }
   
   Service service = getService(namespaceId, serviceName);
   
   if (service == null) {
       return;
  }
   // 是否更改的标识变量 默认没有改变
   boolean changed = false;
   
   List<Instance> instances = service.allIPs();
   for (Instance instance : instances) {
       // 获取全部的实例信息,进行遍历
       boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
       // 判断健康状态
       if (valid != instance.isHealthy()) {
           changed = true;
           instance.setHealthy(valid);
           Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
                  (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
                   instance.getClusterName());
      }
  }
   // 如果实例健康状态改变了,那么就发布 服务改变事件,使用 upd 的方式通知客户端
   if (changed) {
       pushService.serviceChanged(service);
       if (Loggers.EVT_LOG.isDebugEnabled()) {
           StringBuilder stringBuilder = new StringBuilder();
           List<Instance> allIps = service.allIPs();
           for (Instance instance : allIps) {
               stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
          }
           Loggers.EVT_LOG
                  .debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
                           service.getName(), stringBuilder.toString());
      }
  }
   
}

总结

编程 Java 项目