«

Nacos源码学习计划-Day13-集群-Nacos中Raft协议的具体实现

ZealSinger 发布于 阅读:81 技术文档


我们前面介绍了Raft和Distro协议的相关理论知识,这里我们尝试走进Nacos底层对这两个协议的实现进行进一步的学习和了解。

Nacos中对于Raft协议的实现

我们之前提到过,Nacos的强一致性共识算法一开始就是从业内Raft协议的成熟工业方案中进行选择,最后选择了阿里内部蚂蚁集团对于Raft的实现JRaft,但是这个是在Nacos 2.x版本后引入的,在我们当前的1.4.x的版本的Raft版本是旧的并且存在些许问题,这个我们在后续分析的时候会说明;如果只想学习JRaft的同学可以跳转sofastack/sofa-jraft: A production-grade java implementation of RAFT consensus algorithm.官方GitHub仓库了解和学习

Raft协议下的实例注册

我们之前有提到过,分布式环境下,Raft协议只会允许Leader节点具备读写信息的能力,其余节点接收到的读写操作会转移到Leader节点上进行操作,那么我们回过头来看看“实例注册”的相关接口

在调用ConsistencyService接口put方法的时候,我们有多个实现可以选择,我们之前是根据Bean注入名字从而看的是DelegateConsistencyServiceImpl中的逻辑,这次我们来看RaftConsistencyServiceImpl中的逻辑

image-20251111102625730

RaftConsistencyServiceImp put逻辑如下

    @Override
   public void put(String key, Record value) throws NacosException {
       // 检查是否已经stop了 没有什么业务逻辑 不做分析了
       checkIsStopWork();
       try {
           raftCore.signalPublish(key, value);
      } catch (Exception e) {
           Loggers.RAFT.error("Raft put failed.", e);
           throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,e);
      }
  }

checkIsStopWork方法比较简单,我们直接来看raftCore.signalPublish(key, value)的逻辑,代码内容很长,我们慢慢的从上往下分析,我们利用虚线进行分割来进行逐步的了解

/**
* Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.
*
* @param key   key
* @param value value
* @throws Exception any exception during publish
*/
public void signalPublish(String key, Record value) throws Exception {
   // 和进入该方法前的stopWork
   if (stopWork) {
       throw new IllegalStateException("old raft protocol already stop work");
  }
//------------------------------------------//    
   //判断自己是否为Leader节点 如果是则处理 如果不是则转发给Leader
   if (!isLeader()) {
       // 将方法入参封装为请求所需格式
       ObjectNode params = JacksonUtils.createEmptyJsonNode();
       params.put("key", key);
       params.replace("value", JacksonUtils.transferToJsonNode(value));
       Map<String, String> parameters = new HashMap<>(1);
       parameters.put("key", key);
       // 找到Leader节点
       final RaftPeer leader = getLeader();
       // 转发给Leader节点进行处理
       raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
       return;
  }
//------------------------------------------//
   // 加锁 保证线程安全
   OPERATE_LOCK.lock();
   try {
       // 将入参封装为Datum对象 这个在正常的单机环境下注册实例中也会存在这样的操作
       final long start = System.currentTimeMillis();
       final Datum datum = new Datum();
       datum.key = key;
       datum.value = value;
       if (getDatum(key) == null) {
           datum.timestamp.set(1L);
      } else {
           datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
      }
       
       ObjectNode json = JacksonUtils.createEmptyJsonNode();
       json.replace("datum", JacksonUtils.transferToJsonNode(datum));
       json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
       // 核心方法 该方法主要是Leader节点本身处理数据的逻辑,先会把数据写入到本地文件,然后立马同步给内存中的注册表
       // peers是整个集群Raft协议中所有Raft节点的集合,local方法的逻辑是返回当前节点 该方法的具体分析在下面
       onPublish(datum, peers.local());
       
       final String content = json.toString();
       // 通过 闭锁 来控制半数以上节点 peers.majorityCount() 获取集群半数以上节点数量
       final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
       // 遍历 同步给其他节点
       for (final String server : peers.allServersIncludeMyself()) {
           if (isLeader(server)) {
               latch.countDown();
               continue;
          }
           final String url = buildUrl(server, API_ON_PUB);
           // 通过HTTP请求的方式同步其他节点
           HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
               @Override
               public void onReceive(RestResult<String> result) {
                   if (!result.ok()) {
                       Loggers.RAFT
                              .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                       datum.key, server, result.getCode());
                       return;
                  }
                   latch.countDown();
              }
               
               @Override
               public void onError(Throwable throwable) {
                   Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
              }
               
               @Override
               public void onCancel() {
               
              }
          });
           
      }
       // 等待半数以上的节点同步成功
       if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
           // only majority servers return success can we consider this update success
           Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
           throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
      }
       
       long end = System.currentTimeMillis();
       Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
  } finally {
       OPERATE_LOCK.unlock();
  }
}

