Nacos客户端磁盘降级需求 issues:11053 (#11207)

* 使用磁盘实现Nacos客户端版本降级

* 使用磁盘实现Nacos客户端版本降级

* 使用磁盘实现Nacos客户端版本降级

* 使用磁盘实现Nacos客户端版本降级

* 创建对象时去除cachedir,去除磁盘定时保存任务

* 创建对象时去除cachedir,去除磁盘定时保存任务

* 删除saveFailoverData方法

* 移除getServiceInfo方法中的容灾逻辑

* 移除getServiceInfo方法中的容灾逻辑

* 删除掉注释的方法

* 使用serviceMap对比

* 使用serviceMap对比

* failoverSwith去除serviceNameSet集合

* failoverSwith去除serviceNameSet集合

* failoverSwith去除serviceNameSet集合

* 特殊情况下增补日志

* 格式化checkstyle

* 对UtilAndComs  checkstyle

* 对test类  checkstyle

* 去除无效变量

* 去除无效变量

* 修改serviceInfo对象被重新覆盖的问题

* 优化getServiceInfo方法

* 降级开关初次开启时发送通知

* 降级开关开启时,也发送通知

* 当降级开启时,不再订阅服务信息。去除多余的init方法

* 本地修复CI报错

* 本地修复CI报错

* 本地修复CI报错

* 监控指标优化内部版本同步

* 监控指标优化内部版本同步

* serviceName字段去除""
This commit is contained in:
guozongkang 2023-12-05 10:21:52 +08:00 committed by GitHub
parent 18a4672dd9
commit 380c019fdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 736 additions and 270 deletions

View File

@ -125,6 +125,10 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -39,6 +39,7 @@ import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.utils.PreInitUtils;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.ArrayList;
@ -47,6 +48,8 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Nacos Naming Service.
*
@ -240,17 +243,8 @@ public class NacosNamingService implements NamingService {
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, false);
}
List<Instance> list;
ServiceInfo serviceInfo = getServiceInfo(serviceName, groupName, clusters, subscribe);
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<>();
}
@ -300,17 +294,7 @@ public class NacosNamingService implements NamingService {
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, false);
}
ServiceInfo serviceInfo = getServiceInfo(serviceName, groupName, clusters, subscribe);
return selectInstances(serviceInfo, healthy);
}
@ -331,6 +315,41 @@ public class NacosNamingService implements NamingService {
return list;
}
private ServiceInfo getServiceInfoByFailover(String serviceName, String groupName, String clusterString) {
return serviceInfoHolder.getFailoverServiceInfo(serviceName, groupName, clusterString);
}
private ServiceInfo getServiceInfoBySubscribe(String serviceName, String groupName, String clusterString,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, false);
}
return serviceInfo;
}
private ServiceInfo getServiceInfo(String serviceName, String groupName, List<String> clusters, boolean subscribe)
throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (serviceInfoHolder.isFailoverSwitch()) {
serviceInfo = getServiceInfoByFailover(serviceName, groupName, clusterString);
if (serviceInfo != null && serviceInfo.getHosts().size() > 0) {
NAMING_LOGGER.debug("getServiceInfo from failover,serviceName: {} data:{}", serviceName,
JacksonUtils.toJson(serviceInfo.getHosts()));
return serviceInfo;
}
}
serviceInfo = getServiceInfoBySubscribe(serviceName, groupName, clusterString, subscribe);
return serviceInfo;
}
@Override
public Instance selectOneHealthyInstance(String serviceName) throws NacosException {
return selectOneHealthyInstance(serviceName, new ArrayList<>());
@ -372,17 +391,8 @@ public class NacosNamingService implements NamingService {
@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
return Balancer.RandomByWeight.selectHost(serviceInfo);
} else {
ServiceInfo serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, false);
return Balancer.RandomByWeight.selectHost(serviceInfo);
}
ServiceInfo serviceInfo = getServiceInfo(serviceName, groupName, clusters, subscribe);
return Balancer.RandomByWeight.selectHost(serviceInfo);
}
@Override

View File

