Nacos源码学习计划-Day03-服务端如何处理客户端的注册请求(上)
ZealSinger 发布于 阅读:131 技术文档
我们前面有了解到,客户端的自动注册是借助Spring的消息监听机制,在Spring启动后自动发送了HTTP的注册请求到服务端,那么今天我们来看看,这个请求打到Nacos服务端之后,服务端是如何处理的,这个时候就要看我们的下载的Nacos源码了
分析请求入口
既然要看服务端如何处理这个HTTP请求的,那就很明显需要找到对应的URL的Controller,这个该如何分析和查找?可以从一下两个方法尝试寻找
-
直接搜索
因为我们分析客户端代码,已经分析过知道了最终那个请求URL为/nacos/v1/ns/instance,其实通过这个信息,和IDEA的端点查找插件和全局搜索,其实就能很方便找到对应的Controller
-
分析法
分析法就是根据功能和Nacos的模块分类进行搜索。例如当前这个,属于注册中心功能,也就是我们核心模块中的Nameing Service,可以看到Nacos中源码中刚好有一个Name模块,然后我们找入口自然是找Controller,所以到Controler目录下查找,功能和注册实例相关的,自然就是找类似于InstanceController;RegistryController类似的Controller,这样子就能很快找到
整体分析
找到对应的Controller,我们可以纵观一下整个Controller,能利于我们对于整个Controller的功能有个大致的整体认识
可以看到,Nacos源码这个Controller很大的一个特点就是,整个类聚焦于Instance的相关逻辑,包含了实例的注册,删除,修改,不是通过路径的不同进行功能入口的区分而是很严格的遵循Restful风格,regsiter 对应 @PostMapping、deregister 对应 @DeleteMapping、update 对应 @PutMapping 就是一种规范,在很多系统代码中,可能都是清一色的使用@PostMaping,然后通过路径/update ; /delete ; /add等路径上的区别进行区分
/**
* Instance operation controller.
*
* @author nkorange
*/
UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") (
public class InstanceController {
private SwitchDomain switchDomain;
private PushService pushService;
private ServiceManager serviceManager;
private DataSource pushDataSource = new DataSource() {
public String getData(PushService.PushClient client) {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
try {
result = doSrvIpxt(client.getNamespaceId(), client.getServiceName(), client.getAgent(),
client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0,
StringUtils.EMPTY, false, StringUtils.EMPTY, StringUtils.EMPTY, false);
} catch (Exception e) {
Loggers.SRV_LOG.warn("PUSH-SERVICE: service is not modified", e);
}
// overdrive the cache millis to push mode
result.put("cacheMillis", switchDomain.getPushCacheMillis(client.getServiceName()));
return result.toString();
}
};
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public String register(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Deregister instances.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during deregister
*/
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public String deregister(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Update instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during update
*/
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public String update(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Batch update instance's metadata. old key exist = update, old key not exist = add.
*
* @param request http request
* @return success updated instances. such as '{"updated":["2.2.2.2:8080:unknown:xxxx-cluster:ephemeral"}'.
* @throws Exception any error during update
* @since 1.4.0
*/
value = "/metadata/batch") (
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public ObjectNode batchUpdateInstanceMatadata(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Batch delete instance's metadata. old key exist = delete, old key not exist = not operate
*
* @param request http request
* @return success updated instances. such as '{"updated":["2.2.2.2:8080:unknown:xxxx-cluster:ephemeral"}'.
* @throws Exception any error during update
* @since 1.4.0
*/
"/metadata/batch") (
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public ObjectNode batchDeleteInstanceMatadata(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
private InstanceOperationInfo buildOperationInfo(String serviceName, String consistencyType,
List<Instance> instances) {
//....具体内容暂时不需要看....//
}
private List<Instance> parseBatchInstances(String instances) {
//....具体内容暂时不需要看....//
}
private List<Instance> batchOperateMetadata(String namespace, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata, String action) {
//....具体内容暂时不需要看....//
}
/**
* Patch instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during patch
*/
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public String patch(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Get all instance of input service.
*
* @param request http request
* @return list of instance
* @throws Exception any error during list
*/
"/list") (
parser = NamingResourceParser.class, action = ActionTypes.READ) (
public ObjectNode list(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Get detail information of specified instance.
*
* @param request http request
* @return detail information of instance
* @throws Exception any error during get
*/
parser = NamingResourceParser.class, action = ActionTypes.READ) (
public ObjectNode detail(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Create a beat for instance.
*
* @param request http request
* @return detail information of instance
* @throws Exception any error during handle
*/
"/beat") (
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public ObjectNode beat(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* List all instance with health status.
*
* @param key (namespace##)?serviceName
* @return list of instance
* @throws NacosException any error during handle
*/
"/statuses") (
public ObjectNode listWithHealthStatus( String key) throws NacosException {
//....具体内容暂时不需要看....//
}
private Instance parseInstance(HttpServletRequest request) throws Exception {
//....具体内容暂时不需要看....//
}
private Instance getIpAddress(HttpServletRequest request) {
//....具体内容暂时不需要看....//
}
private Instance getBasicIpAddress(HttpServletRequest request) {
//....具体内容暂时不需要看....//
}
private void checkIfDisabled(Service service) throws Exception {
//....具体内容暂时不需要看....//
}
/**
* Get service full information with instances.
*
* @param namespaceId namespace id
* @param serviceName service name
* @param agent agent infor string
* @param clusters cluster names
* @param clientIP client ip
* @param udpPort push udp port
* @param env env
* @param isCheck is check request
* @param app app name
* @param tid tenant
* @param healthyOnly whether only for healthy check
* @return service full information with instances
* @throws Exception any error during handle
*/
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
//....具体内容暂时不需要看....//
}
除此之外,整体代码布局很数服,按照Idea的默认布局来看,Nacos的代码基本都不会超过一屏,也就是我们一般不会需要左右滑动,这个样子观感很舒服
服务端服务注册源码分析
初始分析
OK,回归主线,我们来看到Post对应的方法,register的代码在高版本和低版本中最外层这一块上是没区别的,所以无论哪个版本的都可以放心看
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public String register(HttpServletRequest request) throws Exception {
// 获取命名空间
// WebUtils工具类 optional方法用于解析Requets请求体中的信息
// 如果请求体中有CommonParams.NAMESPACE_ID(即namespaceId)的Key即包含了信息,则使用请求体中携带的对应的value;如果没有则使用默认的DEFAULT_NAMESPACE_ID(public)
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 同上 获取服务名称serviceName 没有则默认值serviceName
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// 检查命名空间是否符合格式要求 @@ServiceName 和 GroupName@@ServiceName这种格式是合法的 其余的均不合法
NamingUtils.checkServiceNameFormat(serviceName);
// 利用requets中的信息封装Instance实例对象
final Instance instance = parseInstance(request);
// 主线核心 注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
其余的内容也是顺带看一下就行,我们来看核心部分即最后一行serviceManager调用registerInstance方法
serviceManager.registerInstance(namespaceId, serviceName, instance);
// 以命名空间,服务名,实例对象为入参
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建一个空的Service对象
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 根据命名空间和服务名称获取service对象
//其实如果正常运行的话就是获取上面那个刚刚创建的service
Service service = getService(namespaceId, serviceName);
// 判空抛异常
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 主线核心--添加实例 传入命名空间,服务名,实例对象是否为临时的标识,实例对象
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
继续往下看,看到addInstance方法
/**
* Add instance to service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ephemeral whether instance is ephemeral
* @param ips instances
* @throws NacosException nacos exception
*/
// 注意看 最后一个是个Instance的可变参数
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 获取了一个什么key,现在也不知道做什么用的,等会详细分析,因为后面逻辑有使用到这个key,可以简单猜测一下就是实例队列中每个实例有个唯一的Key但是暂且还不确定
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 同样还是根据命名空间以及服务名获取一个Service,这里肯定不会为空,因为为空上层方法就已经抛异常了
Service service = getService(namespaceId, serviceName);
// 使用synchronized 锁住一个Service
synchronized (service) {
// 这里提前说一下,ips 上层方法传过来的,是本次实例注册对应的Instance,也就是已开始从Request里面获取的参数信息。
// 最后会放在instanList里面,为什么这里是List,说明它不仅仅只有一个,还会包含之前已经注册的Instance,放在了一个List里面
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 创建了一个Instances对象,把InstanceList设置到属性里面去了
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 最后调用了一个put方法,把key、实例列表传进去了
consistencyService.put(key, instances);
}
}
put方法
我们来看一下最后那行put代码的逻辑,可以看到这个put方法有很多实现类,那我们要看的是哪个呢?
有两个方法进行判断
-
Debug方法。因为我们能正常跑Nacos服务,用我们本地的另外一个服务注册到本地Nacos服务上,然后在这里打断点的方式看看跳入到哪个中
-
看注入。consistencyService注入的肯定是接口的实现类,既然有多个实现类那么肯定会需要指定Bean,Bean是唯一的,如下,这里就是找DelegateConsistencyServiceImpl这个实现类
然后看这个里面的实现类的put方法
public void put(String key, Record value) throws NacosException {
// 先通过 Key 选择一个 Service,然后调用对应 Service 的 put 方法
mapConsistencyService(key).put(key, value);
}
private ConsistencyService mapConsistencyService(String key) {
// 判断 Key 的规则,选择不同的 service
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
可以看到,put方法的入参是一个String类型的key,这个Key是上层方法中我们说到过的专门构建的,我们需要待会儿回去看看那个方法的逻辑才知道Key的具体含义; 第二个入参value是一个Record类型,这个类型是啥?我们明明传入的是Instances对象
先解决第二个问题,Record是一个接口,该接口和序列化有关。而Instances类是该接口的实现类,在 Instances 对象中,有一个 instanceList 集合属性,这个属性包含了之前已经注册的实例以及新需要注册的实例对象
public class Instances implements Record {
private List<Instance> instanceList = new ArrayList<>();
}
buildInstanceListKey方法
然后我们回到上一层方法中,看一下获得Key的那个方法
// 入参分别为 命名空间ID 服务名 是否为临时实例(默认true)
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName)
: buildPersistentInstanceListKey(namespaceId, serviceName);
}
public static final String NAMESPACE_KEY_CONNECTOR = "##";
private static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
// 临时实例 com.alibaba.nacos.naming.iplist.ephemeral + namespaceId ## + serviceName
private static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) {
return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
}
// 非临时实例 com.alibaba.nacos.naming.iplist + namespaceId ## + serviceName 也就是少了个 .ephemeral 部分
private static String buildPersistentInstanceListKey(String namespaceId, String serviceName) {
return INSTANCE_LIST_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
}
继续回到上面put方法中的mapConsistencyService方法
private ConsistencyService mapConsistencyService(String key) {
// 判断 Key 的规则,选择不同的 service
// 从下面的分析来看 一般默认为true 所以这里返回ephemeralConsistencyService
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
public static boolean matchEphemeralKey(String key) {
// currently only instance list has ephemeral type:
return matchEphemeralInstanceListKey(key);
}
/*
这里逻辑很简单 就是利用String的startWith方法,判断key的头部是
com.alibaba.nacos.naming.iplist 还是
com.alibaba.nacos.naming.iplist.ephemeral
如果是前者则为非临时实例 返回false
如果是后者则为临时实例 返回true
*/
public static boolean matchEphemeralInstanceListKey(String key) {
return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
}
onPut方法
知道了mapConsistencyService返回的是ephemeralConsistencyService,我们可以回到最上面的put方法内那个put了,接下来就可以看这个类的put方法,其核心是onPut方法
public void put(String key, Record value) throws NacosException {
// 核心方法
onPut(key, value);
// 集群节点同步 这个涉及到后续的集群部分的内容 暂时可以先不看
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
public void onPut(String key, Record value) {
// 这里还是判断刚刚那个 key 前缀,判断是否为临时实例 这里是为 true
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// 创建Datum对象,把key、和 Instances都放入Datum对象里面去
// Datum对象key-value-timestamp 可以理解为一个三元组
// key为传入的key value为传入的Instances对象 timestamp是一个原子Long类型
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 最后添加到了DataStore里面了,这个里面就是一个Map对象
dataStore.put(key, datum);
}
// 分支代码先不用看
if (!listeners.containsKey(key)) {
return;
}
// 添加任务
notifier.addTask(key, DataOperation.CHANGE);
}
dataStore.put方法
dataStore成员是一个DataStore对象,其结构如下,可以看到,上面的put方法其实就是调用内部Map的put方法,以key为map的key,封装出来的Datum对象为Value保存在了DataStore对象的成员Map中
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
dataMap.put(key, value);
}
public Datum remove(String key) {
return dataMap.remove(key);
}
public Set<String> keys() {
return dataMap.keySet();
}
public Datum get(String key) {
return dataMap.get(key);
}
public boolean contains(String key) {
return dataMap.containsKey(key);
}
/**
* Batch get datum for a list of keys.
*
* @param keys of datum
* @return list of datum
*/
public Map<String, Datum> batchGet(List<String> keys) {
Map<String, Datum> map = new HashMap<>(128);
for (String key : keys) {
Datum datum = dataMap.get(key);
if (datum == null) {
continue;
}
map.put(key, datum);
}
return map;
}
public int getInstanceCount() {
int count = 0;
for (Map.Entry<String, Datum> entry : dataMap.entrySet()) {
try {
Datum instancesDatum = entry.getValue();
if (instancesDatum.value instanceof Instances) {
count += ((Instances) instancesDatum.value).getInstanceList().size();
}
} catch (Exception ignore) {
}
}
return count;
}
public Map<String, Datum> getDataMap() {
return dataMap;
}
}
addTask方法
除此之外,onPut最后还有一个addTask方法,即onPut方法的最后一行,第一个入参key为传入的key,第二个入参为枚举类成员CHANGE,调用者notifier的类型是Notifier是一个任务类,实现了Runable接口
notifier.addTask(key, DataOperation.CHANGE);
//------------------------------------------//
// 阻塞队列
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 以上都是分支逻辑
// 重点在这里,tasks是一个阻塞队列,并且把key、action包装成 Pair 对象,放入了队列中
// Pair对象是个二元组对象,Java提供的,<A,B>
tasks.offer(Pair.with(datumKey, action));
}
总结
不知不觉,主要逻辑部分我们已经看完了,是不是还有点懵,正常,我们总结回顾一下。
接收到HTTP请求之后,对请求的部分进行校验以及获取其请求体中的命名空间,服务名,并且将request中的必要的信息封装为Instances对象
调用registerInstance方法,传入上述获得的三个属性(命名空间,服务名,Instance对象),这个步骤中的主要内容就是利用入参创建一个空的Service对象
然后调用addInstance方法,传入三个属性,并且还有是否为临时实例。在这个方法内,首先通过buildInstanceListKey方法,根据是否为临时实例,返回Key,临时实例和非临时实例的Key的区别仅仅在于字符串头部不一样,最后调用consistencyService.put方法传入key和instance实例,在这个put内会根据key调用不同service的put方法,我们假设默认情况下即临时实例下,会将其存储到DataStore的成员Map,该Map最终结构为
<String key , Datum<key,value(Instances对象),timestamp(原子Long类型)> >
最后调用了 addTask 方法,包装成 Pair 对象,放入到一个阻塞队列里面去了
确实有点绕,主要是有很多put方法,put内有put还有不同service的选择,put内还有onPut,可以多多看一下,流程图如下
目前我们这里还只是介绍了大致的逻辑和流程,很多细节点和功能点没有分析,例如保存到Map中和阻塞队列中的作用是什么等问题都还没分析,目前先了解这部分
文章标题:Nacos源码学习计划-Day03-服务端如何处理客户端的注册请求(上)
文章链接:https://zealsinger.xyz/?post=28
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