onPublish方法解析

我们对上面的onPublish()方法进行详细的分析

/**
* Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.
可以看到这个解释注释很明确的说明了该方法的作用
发布。如果是领导者,则提交发布到存储。如果不是领导者,请停止发布,因为应该向领导者发出信号。
*
* @param datum datum
* @param source source raft peer
* @throws Exception any exception during publish
*/
public void onPublish(Datum datum, RaftPeer source) throws Exception {
   if (stopWork) {
       throw new IllegalStateException("old raft protocol already stop work");
  }
   // 这里的peers是RaftCore类的成员 是一个RaftPeerSet类的对象 底层用名为peers的HashMap<String,RaftPeer>类型的成员维护了所有的Raft节点信息;value的类型RaftPeer就是单个Raft对象,里面封装了IP,投票给谁,任期term,状态status(默认为Follower)等信息
   RaftPeer local = peers.local();
   if (datum.value == null) {
       Loggers.RAFT.warn("received empty datum");
       throw new IllegalStateException("received empty datum");
  }
   // 如果不是Leader节点则报错处理
   if (!peers.isLeader(source.ip)) {
       Loggers.RAFT
              .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                       JacksonUtils.toJson(getLeader()));
       throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
  }
   // source为入参 其实也是从peers中获取到的当前raft节点 这里主要是为了对比任期版本 确保最新
   // 校验请求的任期(term)是否有效:发起者的任期不能低于当前节点的任期
   // 避免处理旧任期(已过期Leader)的无效请求,保证数据一致性
   if (source.term.get() < local.term.get()) {
       Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
               JacksonUtils.toJson(local));
       throw new IllegalStateException(
               "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
  }
   // 重置当前节点(若为Leader)的Leader任期超时计时器
   // Leader需定期重置该计时器,避免因超时被Follower判定为宕机,从而触发重新选举
   local.resetLeaderDue();
   
    // 校验数据是否需要持久化(通过Key的格式匹配:Nacos中持久化服务的Key有固定前缀)
   if (KeyBuilder.matchPersistentKey(datum.key)) {
       //在这里 Raft 会先把数据写到本地文件当中
       raftStore.write(datum);
  }
   // 将数据更新到当前节点的本地内存缓存(datums是内存级数据存储,供查询快速访问)
   datums.put(datum.key, datum);
   // 如果当前节点是Leader,处理任期term逻辑递增(Raft协议的任期递增核心规矩)
   if (isLeader()) {
       local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
  } else {
       //如果不是Leader 当本地任期递增后会超过Leader任期时 强制与Leader的任期term进行对齐
       if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
           //set leader term:
           getLeader().term.set(source.term.get());
           local.term.set(getLeader().term.get());
      } else {
           // 没有超过的话就正常递增
           local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
      }
  }
   // 将更新后的任期term信息存储持久化到Raft存储,保证重启后能恢复数据,不丢失
   raftStore.updateTerm(local.term.get());
   
  // 通过发布事件来同步内存注册表,和注册的逻辑一样的        
   NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
   Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