@ -0,0 +1,67 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.backups;
/**
* Failover Data.
*
* @author zongkang.guo
*/
public class FailoverData {
/**
* failover type,naming or config.
*/
private DataType dataType;
/**
* failover data.
*/
private Object data;
public FailoverData(DataType dataType, Object data) {
this.data = data;
this.dataType = dataType;
}
public enum DataType {
/**
* naming.
*/
naming,
/**
* config.
*/
config
}
public DataType getDataType() {
return dataType;
}
public void setDataType(DataType dataType) {
this.dataType = dataType;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.backups;
import java.util.Map;
/**
* Failover Service Interface.
*
* @author Nacos
*/
public interface FailoverDataSource {
/**
* Get current disaster recovery switch.
*
* @return
*/
FailoverSwitch getSwitch();
/**
* Get current disaster recovery data.
*
* @return map
*/
Map<String, FailoverData> getFailoverData();
}

View File

@ -18,28 +18,22 @@ package com.alibaba.nacos.client.naming.backups;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.cache.ConcurrentDiskUtil;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Metrics;
import java.io.BufferedReader;
import java.io.File;
import java.io.StringReader;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -54,29 +48,29 @@ import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
*/
public class FailoverReactor implements Closeable {
private static final String FAILOVER_DIR = "/failover";
private static final String IS_FAILOVER_MODE = "1";
private static final String NO_FAILOVER_MODE = "0";
private static final String FAILOVER_MODE_PARAM = "failover-mode";
private Map<String, ServiceInfo> serviceMap = new ConcurrentHashMap<>();
private final Map<String, String> switchParams = new ConcurrentHashMap<>();
private static final long DAY_PERIOD_MINUTES = 24 * 60;
private final String failoverDir;
private boolean failoverSwitchEnable;
private final ServiceInfoHolder serviceInfoHolder;
private final ScheduledExecutorService executorService;
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
private FailoverDataSource failoverDataSource;
private String notifierEventScope;
private HashMap<String, Meter> meterMap = new HashMap<>(10);
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String notifierEventScope) {
this.serviceInfoHolder = serviceInfoHolder;
this.failoverDir = cacheDir + FAILOVER_DIR;
this.notifierEventScope = notifierEventScope;
Collection<FailoverDataSource> dataSources = NacosServiceLoader.load(FailoverDataSource.class);
for (FailoverDataSource dataSource : dataSources) {
failoverDataSource = dataSource;
NAMING_LOGGER.info("FailoverDataSource type is {}", dataSource.getClass());
break;
}
// init executorService
this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
Thread thread = new Thread(r);
@ -92,28 +86,87 @@ public class FailoverReactor implements Closeable {
*/
public void init() {
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
executorService.scheduleWithFixedDelay(new FailoverSwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
}
class FailoverSwitchRefresher implements Runnable {
// backup file on startup if failover directory is empty.
executorService.schedule(() -> {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
FailoverSwitch fSwitch = failoverDataSource.getSwitch();
if (fSwitch == null) {
failoverSwitchEnable = false;
return;
}
File[] files = cacheDir.listFiles();
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
if (fSwitch.getEnabled() != failoverSwitchEnable) {
NAMING_LOGGER.info("failover switch changed, new: {}", fSwitch.getEnabled());
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
if (fSwitch.getEnabled()) {
Map<String, ServiceInfo> failoverMap = new ConcurrentHashMap<>(200);
Map<String, FailoverData> failoverData = failoverDataSource.getFailoverData();
for (Map.Entry<String, FailoverData> entry : failoverData.entrySet()) {
ServiceInfo newService = (ServiceInfo) entry.getValue().getData();
ServiceInfo oldService = serviceMap.get(entry.getKey());
if (serviceInfoHolder.isChangedServiceInfo(oldService, newService)) {
NAMING_LOGGER.info("[NA] failoverdata isChangedServiceInfo. newService:{}",
JacksonUtils.toJson(newService));
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, newService.getName(),
newService.getGroupName(), newService.getClusters(), newService.getHosts()));
}
failoverMap.put(entry.getKey(), (ServiceInfo) entry.getValue().getData());
}
if (failoverMap.size() > 0) {
failoverServiceCntMetrics(failoverMap);
serviceMap = failoverMap;
}
failoverSwitchEnable = true;
return;
}
if (failoverSwitchEnable && !fSwitch.getEnabled()) {
Map<String, ServiceInfo> serviceInfoMap = serviceInfoHolder.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : serviceMap.entrySet()) {
ServiceInfo oldService = entry.getValue();
ServiceInfo newService = serviceInfoMap.get(entry.getKey());
if (newService != null) {
boolean changed = serviceInfoHolder.isChangedServiceInfo(oldService, newService);
if (changed) {
NotifyCenter.publishEvent(
new InstancesChangeEvent(notifierEventScope, newService.getName(),
newService.getGroupName(), newService.getClusters(),
newService.getHosts()));
}
}
}
serviceMap.clear();
failoverSwitchEnable = false;
failoverServiceCntMetricsClear();
return;
}
} catch (Exception e) {
NAMING_LOGGER.error("FailoverSwitchRefresher run err", e);
}
}, 10000L, TimeUnit.MILLISECONDS);
}
}
public boolean isFailoverSwitch() {
return failoverSwitchEnable;
}
public ServiceInfo getService(String key) {
ServiceInfo serviceInfo = serviceMap.get(key);
if (serviceInfo == null) {
serviceInfo = new ServiceInfo();
serviceInfo.setName(key);
}
return serviceInfo;
}
/**
@ -130,6 +183,11 @@ public class FailoverReactor implements Closeable {
return startDT.getTime();
}
/**
* shutdown ThreadPool.
*
* @throws NacosException Nacos exception
*/
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
@ -138,153 +196,29 @@ public class FailoverReactor implements Closeable {
NAMING_LOGGER.info("{} do shutdown stop", className);
}
class SwitchRefresher implements Runnable {
long lastModifiedMillis = 0L;
@Override
public void run() {
try {
File switchFile = Paths.get(failoverDir, UtilAndComs.FAILOVER_SWITCH).toFile();
if (!switchFile.exists()) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
return;
}
long modified = switchFile.lastModified();
if (lastModifiedMillis < modified) {
lastModifiedMillis = modified;
String failover = ConcurrentDiskUtil.getFileContent(switchFile.getPath(),
Charset.defaultCharset().toString());
if (!StringUtils.isEmpty(failover)) {
String[] lines = failover.split(DiskCache.getLineSeparator());
for (String line : lines) {
String line1 = line.trim();
if (IS_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
NAMING_LOGGER.info("failover-mode is on");
new FailoverFileReader().run();
} else if (NO_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.info("failover-mode is off");
}
}
} else {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
}
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
private void failoverServiceCntMetrics(Map<String, ServiceInfo> failoverMap) {
try {
for (Map.Entry<String, ServiceInfo> entry : failoverMap.entrySet()) {
String serviceName = entry.getKey();
Gauge register = Gauge.builder("nacos_naming_client_failover_instances",
((ServiceInfo) failoverMap.get(serviceName)).ipCount(), Integer::intValue)
.tag("service_name", serviceName).description("Nacos failover data service count")
.register(Metrics.globalRegistry);
meterMap.put(serviceName, register);
}
} catch (Exception e) {
NAMING_LOGGER.info("[NA] registerFailoverServiceCnt fail.", e);
}
}
class FailoverFileReader implements Runnable {
@Override
public void run() {
Map<String, ServiceInfo> domMap = new HashMap<>(16);
BufferedReader reader = null;
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
if (files == null) {
return;
}
for (File file : files) {
if (!file.isFile()) {
continue;
}
if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
continue;
}
ServiceInfo dom = null;
try {
dom = new ServiceInfo(URLDecoder.decode(file.getName(), StandardCharsets.UTF_8.name()));
String dataString = ConcurrentDiskUtil.getFileContent(file,
Charset.defaultCharset().toString());
reader = new BufferedReader(new StringReader(dataString));
String json;
if ((json = reader.readLine()) != null) {
try {
dom = JacksonUtils.toObj(json, ServiceInfo.class);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (Exception e) {
//ignore
}
}
if (dom != null && !CollectionUtils.isEmpty(dom.getHosts())) {
domMap.put(dom.getKey(), dom);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache file", e);
}
if (domMap.size() > 0) {
serviceMap = domMap;
private void failoverServiceCntMetricsClear() {
try {
for (Map.Entry<String, Meter> entry : meterMap.entrySet()) {
Metrics.globalRegistry.remove(entry.getValue());
}
meterMap.clear();
} catch (Exception e) {
NAMING_LOGGER.info("[NA] registerFailoverServiceCnt fail.", e);
}
}
class DiskFileWriter extends TimerTask {
@Override
public void run() {
Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
ServiceInfo serviceInfo = entry.getValue();
if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
continue;
}
DiskCache.write(serviceInfo, failoverDir);
}
}
}
public boolean isFailoverSwitch() {
return Boolean.parseBoolean(switchParams.get(FAILOVER_MODE_PARAM));
}
public ServiceInfo getService(String key) {
ServiceInfo serviceInfo = serviceMap.get(key);
if (serviceInfo == null) {
serviceInfo = new ServiceInfo();
serviceInfo.setName(key);
}
return serviceInfo;
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.backups;
/**
* Failover switch.
*
* @author zongkang.guo
*/
public class FailoverSwitch {
/**
* Failover switch enable.
*/
private boolean enabled;
public boolean getEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public FailoverSwitch(boolean enabled) {
this.enabled = enabled;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.backups;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
/**
* Naming Failover Data.
*
* @author zongkang.guo
*/
public class NamingFailoverData extends FailoverData {
private NamingFailoverData(ServiceInfo serviceInfo) {
super(DataType.naming, serviceInfo);
}
public static NamingFailoverData newNamingFailoverData(ServiceInfo serviceInfo) {
return new NamingFailoverData(serviceInfo);
}
}

View File

@ -0,0 +1,191 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.backups.datasource;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.backups.FailoverData;
import com.alibaba.nacos.client.naming.backups.FailoverDataSource;
import com.alibaba.nacos.client.naming.backups.FailoverSwitch;
import com.alibaba.nacos.client.naming.backups.NamingFailoverData;
import com.alibaba.nacos.client.naming.cache.ConcurrentDiskUtil;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.utils.CacheDirUtil;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.StringReader;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Failover Data Disk Impl.
*
* @author zongkang.guo
*/
public class DiskFailoverDataSource implements FailoverDataSource {
private static final String FAILOVER_DIR = "/failover";
private static final String IS_FAILOVER_MODE = "1";
private static final String NO_FAILOVER_MODE = "0";
private static final String FAILOVER_MODE_PARAM = "failover-mode";
private Map<String, FailoverData> serviceMap = new ConcurrentHashMap<>();
private final Map<String, String> switchParams = new ConcurrentHashMap<>();
private String failoverDir;
private long lastModifiedMillis = 0L;
public DiskFailoverDataSource() {
failoverDir = CacheDirUtil.gettCacheDir() + FAILOVER_DIR;
}
class FailoverFileReader implements Runnable {
@Override
public void run() {
Map<String, FailoverData> domMap = new HashMap<>(200);
BufferedReader reader = null;
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
if (files == null) {
return;
}
for (File file : files) {
if (!file.isFile()) {
continue;
}
if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
continue;
}
ServiceInfo dom = null;
try {
dom = new ServiceInfo(URLDecoder.decode(file.getName(), StandardCharsets.UTF_8.name()));
String dataString = ConcurrentDiskUtil.getFileContent(file,
Charset.defaultCharset().toString());
reader = new BufferedReader(new StringReader(dataString));
String json;
if ((json = reader.readLine()) != null) {
try {
dom = JacksonUtils.toObj(json, ServiceInfo.class);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (Exception e) {
//ignore
}
}
if (dom != null && !CollectionUtils.isEmpty(dom.getHosts())) {
domMap.put(dom.getKey(), NamingFailoverData.newNamingFailoverData(dom));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache file", e);
}
if (domMap.size() > 0) {
serviceMap = domMap;
}
}
}
@Override
public FailoverSwitch getSwitch() {
try {
File switchFile = Paths.get(failoverDir, UtilAndComs.FAILOVER_SWITCH).toFile();
if (!switchFile.exists()) {
NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
return new FailoverSwitch(Boolean.FALSE);
}
long modified = switchFile.lastModified();
if (lastModifiedMillis < modified) {
lastModifiedMillis = modified;
String failover = ConcurrentDiskUtil.getFileContent(switchFile.getPath(),
Charset.defaultCharset().toString());
if (!StringUtils.isEmpty(failover)) {
String[] lines = failover.split(DiskCache.getLineSeparator());
for (String line : lines) {
String line1 = line.trim();
if (IS_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
NAMING_LOGGER.info("failover-mode is on");
new FailoverFileReader().run();
return new FailoverSwitch(Boolean.TRUE);
} else if (NO_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.info("failover-mode is off");
}
}
}
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
}
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
return new FailoverSwitch(Boolean.FALSE);
}
@Override
public Map<String, FailoverData> getFailoverData() {
if (Boolean.parseBoolean(switchParams.get(FAILOVER_MODE_PARAM))) {
return serviceMap;
}
return new ConcurrentHashMap<>(0);
}
}

View File

@ -25,13 +25,13 @@ import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.utils.CacheDirUtil;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -50,14 +50,6 @@ import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
*/
public class ServiceInfoHolder implements Closeable {
private static final String JM_SNAPSHOT_PATH_PROPERTY = "JM.SNAPSHOT.PATH";
private static final String FILE_PATH_NACOS = "nacos";
private static final String FILE_PATH_NAMING = "naming";
private static final String USER_HOME_PROPERTY = "user.home";
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
private final FailoverReactor failoverReactor;
@ -69,50 +61,33 @@ public class ServiceInfoHolder implements Closeable {
private String notifierEventScope;
public ServiceInfoHolder(String namespace, String notifierEventScope, NacosClientProperties properties) {
initCacheDir(namespace, properties);
cacheDir = CacheDirUtil.initCacheDir(namespace, properties);
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.failoverReactor = new FailoverReactor(this, notifierEventScope);
this.pushEmptyProtection = isPushEmptyProtect(properties);
this.notifierEventScope = notifierEventScope;
}
private void initCacheDir(String namespace, NacosClientProperties properties) {
String jmSnapshotPath = properties.getProperty(JM_SNAPSHOT_PATH_PROPERTY);
String namingCacheRegistryDir = "";
if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
}
if (!StringUtils.isBlank(jmSnapshotPath)) {
cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
} else {
cacheDir = properties.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
}
}
private boolean isLoadCacheAtStart(NacosClientProperties properties) {
boolean loadCacheAtStart = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
loadCacheAtStart = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
if (properties != null && StringUtils.isNotEmpty(
properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
loadCacheAtStart = ConvertUtils.toBoolean(
properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
}
return loadCacheAtStart;
}
private boolean isPushEmptyProtect(NacosClientProperties properties) {
boolean pushEmptyProtection = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION))) {
pushEmptyProtection = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION));
if (properties != null && StringUtils.isNotEmpty(
properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION))) {
pushEmptyProtection = ConvertUtils.toBoolean(
properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION));
}
return pushEmptyProtection;
}
@ -122,12 +97,8 @@ public class ServiceInfoHolder implements Closeable {
}
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
return serviceInfoMap.get(key);
}
@ -168,8 +139,11 @@ public class ServiceInfoHolder implements Closeable {
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
if (!failoverReactor.isFailoverSwitch()) {
NotifyCenter.publishEvent(
new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
}
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
@ -179,7 +153,14 @@ public class ServiceInfoHolder implements Closeable {
return null == serviceInfo.getHosts() || (pushEmptyProtection && !serviceInfo.validate());
}
private boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
/**
* isChangedServiceInfo.
*
* @param oldService old service data
* @param newService new service data
* @return
*/
public boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
if (null == oldService) {
NAMING_LOGGER.info("init new ips({}) service: {} -> {}", newService.ipCount(), newService.getKey(),
JacksonUtils.toJson(newService.getHosts()));
@ -204,8 +185,7 @@ public class ServiceInfoHolder implements Closeable {
Set<Instance> newHosts = new HashSet<>();
Set<Instance> remvHosts = new HashSet<>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<>(
newHostMap.entrySet());
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<>(newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
@ -225,7 +205,7 @@ public class ServiceInfoHolder implements Closeable {
if (newHostMap.containsKey(key)) {
continue;
}
//add to remove hosts
remvHosts.add(host);
}
@ -250,6 +230,20 @@ public class ServiceInfoHolder implements Closeable {
return changed;
}
public String getCacheDir() {
return cacheDir;
}
public boolean isFailoverSwitch() {
return failoverReactor.isFailoverSwitch();
}
public ServiceInfo getFailoverServiceInfo(final String serviceName, final String groupName, final String clusters) {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
return failoverReactor.getService(key);
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();

View File

@ -0,0 +1,75 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.utils;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.common.utils.StringUtils;
import java.io.File;
import com.alibaba.nacos.client.env.NacosClientProperties;
/**
* Cache Dir Utils.
*
* @author zongkang.guo
*/
public class CacheDirUtil {
private static String cacheDir;
private static final String JM_SNAPSHOT_PATH_PROPERTY = "JM.SNAPSHOT.PATH";
private static final String FILE_PATH_NACOS = "nacos";
private static final String FILE_PATH_NAMING = "naming";
private static final String USER_HOME_PROPERTY = "user.home";
/**
* Init cache dir.
*
* @param namespace namespace.
* @param properties nacosClientProperties.
* @return
*/
public static String initCacheDir(String namespace, NacosClientProperties properties) {
String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);
String namingCacheRegistryDir = "";
if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
namingCacheRegistryDir =
File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
}
if (!StringUtils.isBlank(jmSnapshotPath)) {
cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir + File.separator
+ FILE_PATH_NAMING + File.separator + namespace;
} else {
cacheDir =
System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
}
return cacheDir;
}
public static String gettCacheDir() {
return cacheDir;
}
}

View File

@ -0,0 +1,18 @@
#
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
com.alibaba.nacos.client.naming.backups.datasource.DiskFailoverDataSource

View File

@ -26,6 +26,7 @@ import org.mockito.Mockito;
import java.lang.reflect.Field;
import java.util.Date;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
public class FailoverReactorTest {
@ -34,7 +35,7 @@ public class FailoverReactorTest {
public void testInit() throws NacosException, NoSuchFieldException, IllegalAccessException {
ServiceInfoHolder holder = Mockito.mock(ServiceInfoHolder.class);
Mockito.when(holder.getServiceInfoMap()).thenReturn(new HashMap<>());
FailoverReactor failoverReactor = new FailoverReactor(holder, "/tmp");
FailoverReactor failoverReactor = new FailoverReactor(holder, UUID.randomUUID().toString());
Field executorService = FailoverReactor.class.getDeclaredField("executorService");
executorService.setAccessible(true);
ScheduledExecutorService o = (ScheduledExecutorService) executorService.get(failoverReactor);
@ -47,7 +48,7 @@ public class FailoverReactorTest {
public void testAddDay() throws NacosException {
ServiceInfoHolder holder = Mockito.mock(ServiceInfoHolder.class);
Mockito.when(holder.getServiceInfoMap()).thenReturn(new HashMap<>());
FailoverReactor failoverReactor = new FailoverReactor(holder, "/tmp");
FailoverReactor failoverReactor = new FailoverReactor(holder, UUID.randomUUID().toString());
Date date = new Date();
Date actual = failoverReactor.addDay(date, 1);
Assert.assertEquals(date.getTime() + 24 * 60 * 60 * 1000, actual.getTime());
@ -58,7 +59,7 @@ public class FailoverReactorTest {
public void testIsFailoverSwitch() throws NacosException {
ServiceInfoHolder holder = Mockito.mock(ServiceInfoHolder.class);
Mockito.when(holder.getServiceInfoMap()).thenReturn(new HashMap<>());
FailoverReactor failoverReactor = new FailoverReactor(holder, "/tmp");
FailoverReactor failoverReactor = new FailoverReactor(holder, UUID.randomUUID().toString());
Assert.assertFalse(failoverReactor.isFailoverSwitch());
failoverReactor.shutdown();
@ -68,10 +69,9 @@ public class FailoverReactorTest {
public void testGetService() throws NacosException {
ServiceInfoHolder holder = Mockito.mock(ServiceInfoHolder.class);
Mockito.when(holder.getServiceInfoMap()).thenReturn(new HashMap<>());
FailoverReactor failoverReactor = new FailoverReactor(holder, "/tmp");
FailoverReactor failoverReactor = new FailoverReactor(holder, UUID.randomUUID().toString());
ServiceInfo info = failoverReactor.getService("aa@@bb");
Assert.assertEquals(new ServiceInfo("aa@@bb").toString(), info.toString());
failoverReactor.shutdown();
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.backups.datasource;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.utils.CacheDirUtil;
import junit.framework.TestCase;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashMap;
import java.util.Properties;
public class DiskFailoverDataSourceTest extends TestCase {
@Test
public void testGetSwitch() {
Properties prop = new Properties();
ServiceInfoHolder holder = Mockito.mock(ServiceInfoHolder.class);
Mockito.when(holder.getServiceInfoMap()).thenReturn(new HashMap<>());
final NacosClientProperties properties = NacosClientProperties.PROTOTYPE.derive(prop);
String cacheDir = CacheDirUtil.initCacheDir("public", properties);
DiskFailoverDataSource diskFailoverDataSource = new DiskFailoverDataSource();
diskFailoverDataSource.getSwitch();
}
@Test
public void testGetFailoverData() {
Properties prop = new Properties();
ServiceInfoHolder holder = Mockito.mock(ServiceInfoHolder.class);
Mockito.when(holder.getServiceInfoMap()).thenReturn(new HashMap<>());
final NacosClientProperties properties = NacosClientProperties.PROTOTYPE.derive(prop);
String cacheDir = CacheDirUtil.initCacheDir("public", properties);
DiskFailoverDataSource diskFailoverDataSource = new DiskFailoverDataSource();
diskFailoverDataSource.getFailoverData();
}
}