Nacos源码学习计划-Day06-服务端检测不健康实例
ZealSinger 发布于 阅读:33 技术文档
其实我们在《02-客户端如何发其注册》这一章节中,我们提到和分析过客户端发送心跳的逻辑,也就是通过这个心跳,让服务端知道客户端的状态,这个BeatInfo最终也还是落到了一个HTTP请求,转门发送到了Nacos服务端的心跳检测接口。
所以我们接下来可以去Nacos源码中看看服务端实例心跳接口源码
服务端实例心跳接口源码分析
之前有分析过客户端的发送心跳包的逻辑,所以这里我们不过多的再说了,通过分析可以得知最终发送的URL和请求体结构如下图
找到服务端中对应的接口,删除一些和我们主要逻辑没有关系的代码后,大致的内容就是如下
"/beat") (
parser = NamingResourceParser.class, action = ActionTypes.WRITE) (
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
// 从request中获取请求参数、 namespaceId 、 serviceName
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// 通过 命名空间、服务名等信息,从内存注册表中获取 instance 实例对象
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// 如果获取实例为空,会重新调用注册的方法
if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
// 重新注册
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// 获取 service
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// 提交客户端心跳操作任务,更改 lastBeat 属性
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
在后半段逻辑中我们可以看到有个processClientBeat方法,其逻辑如下,主要任务是对于客户端心跳的一个异步响应任务
实例心跳接口总的来说,先通过请求参数,在我们内存注册表中找 Instance,如果不找到会重新进行注册。然后提交了一个ClientBeatProcessor
异步任务,在异步任务中,会找到相同集群下的所有临时实例列表,然后进行 for 循环,根据 ip + port 来进行判断,如果找到相同 ip + port 的 instance 会立马修改 lastBeat 属性,并且最后也会判断实例是否健康,如果不健康,会重新标记为健康的状态
// 先创建定义了一个ClientBeatProcessor异步任务对象 然后通过HealthCheckReactor执行这个异步任务
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
// ClientBeatProcessor是一个Runable的实现类 主要逻辑如下
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
// 获取当前心跳包对应的 ip、clusterName
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
// 获取 当前 cluster 下所有临时实例
List<Instance> instances = cluster.allIPs(true);
// 遍历临时实例
for (Instance instance : instances) {
// 判断 ip 、 port ,只操作当前发送心跳检查的 instance 实例
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// 把 最后心跳时间 修改为当前时间
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
// 如果之前为不健康的状态
if (!instance.isHealthy()) {
// 更改为健康
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
getPushService().serviceChanged(service);
}
}
}
}
}
/*
HealthCheckReactor类是 Nacos 命名服务中用于管理健康检查任务调度的核心组件。它的主要作用是通过线程池调度健康检查任务,支持定时执行和取消任务。
以下是该类的主要逻辑:
1. 健康检查任务调度
- scheduleCheck(HealthCheckTask task)方法用于调度健康检查任务,根据任务的检查间隔时间来安排执行
- 使用 GlobalExecutor.scheduleNamingHealth()方法来安排任务执行
2. 客户端心跳检查任务管理
- scheduleCheck(ClientBeatCheckTask task)方法用于调度客户端心跳检查任务,固定延迟5秒执行
- 使用 futureMap来存储任务的调度Future,避免重复调度
- cancelCheck(ClientBeatCheckTask task)方法用于取消已调度的心跳检查任务,并从 futureMap中移除
3. **即时任务调度**
- scheduleNow(Runnable task)方法用于立即调度任务执行(无延迟)
该类通过 GlobalExecutor来实际执行任务调度,使用 `ConcurrentHashMap` 来管理已调度任务的引用,确保任务可以被正确取消。
*/
服务端感知和处理不健康实例
上述我们分析了客户端的心跳包到了服务端后如何处理的,但是心跳包是健康客户端5s自己发送包所以能被服务端检测到,那么对于已经不健康的包肯定是不会发送心跳包的,那么Nacos服务端是如何检测和感知的呢?
其实,早在我们之前看 服务端注册接口 的逻辑中,我们没看的分支代码中,有个 createEmptyService 方法
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建一个空的Service
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 通过命名空间、服务名获取一个Service,为空则抛出异常
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
在 createEmptyService
方法中,又会开启一个异步任务,这个异步任务的作用就是:检查有哪些客户端是不健康的状态,如果不健康就需要对它进行处理。我这里把开启异步任务调用链路列举出来,代码就不一一贴了:createEmptyService() -> createServiceIfAbsent() -> putServiceAndInit(service) -> service.init() 在 service.init()
在这个方法中,开启了健康检查异步任务,代码如下:
public void init() {
// 开启 clientBeatCheckTask 延时任务,每 5s 执行一次
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
可以看到这里提交的是一个clientBeatCheckTask的任务,所以我们还是得去看看他的run方法中的逻辑,分析代码可以知道主要逻辑是如下
-
第一个循环的主要作用是,找出哪些 Instance 是不健康的,如果不健康就需要把 healthy 属性更改为 false。那是怎么判断不健康的呢?利用的就是 lastBeat 属性。如果是健康的,那么每间隔 5s 客户端就会调一次 实例心跳接口,lastBeat 属性会被更新为当前时间。如果不健康,那么 lastBeat 属性是不会变化,一旦超过 15s 还没变化的,就会被这个定时任务标记为不健康了。
-
第二个循环的主要作用是,找出哪些 Instance 是可以删除的,同样还是利用 lastBeat 属性,一旦超过 30s 没更新,Nacos 会直接把该 Instance 删除掉。
public void run() {
try {
// 获取全部的临时实例
List<Instance> instances = service.allIPs(true);
// 遍历每一个临时实例
for (Instance instance : instances) {
// 判断 当前时间 - 实例最后心跳时间 > 心跳超时时间
// instance.getInstanceHeartBeatTimeOut() 取常量 15s
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
// marked 默认为 false,所以这个 if 成立
if (!instance.isMarked()) {
// 如果这个 instance 还是健康的状态
if (instance.isHealthy()) {
// 最终就改成 不健康 状态
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
// 事件发布监听事件, 通过 upd 协议来发送通知
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
// 又一次遍历全部的临时实例
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// 如果 当前时间 - 最后一次心跳时间 > 心跳删除时间
// instance.getIpDeleteTimeout() 取常量 30s
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
// 直接把对应的 instance 从注册表中删除
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
文章标题:Nacos源码学习计划-Day06-服务端检测不健康实例
文章链接:https://zealsinger.xyz/?post=31
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