raftStore.write(datum)的逻辑中,会把我们的实例信息,以JSON的形式持久化到Nacos服务端的目录下,并且文件名是以命名空间##分组@@服务名来命名,这些信息会在下此Nacos服务端重启的时候加载到内存注册表中

image-20251111204425292

同理raftStore.updateTerm(local.term.get())也会将Term任期信息写到一个固定文件名为META_FILE_NAME变量的本地文件,也会在下次启动的时候加载

image-20251111204508619

持久化完成之后,也会同步内存注册表,通过发布ValueChangeEvent事件,大家可以去找找对应处理这个事件的逻辑,直接通过 IDEA 的全文搜索就能找到了,PersistentNotifier类中有一个onEvent方法,调用了notify方法,最终也是调用了listener.onChange方法,后续的逻辑就和实例注册的逻辑一样,同步到内存注册表。

总结

在数据的写入实现,并没有两阶段提交的步骤,而是 Leader 自身同步成功之后,再去同步其他的集群节点。哪怕集群节点同步失败了,或者没有半数以上的节点支持,Leader 节点自身的数据也不会进行回滚。所以,在这里 Nacos 只是实现了Raft的简化版本,这里也不用太纠结存在什么问题,因为后期这一套是会被剔除的。

那么接下来我们就一起来看一看 Leader 节点是如何选举的吧。

Raft协议下Leader节点的选举

我们知道,Raft协议必须需要一个Leader节点,所以这个选举Leader节点的过程应该也是一个Nacos启动之后就会立马执行的一个任务逻辑。刚刚在上面查看Raft协议下的实例注册的逻辑的时候,可以看到Racft协议相关的核心内容在RaftCore这个核心类下,在这个类下,我们可以看到有一个@PostConstruct注解的init()方法

