«

Nacos源码学习计划-Day17-配置中心-配置变化推送与客户端感知配置变化

ZealSinger 发布于 阅读:41 技术文档


上一章中我们讨论了Nacos客户端是如何加载和读取远程配置,通过源码了解到了Nacos远程配置的优先级和读取顺序以及读取位置在磁盘而不是数据库

在上一章最后,我们也说到了,直接修改数据库是无法被感知的,那么今天探讨的内容是:Nacos中,服务端配置变化之后,是如何通知的客户端?客户端又是如何感知到这个变化的呢?

在讲解配置中心第一个章节的时候,在源码中我们看到了 ConfigService 对象,但没有讲解具体的使用。在这个对象提供了获取配置、监听配置、发布配置等操作,而客户端感知配置文件变更也正是利用了这个对象来实现的。

ConfigService

首先我们大致来看一下ConfigService的内容,可以看到ConfigService中有很多getConfig,publishConfig这种和Config操作关联性很强的方法,除此之外,还有一个addListener这个方法,可以预测一下应该是监听者模式,可能会设置监听器从而监听变化,这个或许和配置的变化是有关的

image-20251122205115379

我们其实可以在Nacos的源码中写一段小代码,来操作一下这个ConfigService接口

获取配置和发布配置

ConfigService对象可以通过NacosFactory工厂类进行创建,我们可以自己创建这个对象,然后利用调用其方法进行配置的获取和发布

// 配置信息
String serverAddr = "127.0.0.1:8848";
String dataId = "service-test.yaml";
String group = "DEFAULT_GROUP";

Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);

// 获取配置中心服务
ConfigService configService = NacosFactory.createConfigService(properties);

创建完ConfigService对象之后,我们就可以进行配置的发布和拉取,在 DOME 中我们先获取了一次配置数据,然后发布新的配置,紧接着又一次重新获取数据,发现第二次获取的配置数据已经发生了变化,也说明我们发布配置成功了,如下图:

// 从配置中心拉取置
String content = configService.getConfig(dataId, group, 5000);
System.out.println("发布配置前" + content);

//发布配置
configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());

Thread.sleep(300L);
content = configService.getConfig(dataId, group, 5000);
System.out.println("发布配置后" + content);

image-20251122211713299

添加监听器

我们来尝试使用一下addListener这个方法,addListener() 添加监听器方法,这样客户端就会对这个配置文件进行监听,当在 Nacos 服务端发生了配置文件变更后,客户端的监听器就可以立马感知到,代码如下:

// 注册监听器
configService.addListener(dataId, group, new Listener() {
   @Override
   public void receiveConfigInfo(String configInfo) {
       System.out.println("感知配置变化:" + configInfo);
  }

   @Override
   public Executor getExecutor() {
       return null;
  }
});

// 阻断进程关闭
Thread.sleep(Integer.MAX_VALUE);

通过 DOME 我们可以得知,通过 dataId、group 两个参数,就可以注册一个监听器,当对应的 dataId、group 配置发生了改变,这样就可以被监听器感知到,从而去刷新我们的属性。

image-20251122214521386

从这里就可以知道,应该就是利用了这个监听器从而让客户端能感知服务端配置的变化从而重新读取配置,那么我们现在就是需要能找到客户端添加监听器的地方分析源码即可

客户端注册监听器源码分析

spring.factories 文件,有一个 NacosConfigAutoConfiguration 配置类,在配置类中定义了 NacosContextRefresher 对象,这个类实现了ApplicationListener接口并且监听了ApplicationReadyEvent事件

在Spring Boot中,ApplicationReadyEvent是一个非常重要的事件,它标志着Spring应用程序已经准备好接收请求。这个事件发生在所有的初始化步骤完成之后,是Spring Boot生命周期中的一个关键点。开发者可以监听这个事件来执行一些只有在应用程序完全启动后才能执行的操作,Nacos依赖中对应的监听到事件后的处理逻辑代码如下

image-20251122223908204

