初始化将参数配置追加到Environment
spring.factories
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration
public class NacosConfigBootstrapConfiguration {
// 初始化Nacos配置信息
@Bean
@ConditionalOnMissingBean
public NacosConfigProperties nacosConfigProperties() {
return new NacosConfigProperties();
}
// 初始化NacosConfigService
@Bean
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(
NacosConfigProperties nacosConfigProperties) {
return new NacosConfigManager(nacosConfigProperties);
}
// 从Nacos加载远程配置文件
@Bean
public NacosPropertySourceLocator nacosPropertySourceLocator(
NacosConfigManager nacosConfigManager) {
return new NacosPropertySourceLocator(nacosConfigManager);
}
}
public class NacosPropertySourceLocator implements PropertySourceLocator {
@Override
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos");
return null;
}
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
// 优先级 本应用配置>扩展配置>共享配置
// 加载共享配置
loadSharedConfiguration(composite);
// 加载扩展配置
loadExtConfiguration(composite);
// 加载本应用配置
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
三类配置的加载源码主要逻辑是从NacosConfigService.getConfigInner方法根据dataId和group远程获取配置(/v1/cs/configs),存本地快照
Nacos配置修改动态刷新
初始化NacosContextRefresher给所有data-id添加监听器
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosConfigAutoConfiguration
public class NacosConfigAutoConfiguration {
// 初始化NacosContextRefresher
@Bean
public NacosContextRefresher nacosContextRefresher(
NacosConfigManager nacosConfigManager,
NacosRefreshHistory nacosRefreshHistory) {
// Consider that it is not necessary to be compatible with the previous
// configuration
// and use the new configuration if necessary.
return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
}
}
public class NacosContextRefresher
implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
// 遍历所有data-id添加监听器
/**
* register Nacos Listeners.
*/
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
// 有变化发送RefreshEvent事件
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// todo feature: support single refresh for listening
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
}
客户端长轮询拉取Nacos配置
初始化NacosConfigService的时候会初始化ClientWorker
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
// 初始化ClientWorker
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
// 初始化单线程线程池,每10MS执行一次checkConfigInfo()
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
执行LongPollingRunnable
class LongPollingRunnable implements Runnable {
private final int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// 检查本地配置
// check failover config
for (CacheData cacheData : cacheMap.values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// 长轮询-根据dataId和group 服务端获取配置(/v1/cs/configs/listener)
// 如果有变化返回group
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// 去服务端获取最新的配置更新本地数据cacheMap
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// 遍历listeners,取到NacosContextRefresher注册的listener,执行innerReceive方法,发送RefreshEvent事件
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
服务端长轮询逻辑
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// Long polling.
// 判断是否是长轮询
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
}
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// 长轮询时间为30s-500ms=29.5s
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be called by http thread, or send response.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
class ClientLongPolling implements Runnable {
// 29.5s后执行
// 无变化从allSubs移除当前请求
// 响应客户端
@Override
public void run() {
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subsciber's relations.
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
List<String> changedGroups = MD5Util
.compareMd5((HttpServletRequest) asyncContext.getRequest(),
(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
// 将当前请求加入到allSubs
allSubs.add(this);
}
如果这期间发生变更,会发送 LocalDataChangeEvent事件,
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
// Register LocalDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
// 注册订阅者监听LocalDataChangeEvent事件
// Register A Subscriber to subscribe LocalDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
DataChangeTask接收消息遍历allSubs 找到对应的客户端请求返回给客户端结果(group+dataId)
class DataChangeTask implements Runnable {
@Override
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
// 会cancelasyncTimeoutFuture
// 响应客户端变化的group
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
}
}
Spring RefreshEvent事件刷新配置文件解析
RefreshEvent监听者
public class RefreshEventListener implements SmartApplicationListener {
public void handle(RefreshEvent event) {
if (this.ready.get()) { // don't handle events before app is ready
log.debug("Event received " + event.getEventDesc());
// 执行刷新
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
}
public abstract class ContextRefresher {
private RefreshScope scope;
protected RefreshScope getScope() {
return this.scope;
}
public synchronized Set<String> refresh() {
// 发送EnvironmentChangeEvent事件
// 监听该事件的监听器多个,只需要关注LoggingBebinde和ConfigurationPropertiesRebinder
// ConfigurationPropertiesRebinder.rebind() 销毁原有的Bean,在重新初始化Bean
// LoggingBebinde通过LoggingSystem重新设置日志级别
Set<String> keys = refreshEnvironment();
// 刷新@RefreshScope注解的类
// 对于@RefreshScope注解的类,当每次被调用的时候,都会进行初始化,同时采用懒代理的方法,将作用域充当 初始值的缓存,当缓存存在时,不会再进行初始化。因此,对于刷新@RefreshScope注解的类,只需要将其缓存 进行清空,则在下一次访问的时候,依赖新的配置源,将生成新的缓存
this.scope.refreshAll();
return keys;
}
Tips:
Nacos2.0+采用grpc替换长轮询,后面分享