diff --git a/console/src/main/resources/META-INF/nacos-default.properties b/console/src/main/resources/META-INF/nacos-default.properties
index 35b95f507..c55907564 100644
--- a/console/src/main/resources/META-INF/nacos-default.properties
+++ b/console/src/main/resources/META-INF/nacos-default.properties
@@ -71,3 +71,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=false
+nacos.naming.expireInstance=true
diff --git a/distribution/conf/application.properties b/distribution/conf/application.properties
index 9c916225d..9ec050758 100644
--- a/distribution/conf/application.properties
+++ b/distribution/conf/application.properties
@@ -45,3 +45,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=true
+nacos.naming.expireInstance=true
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
index b1de41652..88e332834 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.cluster.ServerMode;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
+import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
@@ -32,13 +33,15 @@ import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.pojo.Record;
+import org.apache.commons.lang3.StringUtils;
+import org.javatuples.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
/**
* A consistency protocol algorithm called Partition
@@ -57,6 +60,18 @@ import java.util.concurrent.ConcurrentHashMap;
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
+ private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+
+ t.setDaemon(true);
+ t.setName("com.alibaba.nacos.naming.distro.notifier");
+
+ return t;
+ }
+ });
+
@Autowired
private DistroMapper distroMapper;
@@ -83,7 +98,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
private boolean initialized = false;
- private volatile Map> listeners = new ConcurrentHashMap<>();
+ public volatile Notifier notifier = new Notifier();
+
+ private volatile Map> listeners = new ConcurrentHashMap<>();
@PostConstruct
public void init() throws Exception {
@@ -97,6 +114,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
}
});
+
+ executor.submit(notifier);
}
public void load() throws Exception {
@@ -154,13 +173,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
if (!listeners.containsKey(key)) {
return;
}
- for (RecordListener listener : listeners.get(key)) {
- try {
- listener.onChange(key, value);
- } catch (Exception e) {
- Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
- }
- }
+
+ notifier.addTask(key, ApplyAction.CHANGE);
}
public void onRemove(String key) {
@@ -170,13 +184,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
if (!listeners.containsKey(key)) {
return;
}
- for (RecordListener listener : listeners.get(key)) {
- try {
- listener.onDelete(key);
- } catch (Exception e) {
- Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
- }
- }
+
+ notifier.addTask(key, ApplyAction.DELETE);
}
public void onReceiveChecksums(Map checksumMap, String server) {
@@ -288,7 +297,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
- listeners.put(key, new ArrayList<>());
+ listeners.put(key, new CopyOnWriteArrayList<>());
}
listeners.get(key).add(listener);
}
@@ -314,4 +323,78 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
public boolean isInitialized() {
return initialized || !globalConfig.isDataWarmup();
}
+
+ public class Notifier implements Runnable {
+
+ private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024);
+
+ private BlockingQueue tasks = new LinkedBlockingQueue(1024 * 1024);
+
+ public void addTask(String datumKey, ApplyAction action) {
+
+ if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
+ return;
+ }
+ if (action == ApplyAction.CHANGE) {
+ services.put(datumKey, StringUtils.EMPTY);
+ }
+ tasks.add(Pair.with(datumKey, action));
+ }
+
+ public int getTaskSize() {
+ return tasks.size();
+ }
+
+ @Override
+ public void run() {
+ Loggers.EPHEMERAL.info("distro notifier started");
+
+ while (true) {
+ try {
+
+ Pair pair = tasks.take();
+
+ if (pair == null) {
+ continue;
+ }
+
+ String datumKey = (String) pair.getValue0();
+ ApplyAction action = (ApplyAction) pair.getValue1();
+
+ services.remove(datumKey);
+
+ int count = 0;
+
+ if (!listeners.containsKey(datumKey)) {
+ continue;
+ }
+
+ for (RecordListener listener : listeners.get(datumKey)) {
+
+ count++;
+
+ try {
+ if (action == ApplyAction.CHANGE) {
+ listener.onChange(datumKey, dataStore.get(datumKey).value);
+ continue;
+ }
+
+ if (action == ApplyAction.DELETE) {
+ listener.onDelete(datumKey);
+ continue;
+ }
+ } catch (Throwable e) {
+ Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {} {}", datumKey, e);
+ }
+ }
+
+ if (Loggers.EPHEMERAL.isDebugEnabled()) {
+ Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}", datumKey, count);
+ }
+ } catch (Throwable e) {
+ Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task", e);
+ }
+ }
+ }
+ }
}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
index ade5ea24e..23ac874be 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
@@ -557,7 +557,7 @@ public class RaftCore {
}
});
} catch (Exception e) {
- Loggers.RAFT.error("VIPSRV error while sending heart-beat to peer: {} {}", server, e);
+ Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java
index 8b8cd2bff..2aa1498b7 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java
@@ -154,7 +154,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
@Override
public void onChange(String key, Instances value) throws Exception {
- Loggers.RAFT.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
+ Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance ip : value.getInstanceList()) {
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java
index c33c13bae..70f19b554 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java
@@ -22,10 +22,7 @@ import com.alibaba.nacos.naming.boot.SpringContext;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
-import com.alibaba.nacos.naming.misc.HttpClient;
-import com.alibaba.nacos.naming.misc.Loggers;
-import com.alibaba.nacos.naming.misc.NamingProxy;
-import com.alibaba.nacos.naming.misc.UtilsAndCommons;
+import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.push.PushService;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.Response;
@@ -57,6 +54,10 @@ public class ClientBeatCheckTask implements Runnable {
return SpringContext.getAppContext().getBean(DistroMapper.class);
}
+ public GlobalConfig getGlobalConfig() {
+ return SpringContext.getAppContext().getBean(GlobalConfig.class);
+ }
+
public String taskKey() {
return service.getName();
}
@@ -85,6 +86,10 @@ public class ClientBeatCheckTask implements Runnable {
}
}
+ if (!getGlobalConfig().isExpireInstance()) {
+ return;
+ }
+
// then remove obsolete instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) {
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java
index 294910c00..49281da5f 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java
@@ -42,6 +42,9 @@ public class GlobalConfig {
@Value("${nacos.naming.data.warmup}")
private boolean dataWarmup = false;
+ @Value("${nacos.naming.expireInstance}")
+ private boolean expireInstance = true;
+
public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}
@@ -61,4 +64,8 @@ public class GlobalConfig {
public boolean isDataWarmup() {
return dataWarmup;
}
+
+ public boolean isExpireInstance() {
+ return expireInstance;
+ }
}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java
index 22088155d..dc54484b2 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java
@@ -112,7 +112,7 @@ public class HttpClient {
return getResult(conn);
} catch (Exception e) {
- Loggers.SRV_LOG.warn("[VIPSRV] Exception while request: {}, caused: {}", url, e);
+ Loggers.SRV_LOG.warn("Exception while request: {}, caused: {}", url, e);
return new HttpResult(500, e.toString(), Collections.emptyMap());
} finally {
if (conn != null) {
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java
index c9318d4c4..3abf979cb 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java
@@ -139,10 +139,9 @@ public class SwitchDomain implements Record, Cloneable {
// the followings are not implemented
public String getName() {
- return "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
+ return UtilsAndCommons.SWITCH_DOMAIN_NAME;
}
-
public void update(SwitchDomain domain) {
}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java
index 995189000..0891ae2b4 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java
@@ -77,7 +77,7 @@ public class UtilsAndCommons {
public static final String IPADDRESS_DATA_ID_PRE = "com.alibaba.nacos.naming.iplist.";
- public static final String SWITCH_DOMAIN_NAME = "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
+ public static final String SWITCH_DOMAIN_NAME = "00-00---000-NACOS_SWITCH_DOMAIN-000---00-00";
public static final String CIDR_REGEX = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}/[0-9]+";
diff --git a/naming/src/main/resources/application.properties b/naming/src/main/resources/application.properties
index 96472b51e..3dcc16718 100644
--- a/naming/src/main/resources/application.properties
+++ b/naming/src/main/resources/application.properties
@@ -33,3 +33,4 @@ nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=true
+nacos.naming.expireInstance=true
diff --git a/test/src/test/resources/application.properties b/test/src/test/resources/application.properties
index 370947e95..b88b84cb4 100644
--- a/test/src/test/resources/application.properties
+++ b/test/src/test/resources/application.properties
@@ -25,3 +25,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=false
+nacos.naming.expireInstance=true