«

Nacos源码学习计划-Day03-服务端如何处理客户端的注册请求(上)

ZealSinger 发布于 阅读:131 技术文档


服务端如何处理客户端的注册请求

我们前面有了解到,客户端的自动注册是借助Spring的消息监听机制,在Spring启动后自动发送了HTTP的注册请求到服务端,那么今天我们来看看,这个请求打到Nacos服务端之后,服务端是如何处理的,这个时候就要看我们的下载的Nacos源码了

分析请求入口

既然要看服务端如何处理这个HTTP请求的,那就很明显需要找到对应的URL的Controller,这个该如何分析和查找?可以从一下两个方法尝试寻找

因为我们分析客户端代码,已经分析过知道了最终那个请求URL为/nacos/v1/ns/instance,其实通过这个信息,和IDEA的端点查找插件和全局搜索,其实就能很方便找到对应的Controller

分析法就是根据功能和Nacos的模块分类进行搜索。例如当前这个,属于注册中心功能,也就是我们核心模块中的Nameing Service,可以看到Nacos中源码中刚好有一个Name模块,然后我们找入口自然是找Controller,所以到Controler目录下查找,功能和注册实例相关的,自然就是找类似于InstanceController;RegistryController类似的Controller,这样子就能很快找到

image-20250908135319893

整体分析

找到对应的Controller,我们可以纵观一下整个Controller,能利于我们对于整个Controller的功能有个大致的整体认识

可以看到,Nacos源码这个Controller很大的一个特点就是,整个类聚焦于Instance的相关逻辑,包含了实例的注册,删除,修改,不是通过路径的不同进行功能入口的区分而是很严格的遵循Restful风格,regsiter 对应 @PostMapping、deregister 对应 @DeleteMapping、update 对应 @PutMapping 就是一种规范,在很多系统代码中,可能都是清一色的使用@PostMaping,然后通过路径/update ; /delete ; /add等路径上的区别进行区分