/**
* Init raft core.
*
* @throws Exception any exception during init
*/
@PostConstruct
public void init() throws Exception {
   Loggers.RAFT.info("initializing Raft sub-system");
   final long start = System.currentTimeMillis();
   // 加载本地文件中的datums
   raftStore.loadDatums(notifier, datums);
   // 加载本地文件中的任期
   setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
   
   Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
   // 是否初始化的标识
   initialized = true;
   
   Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
   // Leader选举任务
   masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
   // 心跳检测任务
   heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
   
   // 判断版本 和我们当前主线不搭边 暂时不管
   versionJudgement.registerObserver(isAllNewVersion -> {
       stopWork = isAllNewVersion;
       if (stopWork) {
           try {
               shutdown();
               raftListener.removeOldRaftMetadata();
          } catch (NacosException e) {
               throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
          }
      }
  }, 100);
   
   NotifyCenter.registerSubscriber(notifier);
   Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
           GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

可以看到这里主要的两个Task任务的加入任务队列和执行,这两个任务类型是MasterElectionHeartBeat

MasterElection任务类解析

MasterElection的主要逻辑如下

public class MasterElection implements Runnable {
   @Override
   public void run() {
       try {
           // 如果停止工作 停止选举
           if (stopWork) {
               return;
          }
           // 如果有节点没有准备好/初始化完毕 停止选举
           if (!peers.isReady()) {
               return;
          }
           // 获得本地节点信息 减少Leader心跳超时倒计时(每次任务执行递减一个周期)
           // local.leaderDueMs (0, GlobalExecutor.LEADER_TIMEOUT_MS)的随机数 也就是我们Raft协议理论篇提到过的初始的一个随机时间
           RaftPeer local = peers.local();
           local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
           // 大于0 则说明还没超时 超时时间内收到了Leader心跳 不能发起选举
           if (local.leaderDueMs > 0) {
               return;
          }
           
           // 小于0则说明Leader心跳超过 认为Leader宕机 重置超时时间并发起选举
           // leaderDueMs: 控制 Follower 节点在多久没收到 Leader 心跳后,会认为 Leader 已经死亡,并触发新一轮的 Leader 选举。
  // heartbeatDueMs: 控制 Leader 节点每隔多久需要向所有 Follower 节点发送一次心跳,以证明自己还活着,并维持其 Leader 地位。
           local.resetLeaderDue();
           local.resetHeartbeatDue();
           
           // 发起投票的核心方法
           sendVote();
      } catch (Exception e) {
           Loggers.RAFT.warn("[RAFT] error while master election {}", e);
      }
       
  }
   
   //----------------------//
   
   private void sendVote() {
       // 获取当前节点
       RaftPeer local = peers.get(NetUtils.localServer());
       Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
               local.term);
       
       // 重置leader属性为null 遍历Raft节点的seet集合,通过循环让raft节点的voteFor投票置为null清空状态重新投票
       peers.reset();
       //将当前节点转换为Candidate状态,准备竞选Leader
       local.term.incrementAndGet();
       local.voteFor = local.ip;
       local.state = RaftPeer.State.CANDIDATE;
       // 对除了自己以外的raft节点发起异步的http请求进行投票
       Map<String, String> params = new HashMap<>(1);
       params.put("vote", JacksonUtils.toJson(local)); // 封装本节点信息,包含任期等等
       for (final String server : peers.allServersWithoutMySelf()) {
           final String url = buildUrl(server, API_VOTE);
           try {
               HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                   @Override
                   public void onReceive(RestResult<String> result) {
                       if (!result.ok()) {
                           Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                           return;
                      }
                       // 等待对方响应 peer中包含了对方的任期和投票结果等信息
                       RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                       
                       Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                       // 根据响应判断是否得到了一半以上的支持,下面详细解释
                       peers.decideLeader(peer);
                       
                  }
                   
                   @Override
                   public void onError(Throwable throwable) {
                       Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                  }
                   
                   @Override
                   public void onCancel() {
                   
                  }
              });
          } catch (Exception e) {
               Loggers.RAFT.warn("error while sending vote to server: {}", server);
          }
      }
  }
}

decideLeader方法的逻辑如下

/**
* Calculate and decide which peer is leader. If has new peer has more than half vote, change leader to new peer.
*
* @param candidate new candidate
* @return new leader if new candidate has more than half vote, otherwise old leader
*/
public RaftPeer decideLeader(RaftPeer candidate) {
   // 记录本次投票情况
   peers.put(candidate.ip, candidate);
   
   SortedBag ips = new TreeBag();
   int maxApproveCount = 0;
   String maxApprovePeer = null;
   // 统计投票情况
   for (RaftPeer peer : peers.values()) {
       if (StringUtils.isEmpty(peer.voteFor)) {
           continue;
      }
       
       ips.add(peer.voteFor);
       if (ips.getCount(peer.voteFor) > maxApproveCount) {
           maxApproveCount = ips.getCount(peer.voteFor);
           maxApprovePeer = peer.voteFor;
      }
  }
   // 判断投票数量是否超过了半数 如果有则将自己设置为Leader状态
   if (maxApproveCount >= majorityCount()) {
       RaftPeer peer = peers.get(maxApprovePeer);
       peer.state = RaftPeer.State.LEADER;
       
       if (!Objects.equals(leader, peer)) {
           leader = peer;
           ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
           Loggers.RAFT.info("{} has become the LEADER", leader.ip);
      }
  }
   
   return leader;
}

其余节点处理投票请求

除此之外,我们在RaftCore中还能看到一个receivedVote方法,这个方法是别的节点收到投票请求的时候会触发的方法,我们也顺便来看看

