From 7ca6143550adcaaca71f76f10990a1c966e4c0ba Mon Sep 17 00:00:00 2001 From: nkorange Date: Sat, 16 Mar 2019 20:07:04 +0800 Subject: [PATCH] #502 Fix concurrent problem. --- .../META-INF/nacos-default.properties | 1 + distribution/conf/application.properties | 1 + .../distro/DistroConsistencyServiceImpl.java | 117 +++++++++++++++--- .../consistency/persistent/raft/RaftCore.java | 2 +- .../alibaba/nacos/naming/core/Service.java | 2 +- .../healthcheck/ClientBeatCheckTask.java | 13 +- .../nacos/naming/misc/GlobalConfig.java | 7 ++ .../alibaba/nacos/naming/misc/HttpClient.java | 2 +- .../nacos/naming/misc/SwitchDomain.java | 3 +- .../nacos/naming/misc/UtilsAndCommons.java | 2 +- .../src/main/resources/application.properties | 1 + .../src/test/resources/application.properties | 1 + 12 files changed, 125 insertions(+), 27 deletions(-) diff --git a/console/src/main/resources/META-INF/nacos-default.properties b/console/src/main/resources/META-INF/nacos-default.properties index 35b95f507..c55907564 100644 --- a/console/src/main/resources/META-INF/nacos-default.properties +++ b/console/src/main/resources/META-INF/nacos-default.properties @@ -71,3 +71,4 @@ nacos.naming.distro.batchSyncKeyCount=1000 nacos.naming.distro.initDataRatio=0.9 nacos.naming.distro.syncRetryDelay=5000 nacos.naming.data.warmup=false +nacos.naming.expireInstance=true diff --git a/distribution/conf/application.properties b/distribution/conf/application.properties index 9c916225d..9ec050758 100644 --- a/distribution/conf/application.properties +++ b/distribution/conf/application.properties @@ -45,3 +45,4 @@ nacos.naming.distro.batchSyncKeyCount=1000 nacos.naming.distro.initDataRatio=0.9 nacos.naming.distro.syncRetryDelay=5000 nacos.naming.data.warmup=true +nacos.naming.expireInstance=true diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index b1de41652..88e332834 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.cluster.ServerMode; import com.alibaba.nacos.naming.cluster.ServerStatus; import com.alibaba.nacos.naming.cluster.servers.Server; import com.alibaba.nacos.naming.cluster.transport.Serializer; +import com.alibaba.nacos.naming.consistency.ApplyAction; import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.consistency.RecordListener; @@ -32,13 +33,15 @@ import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.misc.*; import com.alibaba.nacos.naming.pojo.Record; +import org.apache.commons.lang3.StringUtils; +import org.javatuples.Pair; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; /** * A consistency protocol algorithm called Partition @@ -57,6 +60,18 @@ import java.util.concurrent.ConcurrentHashMap; @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { + private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + t.setDaemon(true); + t.setName("com.alibaba.nacos.naming.distro.notifier"); + + return t; + } + }); + @Autowired private DistroMapper distroMapper; @@ -83,7 +98,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService private boolean initialized = false; - private volatile Map> listeners = new ConcurrentHashMap<>(); + public volatile Notifier notifier = new Notifier(); + + private volatile Map> listeners = new ConcurrentHashMap<>(); @PostConstruct public void init() throws Exception { @@ -97,6 +114,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService } } }); + + executor.submit(notifier); } public void load() throws Exception { @@ -154,13 +173,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService if (!listeners.containsKey(key)) { return; } - for (RecordListener listener : listeners.get(key)) { - try { - listener.onChange(key, value); - } catch (Exception e) { - Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e); - } - } + + notifier.addTask(key, ApplyAction.CHANGE); } public void onRemove(String key) { @@ -170,13 +184,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService if (!listeners.containsKey(key)) { return; } - for (RecordListener listener : listeners.get(key)) { - try { - listener.onDelete(key); - } catch (Exception e) { - Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e); - } - } + + notifier.addTask(key, ApplyAction.DELETE); } public void onReceiveChecksums(Map checksumMap, String server) { @@ -288,7 +297,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @Override public void listen(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { - listeners.put(key, new ArrayList<>()); + listeners.put(key, new CopyOnWriteArrayList<>()); } listeners.get(key).add(listener); } @@ -314,4 +323,78 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService public boolean isInitialized() { return initialized || !globalConfig.isDataWarmup(); } + + public class Notifier implements Runnable { + + private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024); + + private BlockingQueue tasks = new LinkedBlockingQueue(1024 * 1024); + + public void addTask(String datumKey, ApplyAction action) { + + if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { + return; + } + if (action == ApplyAction.CHANGE) { + services.put(datumKey, StringUtils.EMPTY); + } + tasks.add(Pair.with(datumKey, action)); + } + + public int getTaskSize() { + return tasks.size(); + } + + @Override + public void run() { + Loggers.EPHEMERAL.info("distro notifier started"); + + while (true) { + try { + + Pair pair = tasks.take(); + + if (pair == null) { + continue; + } + + String datumKey = (String) pair.getValue0(); + ApplyAction action = (ApplyAction) pair.getValue1(); + + services.remove(datumKey); + + int count = 0; + + if (!listeners.containsKey(datumKey)) { + continue; + } + + for (RecordListener listener : listeners.get(datumKey)) { + + count++; + + try { + if (action == ApplyAction.CHANGE) { + listener.onChange(datumKey, dataStore.get(datumKey).value); + continue; + } + + if (action == ApplyAction.DELETE) { + listener.onDelete(datumKey); + continue; + } + } catch (Throwable e) { + Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {} {}", datumKey, e); + } + } + + if (Loggers.EPHEMERAL.isDebugEnabled()) { + Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}", datumKey, count); + } + } catch (Throwable e) { + Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task", e); + } + } + } + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java index ade5ea24e..23ac874be 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java @@ -557,7 +557,7 @@ public class RaftCore { } }); } catch (Exception e) { - Loggers.RAFT.error("VIPSRV error while sending heart-beat to peer: {} {}", server, e); + Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e); MetricsMonitor.getLeaderSendBeatFailedException().increment(); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java index 8b8cd2bff..2aa1498b7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java @@ -154,7 +154,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement @Override public void onChange(String key, Instances value) throws Exception { - Loggers.RAFT.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); + Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance ip : value.getInstanceList()) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java index c33c13bae..70f19b554 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java @@ -22,10 +22,7 @@ import com.alibaba.nacos.naming.boot.SpringContext; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Service; -import com.alibaba.nacos.naming.misc.HttpClient; -import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.misc.NamingProxy; -import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.alibaba.nacos.naming.misc.*; import com.alibaba.nacos.naming.push.PushService; import com.ning.http.client.AsyncCompletionHandler; import com.ning.http.client.Response; @@ -57,6 +54,10 @@ public class ClientBeatCheckTask implements Runnable { return SpringContext.getAppContext().getBean(DistroMapper.class); } + public GlobalConfig getGlobalConfig() { + return SpringContext.getAppContext().getBean(GlobalConfig.class); + } + public String taskKey() { return service.getName(); } @@ -85,6 +86,10 @@ public class ClientBeatCheckTask implements Runnable { } } + if (!getGlobalConfig().isExpireInstance()) { + return; + } + // then remove obsolete instances: for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java index 294910c00..49281da5f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java @@ -42,6 +42,9 @@ public class GlobalConfig { @Value("${nacos.naming.data.warmup}") private boolean dataWarmup = false; + @Value("${nacos.naming.expireInstance}") + private boolean expireInstance = true; + public int getTaskDispatchPeriod() { return taskDispatchPeriod; } @@ -61,4 +64,8 @@ public class GlobalConfig { public boolean isDataWarmup() { return dataWarmup; } + + public boolean isExpireInstance() { + return expireInstance; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java index 22088155d..dc54484b2 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java @@ -112,7 +112,7 @@ public class HttpClient { return getResult(conn); } catch (Exception e) { - Loggers.SRV_LOG.warn("[VIPSRV] Exception while request: {}, caused: {}", url, e); + Loggers.SRV_LOG.warn("Exception while request: {}, caused: {}", url, e); return new HttpResult(500, e.toString(), Collections.emptyMap()); } finally { if (conn != null) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java index c9318d4c4..3abf979cb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java @@ -139,10 +139,9 @@ public class SwitchDomain implements Record, Cloneable { // the followings are not implemented public String getName() { - return "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00"; + return UtilsAndCommons.SWITCH_DOMAIN_NAME; } - public void update(SwitchDomain domain) { } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java index 995189000..0891ae2b4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java @@ -77,7 +77,7 @@ public class UtilsAndCommons { public static final String IPADDRESS_DATA_ID_PRE = "com.alibaba.nacos.naming.iplist."; - public static final String SWITCH_DOMAIN_NAME = "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00"; + public static final String SWITCH_DOMAIN_NAME = "00-00---000-NACOS_SWITCH_DOMAIN-000---00-00"; public static final String CIDR_REGEX = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}/[0-9]+"; diff --git a/naming/src/main/resources/application.properties b/naming/src/main/resources/application.properties index 96472b51e..3dcc16718 100644 --- a/naming/src/main/resources/application.properties +++ b/naming/src/main/resources/application.properties @@ -33,3 +33,4 @@ nacos.naming.distro.initDataRatio=0.9 nacos.naming.distro.syncRetryDelay=5000 nacos.naming.data.warmup=true +nacos.naming.expireInstance=true diff --git a/test/src/test/resources/application.properties b/test/src/test/resources/application.properties index 370947e95..b88b84cb4 100644 --- a/test/src/test/resources/application.properties +++ b/test/src/test/resources/application.properties @@ -25,3 +25,4 @@ nacos.naming.distro.batchSyncKeyCount=1000 nacos.naming.distro.initDataRatio=0.9 nacos.naming.distro.syncRetryDelay=5000 nacos.naming.data.warmup=false +nacos.naming.expireInstance=true