这段代码if 使用 CAS 并发控制操作,确保只有一个线程能够执行 registerNacosListenersForApplications()方法,在这个方法中就会为每一个 dataId、group 进行 Nacos 监听器注册。

private void registerNacosListenersForApplications() {
  if (isRefreshEnabled()) {
      // 获取全部的配置
     for (NacosPropertySource propertySource : NacosPropertySourceRepository
          .getAll()) {
        // 判断当前配置是否需要刷新
        if (!propertySource.isRefreshable()) {
           continue;
        }
        String dataId = propertySource.getDataId();
        // 真正注册监听器的方法
        registerNacosListener(propertySource.getGroup(), dataId);
    }
  }
}
//-------------------------//
private void registerNacosListener(final String groupKey, final String dataKey) {
  String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
  // 创建监听器
  Listener listener = listenerMap.computeIfAbsent(key,
        lst -> new AbstractSharedListener() {
           @Override
           public void innerReceive(String dataId, String group,
                 String configInfo) {
               
              // 监听器回调方法处理逻辑
              refreshCountIncrement();
             
              // 记录历史刷新
              nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
             
              // 发布 RefreshEvent 刷新事件
              applicationContext.publishEvent(
                    new RefreshEvent(this, null, "Refresh Nacos config"));
              if (log.isDebugEnabled()) {
                 log.debug(String.format(
                       "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                       group, dataId, configInfo));
              }
          }
        });
  try {
      // 注册监听器
     configService.addListener(dataKey, groupKey, listener);
  }
  catch (NacosException e) {
     log.warn(String.format(
           "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
           groupKey), e);
  }
}

registerNacosListener中可以看到,他是直接创建的一个Listener对象,其回调处理逻辑主要是refreshCountIncrement()addRefreshRecord() 以及publishEvent(RefreshEvent)方法

refreshCountIncrement()方法

这个方法逻辑很简单,就是一个AtomicLong类型的常量REFRESH_COUNT使用incrementAndGet()让这个常量原子性+1,这个常量其实就是一个刷新计数器,记录刷新次数

public static void refreshCountIncrement() {
REFRESH_COUNT.incrementAndGet();
}

 

addRefreshRecord()方法

记录保存配置刷新记录,例用records这个LinkedList<Record>进行保存,用一个链表进行记录配置变更时间,数据ID,分组等信息,方便后续的问题排查,监控警告以及运维可观测性,也能一定程度上用本地缓存防止配置丢失

public void addRefreshRecord(String dataId, String group, String data) {
       records.addFirst(new Record(DATE_FORMAT.get().format(new Date()), dataId, group,md5(data), null));
if (records.size() > MAX_SIZE) {
records.removeLast();
}
}

 

publishEvent(RefreshEvent)方法

这里就是发布了一个RefreshEvent事件,所以我们需要找到处理这个事件的监听器,看看是如何处理的

我们先找到是在哪里处理这个事件,利用 IDEA 的全文搜索,在 RefreshEventListener 类中会来处理这个事件,代码如下:

@Override
public void onApplicationEvent(ApplicationEvent event) {
  if (event instanceof ApplicationReadyEvent) {
     handle((ApplicationReadyEvent) event);
  }
  // 处理刷新事件
  else if (event instanceof RefreshEvent) {
     handle((RefreshEvent) event);
  }
}

注意,这个 RefreshEventListener 类其实就已经不再是 Nacos 源码的部分了,简单看看,在 handle() 方法中是如何处理的,代码如下:


private RefreshScope scope;

public void handle(RefreshEvent event) {
  if (this.ready.get()) { // don't handle events before app is ready
     log.debug("Event received " + event.getEventDesc());
     // 调用了 容器刷新的方法
     Set<String> keys = this.refresh.refresh();
     log.info("Refresh keys changed: " + keys);
  }
}

public synchronized Set<String> refresh() {
  // 刷新最新的配置环境
  Set<String> keys = refreshEnvironment();
 
  // 销毁 RefreshScope 的对象
  this.scope.refreshAll();
  return keys;
}

refreshEnvironment() 这个里面的代码我就不带大家进一步去看了,这里做的事情就是把最新的配置文件刷新到容器中就可以了。在 refreshAll() 方法中,会调用被 @RefreshScope 注解修饰类的 destroy() 方法,进行销毁 Bean。

public void refreshAll() {
  // 销毁 Bean 对象
  super.destroy();
  this.context.publishEvent(new RefreshScopeRefreshedEvent());
}

把 Bean 对象销毁之后,在后面需要用到这个 Bean 的时候,会重新进行创建,此时会获取最新的配置文件,从而完成动态刷新的效果。

ClientWorker类

我们在上面提到的ConfigService接口中,存在addListener这个方法,在registerNacosListener的最后也调用了这个方法,我们可以看一下这个addListener

// NacosConfigService.addListener() 核心逻辑
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
   // 校验参数...
   String key = GroupKey.getKey(dataId, group);
   // 创建或获取缓存对象(持有监听器列表)
   CacheData cacheData = cacheMap.get().computeIfAbsent(key, k -> new CacheData(configFilterChainManager, dataId, group));
   // 向缓存对象添加监听器
   cacheData.addListener(listener);
   // 触发ClientWorker的长轮询任务(确保监听生效)
   clientWorker.startMonitor();
}

ClientWorker内部维护了cacheMap(存储所有CacheData),并通过定时线程池(executorService)周期性检查这些CacheData,发起长轮询请求:

// ClientWorker初始化时启动定时任务
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
   // ...初始化参数
   this.executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
           Thread t = new Thread(r, "com.alibaba.nacos.client.Worker." + agent.getName());
           t.setDaemon(true);
           return t;
      }
  });
   // 启动定时任务(10毫秒延迟,周期性执行)
   this.executorService.scheduleWithFixedDelay(new Runnable() {
       @Override
       public void run() {
           try {
               // 检查并发送长轮询请求
               checkConfigInfo();
          } catch (Throwable e) {
               log.error("schedule check config info error", e);
          }
      }
  }, 1L, 10L, TimeUnit.MILLISECONDS);
}