public synchronized RaftPeer receivedVote(RaftPeer remote) {
   if (stopWork) {
       throw new IllegalStateException("old raft protocol already stop work");
  }
   if (!peers.contains(remote)) { // 检查请求节点是否在集群中
       throw new IllegalStateException("can not find peer: " + remote.ip);
  }
   
   RaftPeer local = peers.get(NetUtils.localServer()); // 本地节点信息
   if (remote.term.get() <= local.term.get()) { // 若请求节点的任期 <= 本地任期,拒绝投票
       String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
       Loggers.RAFT.info(msg);
       if (StringUtils.isEmpty(local.voteFor)) {
           local.voteFor = local.ip; // 若未投票,默认投给自己
      }
       return local; // 返回本地节点信息(含当前任期,告知对方任期过期)
  }
   
   // 若请求节点的任期 > 本地任期,更新本地状态并投票
   local.resetLeaderDue(); // 重置 Leader 超时时间
   local.state = RaftPeer.State.FOLLOWER; // 转为 Follower
   local.voteFor = remote.ip; // 投票给请求节点
   local.term.set(remote.term.get()); // 更新本地任期为请求节点的任期
   
   Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
   return local; // 返回本地节点信息(含投票结果)
}

Raft协议下心跳检测

HeartBeat任务类解析

HeartBeat的代码内容如下,可以看到前半部分很多地方和上述MasterElection这个任务类的逻辑类似,我们就不展开说了,主要看sendBeat()的逻辑

public class HeartBeat implements Runnable {
   
   @Override
   public void run() {
       try {
           if (stopWork) {
               return;
          }
           if (!peers.isReady()) {
               return;
          }
           
           RaftPeer local = peers.local();
           local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
           if (local.heartbeatDueMs > 0) {
               return;
          }
           // 重置心跳发送间隔
           local.resetHeartbeatDue();
           // 发送心跳 核心在这里
           sendBeat();
      } catch (Exception e) {
           Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
      }
       
  }
   
   private void sendBeat() throws IOException, InterruptedException {
       RaftPeer local = peers.local();
       // 如果是standalone即单机模式 或者 当前节点是Leader的状态 则不需要进行心跳 直接return
       if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
           return;
      }
       if (Loggers.RAFT.isDebugEnabled()) {
           Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
      }

    // 2. 重置 Leader 选举超时时间
   //这里是一个有趣的设计,Leader 自己也会重置这个超时。
   //这可以防止在某些极端情况下(比如 Leader 与所有 Follower 都断开了),
   //Leader 自己因为长时间收不到任何心跳(虽然它是 Leader)而触发选举,
   //从而给自己一个机会重新加入集群或发起选举。
       local.resetLeaderDue();
       
       // build data 构建心跳包的JSON结构
       ObjectNode packet = JacksonUtils.createEmptyJsonNode();
       //将 Leader 自身的信息(如 term, ip, state)放入包中
       packet.replace("peer", JacksonUtils.transferToJsonNode(local));
       // 创建一个数组,保存后面可能需要传输的数据
       ArrayNode array = JacksonUtils.createEmptyArrayNode();
       
       // 判断是否只需要心跳还是需要携带数据
       if (switchDomain.isSendBeatOnly()) {
           Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
      }
       // 如果需要携带数据 读取datums本地内存表中的数据进行封装
       if (!switchDomain.isSendBeatOnly()) {
           for (Datum datum : datums.values()) {
               
               ObjectNode element = JacksonUtils.createEmptyJsonNode();
               // 节省传输流量 长键Key转化为短键 而且可以看到 只会存Key的内容
               if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                   element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
              } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                   element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
              }
               // 放入时间戳
               element.put("timestamp", datum.timestamp.get());
               
