«

Nacos源码学习计划-Day05-服务调用时的调用链路(如何获取服务信息)

ZealSinger 发布于 阅读:56 技术文档


前面的内容中,我们已经了解到了服务是如何触发自动注册到Nacos中,以及对应的Nacos中是如何处理这个注册请求的(如何保存服务到内存中的)

当我们的服务注册后,我们就可以通过Nacos对于已经注册的所有的服务进行Feign的方式进行服务之间的调用,那么这个过程是如何实现的呢?今天就来探索这个问题

首先我们来理一下整个服务调用的流程:

  1. 每个客户端(服务调用者)内部会有个本地缓存缓存了已经调用过的服务的相关信息,缓存列表会定时的从Nacos中拉取最新服务列表从而更新本地缓存

  2. 在服务调用者需要调用服务提供者的时候,首先会根据服务提供者的服务名称到本地缓存中寻找对应的信息如果有多个实例就按照负载均衡算法选出一个

  3. 然后利用选出来的IP和Port进行HTTP调用从而得到调用结果(这里需要注意,实际的调用是服务调用者发起的,Nacos只是负责提供最新的服务信息而不负责调用)

image-20250927213025720

所以这里我们需要搞清楚两个问题

客户端实例发现

这里稍微会需要多看一点,因为Spring Cloud Alibaba随着版本的更替,底层的复杂均衡器由Ribbon替换为了SCLB(Spring Cloud LoadBalancer),且生态设计新增了不少抽象层,所以新版本和旧版本中的代码还是差距有点大

旧版本-Ribbon

Nacos中实际上就是借助Ribbon实现的负载均衡,在Nacos-discovery依赖中导入了Ribbon相关的依赖

image-20250927223719984

Ribbion中有一个接口-ServerList接口,这是一个拓展接口,主要作用就是获取Server列表,然后Nacos中对该接口进行了实现,从而完成对Ribbon的整合

package com.netflix.loadbalancer;
import java.util.List;
/**
* Interface that defines the methods sed to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
   public List<T> getInitialListOfServers();
   /**
    * Return updated list of servers. This is called say every 30 secs
    * (configurable) by the Loadbalancer's Ping cycle
    *
    */
   public List<T> getUpdatedListOfServers();  
}

我们进行微服务的调用的时候,Ribbon最终会调用NacosServerList类中的getUpdateListOfServers

至于怎么调用的,这个设计到Ribbon中的源码理解,大致的流程图如下,简单而言就是
(1)Ribbon中的LoadBalancerInterceptor类继承了ClientHttpRequestInterceptor(客户端 Http 请求拦截器) 就会去拦截客户端发起的http请求 而我们的RestTemplate就是属于被拦截的请求
(2)利用拦截到的URL等信息从而得到服务名,调用 RibbonLoadBalanceClient中的execute方法
(3)execute方法中调用getLoadBalancer()方法,该方法会会返回一个ILoadBalancer的类实体对象,在该对象中就包含了一个allList的成员即包含了所有的注册中心中有的实例对象(得到这个对象的内部逻辑中,就涉及到了不同注册中心的实现,也就是说如果我们的注册中心是Nacos,我们在这里调用了Nacos的NacosServerList中的相关方法)
(4)得到ILoadBalancer对象之后,利用getServer方法就会进行负载均衡选出一个最终的service

ribbon 源码分析流程

image-20250927224719618

可以看到NacosServerList的代码中底层都是调用的getServers()这个方法,所以我们重点看一下

可以看到,实际上,是先从缓存中获取数据,缓存中没有数据则去Nacos服务端拉取数据;如果需要去服务端拉取数据,就会顺带更新缓存内容,如果不需要,则会开启一个定时任务进行定时的Nacos服务端数据更新

private List<NacosServer> getServers() {
  try {
     // 读取分组
     String group = discoveryProperties.getGroup();
     // 通过服务名称、分组、true表示只需要健康实例,查询列表
     // 我们重点看 seelctInstances方法
     List<Instance> instances = discoveryProperties.namingServiceInstance()
          .selectInstances(serviceId, group, true);
     
     // 把 Instance 转换成 NacosServer 类型
     return instancesToServerList(instances);
  }
  catch (Exception e) {
     throw new IllegalStateException(
           "Can not get service instances from nacos, serviceId=" + serviceId,
           e);
  }
}
//------------------------------------------------------//

