diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java index 98f5a7ab8..4d4afb5c1 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java @@ -285,7 +285,7 @@ public class DomainsManager { virtualClusterDomain = (VirtualClusterDomain) newDom; newDom = virtualClusterDomain; } - RaftCore.signalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom)); + RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom)); } public void easyReplaceIP4Dom(String domName, String clusterName, List ips) throws Exception { @@ -367,7 +367,7 @@ public class DomainsManager { } if (timestamp == -1) { - RaftCore.signalPublish(UtilsAndCommons.getIPListStoreKey(dom), + RaftCore.doSignalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddressMap.values())); } else { String key = UtilsAndCommons.getIPListStoreKey(dom); @@ -452,7 +452,7 @@ public class DomainsManager { ipAddrs.removeAll(ips); - RaftCore.signalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddrs)); + RaftCore.doSignalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddrs)); } finally { lock.unlock(); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/Switch.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/Switch.java index a1521137b..8386f7afe 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/Switch.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/Switch.java @@ -135,7 +135,7 @@ public class Switch { public static void save() { try { - RaftCore.signalPublish(UtilsAndCommons.getDomStoreKey(dom), JSON.toJSONString(dom)); + RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(dom), JSON.toJSONString(dom)); } catch (Exception e) { Loggers.SRV_LOG.error("VIPSRV-SWITCH", "failed to save switch", e); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java index ef7dfb2b0..3a0000c36 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java @@ -179,6 +179,50 @@ public class RaftCore { } public static void signalPublish(String key, String value) throws Exception { + + long start = System.currentTimeMillis(); + final Datum datum = new Datum(); + datum.key = key; + datum.value = value; + datum.timestamp = System.currentTimeMillis(); + + JSONObject json = new JSONObject(); + json.put("datum", datum); + json.put("source", peers.local()); + + onPublish(datum, peers.local()); + + final String content = JSON.toJSONString(json); + + for (final String server : peers.allServersIncludeMyself()) { + if (isLeader(server)) { + continue; + } + final String url = buildURL(server, API_ON_PUB); + HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler() { + @Override + public Integer onCompleted(Response response) throws Exception { + if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { + Loggers.RAFT.warn("RAFT", "failed to publish data to peer, datumId=" + datum.key + ", peer=" + server + ", http code=" + response.getStatusCode()); + return 1; + } + return 0; + } + + @Override + public STATE onContentWriteCompleted() { + return STATE.CONTINUE; + } + }); + + } + + long end = System.currentTimeMillis(); + Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key); + + } + + public static void doSignalPublish(String key, String value) throws Exception { if (!RaftCore.isLeader()) { JSONObject params = new JSONObject(); params.put("key", key); @@ -195,6 +239,15 @@ public class RaftCore { throw new IllegalStateException("I'm not leader, can not handle update/delete operation"); } + if (key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) { + signalPublishLocked(key, value); + } else { + signalPublish(key, value); + } + } + + public static void signalPublishLocked(String key, String value) throws Exception { + try { RaftCore.OPERATE_LOCK.lock(); long start = System.currentTimeMillis(); @@ -237,6 +290,12 @@ public class RaftCore { } + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + // only majority servers return success can we consider this update success + Loggers.RAFT.info("data publish failed, caused failed to notify majority, key=" + key); + throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key); + } + long end = System.currentTimeMillis(); Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key); } finally { @@ -930,10 +989,19 @@ public class RaftCore { public static class Notifier implements Runnable { + private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024); + private BlockingQueue tasks = new LinkedBlockingQueue(1024 * 1024); public void addTask(Datum datum, ApplyAction action) { + + if (services.containsKey(datum.key) && action == ApplyAction.CHANGE) { + return; + } tasks.add(Pair.with(datum, action)); + if (action == ApplyAction.CHANGE) { + services.put(datum.key, StringUtils.EMPTY); + } } @Override @@ -951,6 +1019,9 @@ public class RaftCore { Datum datum = (Datum) pair.getValue0(); ApplyAction action = (ApplyAction) pair.getValue1(); + + services.remove(datum.key); + int count = 0; for (RaftListener listener : listeners) { @@ -966,7 +1037,7 @@ public class RaftCore { try { if (action == ApplyAction.CHANGE) { - listener.onChange(datum.key, datum.value); + listener.onChange(datum.key, getDatum(datum.key).value); continue; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/web/RaftCommands.java b/naming/src/main/java/com/alibaba/nacos/naming/web/RaftCommands.java index 7aebbcd04..03d028734 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/web/RaftCommands.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/web/RaftCommands.java @@ -113,7 +113,7 @@ public class RaftCommands { String value = Arrays.asList(entity).toArray(new String[1])[0]; JSONObject json = JSON.parseObject(value); - RaftCore.signalPublish(json.getString("key"), json.getString("value")); + RaftCore.doSignalPublish(json.getString("key"), json.getString("value")); return "ok"; }