               array.add(element);
          }
      }
       
       packet.replace("datums", array);
       // broadcast 压缩数据 采用GZIP压缩
       Map<String, String> params = new HashMap<String, String>(1);
       params.put("beat", JacksonUtils.toJson(packet));
       
       String content = JacksonUtils.toJson(params);
       
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       GZIPOutputStream gzip = new GZIPOutputStream(out);
       gzip.write(content.getBytes(StandardCharsets.UTF_8));
       gzip.close();
       
       byte[] compressedBytes = out.toByteArray();
       String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
       
       if (Loggers.RAFT.isDebugEnabled()) {
           Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                   compressedContent.length());
      }
       // 异步发送给所有的Follow节点
       for (final String server : peers.allServersWithoutMySelf()) {
           try {
               final String url = buildUrl(server, API_BEAT);
               if (Loggers.RAFT.isDebugEnabled()) {
                   Loggers.RAFT.debug("send beat to server " + server);
              }
               HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                   @Override
                   public void onReceive(RestResult<String> result) {
                       if (!result.ok()) {
                           Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                           MetricsMonitor.getLeaderSendBeatFailedException().increment();
                           return;
                      }
                       
                       peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                       if (Loggers.RAFT.isDebugEnabled()) {
                           Loggers.RAFT.debug("receive beat response from: {}", url);
                      }
                  }
                   
                   @Override
                   public void onError(Throwable throwable) {
                       Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                               throwable);
                       MetricsMonitor.getLeaderSendBeatFailedException().increment();
                  }
                   
                   @Override
                   public void onCancel() {
                   
                  }
              });
          } catch (Exception e) {
               Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
               MetricsMonitor.getLeaderSendBeatFailedException().increment();
          }
      }
       
  }
}

其余节点处理心跳请求

上述的最后,异步发送了HTTP给了别的Nacos服务节点,我们依旧可以跟踪这个/v1/ns/raft/beat请求找到对应的处理逻辑

image-20251113104736902

跟踪最后面的那个核心方法receivedBeat,其实就是RaftCore类下的receivedBeat方法,代码如下,内容有点多

这一段主要逻辑是自身节点的 datums 中的 Key 和心跳包中的 Key 进行比对,如果发现自身节点数据缺少了,那么就会记录到 batch 中。

然后把 batch 中的 key 进行拆分包装成请求参数,通过 HTTP 的方式去查询 Leader 节点中这些 key 对应的 Instance 详细信息。

拿到 Leader 返回 Instance 实例信息之后,还是调用 raftStore.writenotifier.notify两个方法,一个同步持久化文件、一个同步内存注册表,这样最终就完成以 Leader 为准的心跳数据同步流程。

/**
* Received beat from leader. // TODO split method to multiple smaller method.
*
* @param beat beat information from leader
* @return self-peer information
* @throws Exception any exception during handle
*/
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// 老规矩 获取当前节点
final RaftPeer local = peers.local();
// 创建一个RaftPeer 用于将beat心跳包中的信息封装到这个对象中
final RaftPeer remote = new RaftPeer();
JsonNode peer = beat.get("peer");
remote.ip = peer.get("ip").asText();
remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
remote.term.set(peer.get("term").asLong());
remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
remote.leaderDueMs = peer.get("leaderDueMs").asLong();
remote.voteFor = peer.get("voteFor").asText();