// seelctInstances方法存在多个重载,最终的执行就是最下面那个
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
   return selectInstances(serviceName, groupName, healthy, true);
}

@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe)
       throws NacosException {
   return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
       boolean subscribe) throws NacosException {
   
   ServiceInfo serviceInfo;
   // 这个参数传入默认就是 true
   if (subscribe) {
       serviceInfo =
       // 重点在这个方法
       hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
               StringUtils.join(clusters, ","));
  } else {
       serviceInfo = hostReactor
              .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                       StringUtils.join(clusters, ","));
  }
   return selectInstances(serviceInfo, healthy);
}



//------------------------------------------------------//
// hostReactor.getServiceInfo逻辑

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
   
   NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
   String key = ServiceInfo.getKey(serviceName, clusters);
   if (failoverReactor.isFailoverSwitch()) {
       return failoverReactor.getService(key);
  }
   
   // 先查询 本地缓存中实例列表
   ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
   
   // 如果等于 null
   if (null == serviceObj) {
       serviceObj = new ServiceInfo(serviceName, clusters);
       serviceInfoMap.put(serviceObj.getKey(), serviceObj);
       updatingMap.put(serviceName, new Object());
       
       // 立即更新Service数据,在这里就会去调用 Nacos 实例列表查询接口
       updateServiceNow(serviceName, clusters);
       
       updatingMap.remove(serviceName);
       
  } else if (updatingMap.containsKey(serviceName)) {
       
       if (UPDATE_HOLD_INTERVAL > 0) {
           // hold a moment waiting for update finish
           synchronized (serviceObj) {
               try {
                   serviceObj.wait(UPDATE_HOLD_INTERVAL);
              } catch (InterruptedException e) {
                   NAMING_LOGGER
                          .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
              }
          }
      }
  }
   
   // 开启定时任务 维护本地缓存
   scheduleUpdateIfAbsent(serviceName, clusters);
   
   // 最终是从本地缓存中 获取实例列表数据
   return serviceInfoMap.get(serviceObj.getKey());
}

查询Nacos服务端数据的方法为updateServiceNow(),其逻辑如下

private void updateServiceNow(String serviceName, String clusters) {
   try {
       // 调用这个方法
       updateService(serviceName, clusters);
  } catch (NacosException e) {
       NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
  }
}


public void updateService(String serviceName, String clusters) throws NacosException {
   ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
   try {
       
       // queryList方法即调用 Nacos 实例查询接口
       String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
       
       // 如果结果不为空,则会更新到本地缓存数据
       if (StringUtils.isNotEmpty(result)) {
           // 更新本地缓存
           processServiceJson(result);
      }
  } finally {
       if (oldService != null) {
           synchronized (oldService) {
               oldService.notifyAll();
          }
      }
  }
}