ClientWorker通过长轮询(v1/cs/configs/listener接口)感知到配置变化后,会更新CacheData中的配置内容,并遍历其持有的监听器,触发receiveConfigInfo回调:

// ClientWorker处理服务端响应后,更新缓存并触发监听
private void processServiceResponse(String response) {
   // 解析服务端返回的变化配置...
   for (CacheData cacheData : changedCaches) {
       // 更新本地缓存
       cacheData.setContent(content);
       // 触发所有注册的监听器
       cacheData.checkListenerMd5();
  }
}

// CacheData.checkListenerMd5() 触发监听器
void checkListenerMd5() {
   for (Listener listener : listeners) {
       // 调用监听器的receiveConfigInfo方法
       listener.receiveConfigInfo(content);
  }
}

所以从这里可以看到,所谓的监听器监听实际上就是底层线程池通过HTTP访问获取最新配置然后MD5进行比对后进行的判断,当然,这是在1.4版本中,在2.x版本修改为了gRPC的方式访问,代码和这里肯定不一样

总结

结合ConfigService和ConfigWorker来看,Nacos客户端感知服务端配置变化的流程如下

image-20251123221152803

今天主要介绍了 ConfigService 对象的使用方式,它可以获取配置、发布配置、监听配置,我们也通过一个 DOME 演示了这么几个功能。在了解 ConfigService 对象之后,我们通过源码的方式找到了 Nacos 是如何为每一个 dataId、group 注册监听器的,并且在监听器的回调方法当中,会去刷新容器中的最新配置文件数据,最后再把被 @RefreshScope 注解修饰的类进行销毁,销毁之后在容器下一次使用这个 Bean 的时候会发现没有了,就会重新去创建这个 Bean 对象,在创建的时候,就会使用最新的配置文件内容。

 

编程 Java 项目