/**
* Instance operation controller.
*
* @author nkorange
*/
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
   
   @Autowired
   private SwitchDomain switchDomain;
   
   @Autowired
   private PushService pushService;
   
   @Autowired
   private ServiceManager serviceManager;
   
   private DataSource pushDataSource = new DataSource() {
       
       @Override
       public String getData(PushService.PushClient client) {
           
           ObjectNode result = JacksonUtils.createEmptyJsonNode();
           try {
               result = doSrvIpxt(client.getNamespaceId(), client.getServiceName(), client.getAgent(),
                       client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0,
                       StringUtils.EMPTY, false, StringUtils.EMPTY, StringUtils.EMPTY, false);
          } catch (Exception e) {
               Loggers.SRV_LOG.warn("PUSH-SERVICE: service is not modified", e);
          }
           
           // overdrive the cache millis to push mode
           result.put("cacheMillis", switchDomain.getPushCacheMillis(client.getServiceName()));
           
           return result.toString();
      }
  };
   
   /**
    * Register new instance.
    *
    * @param request http request
    * @return 'ok' if success
    * @throws Exception any error during register
    */
   @CanDistro
   @PostMapping
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public String register(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Deregister instances.
    *
    * @param request http request
    * @return 'ok' if success
    * @throws Exception any error during deregister
    */
   @CanDistro
   @DeleteMapping
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public String deregister(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Update instance.
    *
    * @param request http request
    * @return 'ok' if success
    * @throws Exception any error during update
    */
   @CanDistro
   @PutMapping
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public String update(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Batch update instance's metadata. old key exist = update, old key not exist = add.
    *
    * @param request http request
    * @return success updated instances. such as '{"updated":["2.2.2.2:8080:unknown:xxxx-cluster:ephemeral"}'.
    * @throws Exception any error during update
    * @since 1.4.0
    */
   @CanDistro
   @PutMapping(value = "/metadata/batch")
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public ObjectNode batchUpdateInstanceMatadata(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Batch delete instance's metadata. old key exist = delete, old key not exist = not operate
    *
    * @param request http request
    * @return success updated instances. such as '{"updated":["2.2.2.2:8080:unknown:xxxx-cluster:ephemeral"}'.
    * @throws Exception any error during update
    * @since 1.4.0
    */
   @CanDistro
   @DeleteMapping("/metadata/batch")
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public ObjectNode batchDeleteInstanceMatadata(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   private InstanceOperationInfo buildOperationInfo(String serviceName, String consistencyType,
           List<Instance> instances) {
       //....具体内容暂时不需要看....//
  }
   
   private List<Instance> parseBatchInstances(String instances) {
       //....具体内容暂时不需要看....//
  }
   
   private List<Instance> batchOperateMetadata(String namespace, InstanceOperationInfo instanceOperationInfo,
           Map<String, String> metadata, String action) {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Patch instance.
    *
    * @param request http request
    * @return 'ok' if success
    * @throws Exception any error during patch
    */
   @CanDistro
   @PatchMapping
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public String patch(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Get all instance of input service.
    *
    * @param request http request
    * @return list of instance
    * @throws Exception any error during list
    */
   @GetMapping("/list")
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
   public ObjectNode list(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Get detail information of specified instance.
    *
    * @param request http request
    * @return detail information of instance
    * @throws Exception any error during get
    */
   @GetMapping
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
   public ObjectNode detail(HttpServletRequest request) throws Exception {
      //....具体内容暂时不需要看....//
  }
   
   /**
    * Create a beat for instance.
    *
    * @param request http request
    * @return detail information of instance
    * @throws Exception any error during handle
    */
   @CanDistro
   @PutMapping("/beat")
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public ObjectNode beat(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * List all instance with health status.
    *
    * @param key (namespace##)?serviceName
    * @return list of instance
    * @throws NacosException any error during handle
    */
   @RequestMapping("/statuses")
   public ObjectNode listWithHealthStatus(@RequestParam String key) throws NacosException {
       //....具体内容暂时不需要看....//
  }
   
   private Instance parseInstance(HttpServletRequest request) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   private Instance getIpAddress(HttpServletRequest request) {
       //....具体内容暂时不需要看....//
  }
   
   private Instance getBasicIpAddress(HttpServletRequest request) {
       //....具体内容暂时不需要看....//
  }
   
   private void checkIfDisabled(Service service) throws Exception {
       //....具体内容暂时不需要看....//
  }
   
   /**
    * Get service full information with instances.
    *
    * @param namespaceId namespace id
    * @param serviceName service name
    * @param agent       agent infor string
    * @param clusters   cluster names
    * @param clientIP   client ip
    * @param udpPort     push udp port
    * @param env         env
    * @param isCheck     is check request
    * @param app         app name
    * @param tid         tenant
    * @param healthyOnly whether only for healthy check
    * @return service full information with instances
    * @throws Exception any error during handle
    */
   public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
           int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
       //....具体内容暂时不需要看....//
}

除此之外,整体代码布局很数服,按照Idea的默认布局来看,Nacos的代码基本都不会超过一屏,也就是我们一般不会需要左右滑动,这个样子观感很舒服

image-20250908140610709

服务端服务注册源码分析

初始分析

OK,回归主线,我们来看到Post对应的方法,register的代码在高版本和低版本中最外层这一块上是没区别的,所以无论哪个版本的都可以放心看

    /**
    * Register new instance.
    *
    * @param request http request
    * @return 'ok' if success
    * @throws Exception any error during register
    */
   @CanDistro
   @PostMapping
   @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
   public String register(HttpServletRequest request) throws Exception {
       // 获取命名空间
       // WebUtils工具类 optional方法用于解析Requets请求体中的信息
       // 如果请求体中有CommonParams.NAMESPACE_ID(即namespaceId)的Key即包含了信息,则使用请求体中携带的对应的value;如果没有则使用默认的DEFAULT_NAMESPACE_ID(public)
       final String namespaceId = WebUtils
              .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
       // 同上 获取服务名称serviceName 没有则默认值serviceName
       final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
       // 检查命名空间是否符合格式要求 @@ServiceName 和 GroupName@@ServiceName这种格式是合法的 其余的均不合法
       NamingUtils.checkServiceNameFormat(serviceName);
       // 利用requets中的信息封装Instance实例对象
       final Instance instance = parseInstance(request);
       // 主线核心 注册实例
       serviceManager.registerInstance(namespaceId, serviceName, instance);
       return "ok";
  }

其余的内容也是顺带看一下就行,我们来看核心部分即最后一行serviceManager调用registerInstance方法

serviceManager.registerInstance(namespaceId, serviceName, instance);

// 以命名空间,服务名,实例对象为入参
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
       // 创建一个空的Service对象
       createEmptyService(namespaceId, serviceName, instance.isEphemeral());
       // 根据命名空间和服务名称获取service对象
   //其实如果正常运行的话就是获取上面那个刚刚创建的service
       Service service = getService(namespaceId, serviceName);
       // 判空抛异常
       if (service == null) {
           throw new NacosException(NacosException.INVALID_PARAM,
                   "service not found, namespace: " + namespaceId + ", service: " + serviceName);
      }
      // 主线核心--添加实例 传入命名空间,服务名,实例对象是否为临时的标识,实例对象
       addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

继续往下看,看到addInstance方法

/**
* Add instance to service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ephemeral   whether instance is ephemeral
* @param ips         instances
* @throws NacosException nacos exception
*/
// 注意看 最后一个是个Instance的可变参数
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
       throws NacosException {
   
   // 获取了一个什么key,现在也不知道做什么用的,等会详细分析,因为后面逻辑有使用到这个key,可以简单猜测一下就是实例队列中每个实例有个唯一的Key但是暂且还不确定
   String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
   // 同样还是根据命名空间以及服务名获取一个Service,这里肯定不会为空,因为为空上层方法就已经抛异常了
   Service service = getService(namespaceId, serviceName);
   
   // 使用synchronized 锁住一个Service
   synchronized (service) {
       // 这里提前说一下,ips 上层方法传过来的,是本次实例注册对应的Instance,也就是已开始从Request里面获取的参数信息。
       // 最后会放在instanList里面,为什么这里是List,说明它不仅仅只有一个,还会包含之前已经注册的Instance,放在了一个List里面
       List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
       
       // 创建了一个Instances对象,把InstanceList设置到属性里面去了
       Instances instances = new Instances();
       instances.setInstanceList(instanceList);
       
       // 最后调用了一个put方法,把key、实例列表传进去了
       consistencyService.put(key, instances);
  }
}

put方法

我们来看一下最后那行put代码的逻辑,可以看到这个put方法有很多实现类,那我们要看的是哪个呢?

image-20250908145322476

有两个方法进行判断

image-20250908145529812

然后看这个里面的实现类的put方法

@Override
public void put(String key, Record value) throws NacosException {
  // 先通过 Key 选择一个 Service,然后调用对应 Service 的 put 方法
  mapConsistencyService(key).put(key, value);
}

private ConsistencyService mapConsistencyService(String key) {
  // 判断 Key 的规则,选择不同的 service
  return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

可以看到,put方法的入参是一个String类型的key,这个Key是上层方法中我们说到过的专门构建的,我们需要待会儿回去看看那个方法的逻辑才知道Key的具体含义; 第二个入参value是一个Record类型,这个类型是啥?我们明明传入的是Instances对象

先解决第二个问题,Record是一个接口,该接口和序列化有关。而Instances类是该接口的实现类,在 Instances 对象中,有一个 instanceList 集合属性,这个属性包含了之前已经注册的实例以及新需要注册的实例对象

public class Instances implements Record {
  private List<Instance> instanceList = new ArrayList<>();
}

buildInstanceListKey方法

然后我们回到上一层方法中,看一下获得Key的那个方法

// 入参分别为 命名空间ID 服务名  是否为临时实例(默认true)
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
   return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName)
          : buildPersistentInstanceListKey(namespaceId, serviceName);
}


public static final String NAMESPACE_KEY_CONNECTOR = "##";

private static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";

public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";


// 临时实例 com.alibaba.nacos.naming.iplist.ephemeral + namespaceId ## + serviceName
private static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) {
       return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
  }

// 非临时实例 com.alibaba.nacos.naming.iplist + namespaceId ## + serviceName   也就是少了个 .ephemeral 部分
private static String buildPersistentInstanceListKey(String namespaceId, String serviceName) {
       return INSTANCE_LIST_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
}

继续回到上面put方法中的mapConsistencyService方法

private ConsistencyService mapConsistencyService(String key) {
  // 判断 Key 的规则,选择不同的 service
   // 从下面的分析来看 一般默认为true 所以这里返回ephemeralConsistencyService
  return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}


public static boolean matchEphemeralKey(String key) {
       // currently only instance list has ephemeral type:
       return matchEphemeralInstanceListKey(key);
}


/*
这里逻辑很简单 就是利用String的startWith方法,判断key的头部是
com.alibaba.nacos.naming.iplist 还是
com.alibaba.nacos.naming.iplist.ephemeral
如果是前者则为非临时实例 返回false
如果是后者则为临时实例 返回true
*/
public static boolean matchEphemeralInstanceListKey(String key) {
       return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
}

onPut方法

知道了mapConsistencyService返回的是ephemeralConsistencyService,我们可以回到最上面的put方法内那个put了,接下来就可以看这个类的put方法,其核心是onPut方法

@Override
   public void put(String key, Record value) throws NacosException {
       // 核心方法
       onPut(key, value);
       // 集群节点同步 这个涉及到后续的集群部分的内容 暂时可以先不看
       distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
               globalConfig.getTaskDispatchPeriod() / 2);
  }

public void onPut(String key, Record value) {
   
   // 这里还是判断刚刚那个 key 前缀,判断是否为临时实例 这里是为 true
   if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
       // 创建Datum对象,把key、和 Instances都放入Datum对象里面去
       // Datum对象key-value-timestamp 可以理解为一个三元组
       // key为传入的key value为传入的Instances对象 timestamp是一个原子Long类型
       Datum<Instances> datum = new Datum<>();
       datum.value = (Instances) value;
       datum.key = key;
       datum.timestamp.incrementAndGet();
       
       // 最后添加到了DataStore里面了,这个里面就是一个Map对象
       dataStore.put(key, datum);
  }
   
   // 分支代码先不用看
   if (!listeners.containsKey(key)) {
       return;
  }
   
   // 添加任务
   notifier.addTask(key, DataOperation.CHANGE);
}

dataStore.put方法

dataStore成员是一个DataStore对象,其结构如下,可以看到,上面的put方法其实就是调用内部Map的put方法,以key为map的key,封装出来的Datum对象为Value保存在了DataStore对象的成员Map中

@Component
public class DataStore {
   
   private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
   
   public void put(String key, Datum value) {
       dataMap.put(key, value);
  }
   
   public Datum remove(String key) {
       return dataMap.remove(key);
  }
   
   public Set<String> keys() {
       return dataMap.keySet();
  }
   
   public Datum get(String key) {
       return dataMap.get(key);
  }
   
   public boolean contains(String key) {
       return dataMap.containsKey(key);
  }
   
   /**
    * Batch get datum for a list of keys.
    *
    * @param keys of datum
    * @return list of datum
    */
   public Map<String, Datum> batchGet(List<String> keys) {
       Map<String, Datum> map = new HashMap<>(128);
       for (String key : keys) {
           Datum datum = dataMap.get(key);
           if (datum == null) {
               continue;
          }
           map.put(key, datum);
      }
       return map;
  }
   
   public int getInstanceCount() {
       int count = 0;
       for (Map.Entry<String, Datum> entry : dataMap.entrySet()) {
           try {
               Datum instancesDatum = entry.getValue();
               if (instancesDatum.value instanceof Instances) {
                   count += ((Instances) instancesDatum.value).getInstanceList().size();
              }
          } catch (Exception ignore) {
          }
      }
       return count;
  }
   
   public Map<String, Datum> getDataMap() {
       return dataMap;
  }
}

addTask方法

除此之外,onPut最后还有一个addTask方法,即onPut方法的最后一行,第一个入参key为传入的key,第二个入参为枚举类成员CHANGE,调用者notifier的类型是Notifier是一个任务类,实现了Runable接口

notifier.addTask(key, DataOperation.CHANGE);

//------------------------------------------//

// 阻塞队列
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

public void addTask(String datumKey, DataOperation action) {

   if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
       return;
  }
   if (action == DataOperation.CHANGE) {
       services.put(datumKey, StringUtils.EMPTY);
  }

   // 以上都是分支逻辑
   
   // 重点在这里,tasks是一个阻塞队列,并且把key、action包装成 Pair 对象,放入了队列中
   // Pair对象是个二元组对象,Java提供的,<A,B>
   tasks.offer(Pair.with(datumKey, action));
}

总结

不知不觉,主要逻辑部分我们已经看完了,是不是还有点懵,正常,我们总结回顾一下。

接收到HTTP请求之后,对请求的部分进行校验以及获取其请求体中的命名空间,服务名,并且将request中的必要的信息封装为Instances对象

调用registerInstance方法,传入上述获得的三个属性(命名空间,服务名,Instance对象),这个步骤中的主要内容就是利用入参创建一个空的Service对象

然后调用addInstance方法,传入三个属性,并且还有是否为临时实例。在这个方法内,首先通过buildInstanceListKey方法,根据是否为临时实例,返回Key,临时实例和非临时实例的Key的区别仅仅在于字符串头部不一样,最后调用consistencyService.put方法传入key和instance实例,在这个put内会根据key调用不同service的put方法,我们假设默认情况下即临时实例下,会将其存储到DataStore的成员Map,该Map最终结构为

<String key , Datum<key,value(Instances对象),timestamp(原子Long类型)> >

最后调用了 addTask 方法,包装成 Pair 对象,放入到一个阻塞队列里面去了

确实有点绕,主要是有很多put方法,put内有put还有不同service的选择,put内还有onPut,可以多多看一下,流程图如下

目前我们这里还只是介绍了大致的逻辑和流程,很多细节点和功能点没有分析,例如保存到Map中和阻塞队列中的作用是什么等问题都还没分析,目前先了解这部分

 

编程 Java 项目