// 向 Nacos 服务端发起 HTTP 列表查询接口
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
       throws NacosException {
   
   final Map<String, String> params = new HashMap<String, String>(8);
   params.put(CommonParams.NAMESPACE_ID, namespaceId);
   params.put(CommonParams.SERVICE_NAME, serviceName);
   params.put("clusters", clusters);
   params.put("udpPort", String.valueOf(udpPort));
   params.put("clientIP", NetUtils.localIP());
   params.put("healthyOnly", String.valueOf(healthyOnly));
   
 
   return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

getServiceInfo方法中,获取完实例列表之后,会开启定时任务,在定时任务中会执行 UpdateTask 任务的 run 方法,在 run 方法中会查询本地缓存,如果缓存为空,又会执行 updateService方法调用 Nacos 实例列表查询接口,更新缓存数据。并且这个定时任务会重复一直执行。

最终是从本地缓存中 serviceInfoMap 直接获取实例数据,从这里也可以看出本地缓存其实是一个 Map 结构

这里还有一个细节,就是在queryList方法的入参中,可以看到第三个参数udpPort参数,这个是 Nacos 服务端实例数据发生了改变,假设服务端新增了一个实例,服务端会通过 udp 的方式来通知客户端,这一块后面的章节会讲到

新版本-SCLB

新版本中抽象层变多了,而且获取实例的逻辑稍微有点变化。先说总的:

SCLB中,NacosDusciveryClient是Nacos对Spring Cloud服务发现接口的实现,依赖NacosServiceDiscovery与Nacos服务端进行交互(其实也不是直接交互,通过内部的namingService进行交互),LoadBalancerClient是负载均衡的入口,NacosLoadBanlancer是针对Nacos特有负载均衡器

image-20250927222904992

其整体流程可以如下

  1. 请求到达后,请求会被LoadBalancerInterceptor拦截,转而调用LoadBalancerClientexecute方法

  2. 在该方法中,LoadBalancerClient的实际对象为BlockingLoadBalancerClient,他的execute方法中就会查询所有的实例对象且根据负载均衡策略选择一个实例出来。而查询功能默认会委托给DiscoveryClient(即NacosDiscoveryClient

  3. NacosDiscoveryClient中就会调用NacosServiceDiscovery对象查找所有的实例,这里需要注意,在NacosServiceDiscovery中查到的对象是Nacos中的实例对象格式,通过hostToServiceInstanceList转化为了SpringCloud内部的服务实例对象 image-20250928123510538

  4. 从NacosDiscoveryClient和NacosServiceDiscovery的逻辑可以看出来,所有的查询都是先查询Nacos服务端,查询后立马更新缓存,只有当查询失败得时候,才会走到NacosDiscoveryClient中的catch块中从而 ServiceCache.getInstances(serviceId)从本地缓存中拿取数据,并且可以知道,这一层返回的还是List也就是还没进行负载均衡选择的结果集

  5. 获取到对应服务名的实例List集合之后,就需要进行负载均衡。List数据返回到BlockingLoadBalancerClient之后,会调用其内部的choose方法,该方法就是利用会调用其内部的ReactorServiceInstanceLoadBalancer实现类(loadBalancerClientFactory工厂方法调用choose,该工厂就是使用的ReactorServiceInstanceLoadBalancer接口实现类),即NacosLoadBalancer(若配置了 Nacos 的负载均衡策略) NacosLoadBalancer 根据 Nacos 的负载均衡规则(默认支持权重、健康状态过滤,可自定义)从实例列表中选择一个最优实例(例如,权重高的健康实例优先被选中)image-20250928124544384

服务端查询实例

还是对应的在naming模块中的InstanceController中

很明显,主要的逻辑在最后面的return中

@GetMapping("/list")
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
   public ObjectNode list(HttpServletRequest request) throws Exception {
       
       String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
       String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
       NamingUtils.checkServiceNameFormat(serviceName);
       
       String agent = WebUtils.getUserAgent(request);
       String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
       String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
       int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
       String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
       boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
       
       String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
       
       String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
       
       boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
       // 重点
       return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
               healthyOnly);
  }

doSrvIpxt方法逻辑内容比较多,我们主要看如下部分

// 获取实例列表
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

// 查询每个集群下的所有的实例
public List<Instance> srvIPs(List<String> clusters) {
   if (CollectionUtils.isEmpty(clusters)) {
       clusters = new ArrayList<>();
       clusters.addAll(clusterMap.keySet());
  }
   // 拿到需要查询的 集群对象
   return allIPs(clusters);
}

public List<Instance> allIPs(List<String> clusters) {
   List<Instance> result = new ArrayList<>();
   // 遍历集群对象
   for (String cluster : clusters) {
       // 从缓存池Map中拿到对应的cluster对象
       Cluster clusterObj = clusterMap.get(cluster);
       if (clusterObj == null) {
           continue;
      }
       // 获取 cluster 对象中所有的 Instance 实例
       result.addAll(clusterObj.allIPs());
  }

   return result;
}

public List<Instance> allIPs() {
   // 返回持久化实例、临时实例
   List<Instance> allInstances = new ArrayList<>();
   allInstances.addAll(persistentInstances);
   allInstances.addAll(ephemeralInstances);
   return allInstances;
}

编程 Java 项目