// 如果不是Leader发送的心跳则报错
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
JacksonUtils.toJson(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
// 如果当前任期状态高于心跳包中记录的任期状态 则数据旧 直接报错
if (local.term.get() > remote.term.get()) {
Loggers.RAFT
.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
throw new IllegalArgumentException(
"out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
}
// 检查自己是不是Follow状态 如果不是则需要修改为Follow状态
if (local.state != RaftPeer.State.FOLLOWER) {

Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
// 解析心跳包中的实例
final JsonNode beatDatums = beat.get("datums");

// 重置心跳情况
local.resetLeaderDue();
local.resetHeartbeatDue();

// 设置这个发送节点为Leader节点 在这个逻辑中会判断和旧的Leader是否一样
peers.makeLeader(remote);

// 是否只是心跳包还是携带了数据
if (!switchDomain.isSendBeatOnly()) {
// 如果携带了数据则进行数据的解析 用receivedKeysMap这个map保存Leader心跳中存在的数据
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);// value初始值为0 代表是种Leader心跳中发现
}

// 用于批量获取数据的一个集合 所有leader心跳中有但是本地没有的实例的key都会放到这里,最后统一进行批量获取
List<String> batch = new ArrayList<>();

int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT
.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
// 遍历心跳信息中的所有数据
for (Object object : beatDatums) {
// 解析key和时间戳timesDatums
processedCount = processedCount + 1;

JsonNode entry = (JsonNode) object;
String key = entry.get("key").asText();
final String datumKey;

if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
continue;
}

long timestamp = entry.get("timestamp").asLong();
// 放入到map中 value变为1 说明在Leader心跳中存在
receivedKeysMap.put(datumKey, 1);

// 判断本地内存表中是否存在对应的实例信息
try {
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
&& processedCount < beatDatums.size()) {
continue;
}
//如果没有则需要进行同步 将需要同步的数据统一放到batch中
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}

// 批量同步最少一次50条 所以小于50条的时候直接continue结束后面的逻辑 先等放入50条再说
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}

String keys = StringUtils.join(batch, ",");

if (batch.size() <= 0) {
continue;
}

Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
+ ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
processedCount, beatDatums.size(), datums.size());

// update datum entry 发起异步请求进行数据同步
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
return;
}
// 请求成功则解析result中的数据
List<JsonNode> datumList = JacksonUtils
.toObj(result.getData(), new TypeReference<List<JsonNode>>() {
});

for (JsonNode datumJson : datumList) {
Datum newDatum = null;
OPERATE_LOCK.lock();
try {
// 对比新老Datume实例信息
Datum oldDatum = getDatum(datumJson.get("key").asText());
// 如果旧数据不为null 且 新数据的时间戳比旧数据的时间戳早 则说明数据反而有点问题 日志记录 跳过该数据
if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
.get()) {
Loggers.RAFT
.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.get("key").asText(),
datumJson.get("timestamp").asLong(), oldDatum.timestamp);
continue;
}
// 从结果中获取数据封装为newDatum 两个if处理不同情况 第一个if处理服务元数据 第二个if处理实例数据
if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.get("key").asText();
serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
serviceDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Service.class);
newDatum = serviceDatum;
}

if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.get("key").asText();
instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
instancesDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Instances.class);
newDatum = instancesDatum;
}

if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
// 本地写入新的数据
raftStore.write(newDatum);
// 同步内存数据
datums.put(newDatum.key, newDatum);
// 和实例注册逻辑一样,最终还是会调用:listener.onChange 方法
notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);

local.resetLeaderDue();

if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}

raftStore.updateTerm(local.term.get());

Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);

} catch (Throwable e) {
Loggers.RAFT
.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
e);
} finally {
OPERATE_LOCK.unlock();
}
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
}
return;
}

@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
}

@Override
public void onCancel() {

}

});

batch.clear();

} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}

}
// 对于只有申明但是不存在的数据 遍历进行批量删除
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}

for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}

}

return local;
}

总结

本章主要是分析在 Nacos 服务端源码中,Nacos 是如何实现 Raft 协议的。

主要分了三个部分来分析源码:

本章围绕这三个方面,把整体的 Raft 协议流程在 Nacos 中的源码部分都分析了一遍。在本章中,主要是分析主流程,因为把 Raft 源码实现内容都放在一章来讲解,有些部分代码分析得不是特别详细。在文章的开始也说明了今天分析的这一套 Raft 实现其实是 Nacos 简化版本,并且在后期 Nacos 版本中也不会再使用,所以大家也不需要很仔细地分析,只需要了解大概整体流程即可。

当前Raft协议的实现的存在的问题:

编程 Java 项目