From 3250533516cb064c703ee3020ef4e30374daa9ce Mon Sep 17 00:00:00 2001 From: nkorange Date: Sat, 29 Dec 2018 13:44:12 +0800 Subject: [PATCH] #526 Make more effort --- distribution/conf/nacos-logback.xml | 20 +- .../naming/controllers/ServiceController.java | 4 + .../nacos/naming/core/DomainsManager.java | 187 +++++++------- .../naming/core/VirtualClusterDomain.java | 2 + .../alibaba/nacos/naming/misc/HttpClient.java | 2 +- .../nacos/naming/misc/UtilsAndCommons.java | 4 + .../nacos/naming/push/PushService.java | 12 +- .../alibaba/nacos/naming/raft/RaftCore.java | 36 +-- .../alibaba/nacos/naming/web/ApiCommands.java | 231 ++++++++++++++---- .../nacos/naming/core/DomainsManagerTest.java | 16 -- .../naming/AutoDeregisterInstance_ITCase.java | 4 - .../test/naming/SelectInstances_ITCase.java | 2 +- 12 files changed, 324 insertions(+), 196 deletions(-) diff --git a/distribution/conf/nacos-logback.xml b/distribution/conf/nacos-logback.xml index 3a74d3a27..b16eaa8c1 100644 --- a/distribution/conf/nacos-logback.xml +++ b/distribution/conf/nacos-logback.xml @@ -58,9 +58,9 @@ true ${LOG_HOME}/naming-raft.log.%d{yyyy-MM-dd}.%i - 20MB + 1GB 15 - 128MB + 3GB true @@ -82,9 +82,9 @@ true ${LOG_HOME}/naming-event.log.%d{yyyy-MM-dd}.%i - 20MB + 1GB 15 - 128MB + 3GB true @@ -106,9 +106,9 @@ true ${LOG_HOME}/naming-push.log.%d{yyyy-MM-dd}.%i - 20MB + 1GB 15 - 128MB + 3GB true @@ -139,9 +139,9 @@ true ${LOG_HOME}/naming-performance.log.%d{yyyy-MM-dd}.%i - 50MB + 1GB 15 - 512MB + 3GB true @@ -241,9 +241,9 @@ true ${LOG_HOME}/naming-debug.log.%d{yyyy-MM-dd}.%i - 20MB + 1GB 15 - 128MB + 3GB true diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java index a9699f7be..eac5df8fb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java @@ -25,6 +25,7 @@ import com.alibaba.nacos.naming.core.VirtualClusterDomain; import com.alibaba.nacos.naming.exception.NacosException; import com.alibaba.nacos.naming.healthcheck.HealthCheckMode; import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.alibaba.nacos.naming.raft.RaftCore; import com.alibaba.nacos.naming.selector.LabelSelector; import com.alibaba.nacos.naming.selector.NoneSelector; import com.alibaba.nacos.naming.selector.Selector; @@ -130,6 +131,9 @@ public class ServiceController { res.put("selector", domain.getSelector()); + res.put("instanceTimestamp", RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(domain)).timestamp.get()); + res.put("serviceTimestamp", RaftCore.getDatum(UtilsAndCommons.getDomStoreKey(domain)).timestamp.get()); + return res; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java index ce61dbb5c..79a670615 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/DomainsManager.java @@ -69,7 +69,7 @@ public class DomainsManager { * thread pool that processes getting domain detail from other server asynchronously */ private ExecutorService domainUpdateExecutor - = Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() { + = Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); @@ -202,8 +202,8 @@ public class DomainsManager { if (valid != ipAddress.isValid()) { ipAddress.setValid(valid); Loggers.EVT_LOG.info("{" + domName + "} {SYNC} " + - "{IP-" + (ipAddress.isValid() ? "ENABLED" : "DISABLED") + "} " + ipAddress.getIp() - + ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName()); + "{IP-" + (ipAddress.isValid() ? "ENABLED" : "DISABLED") + "} " + ipAddress.getIp() + + ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName()); } } @@ -287,72 +287,103 @@ public class DomainsManager { RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom), true); } - public void easyAddIP4Dom(String domName, List ips, long timestamp, long term) throws Exception { + public void easyAddIP4Dom(String domName, List ips, long term) throws Exception { + easyUpdateIP4Dom(domName, ips, term, "add"); + } + + public void easyRemvIP4Dom(String domName, List ips, long term) throws Exception { + easyUpdateIP4Dom(domName, ips, term, "remove"); + } + + public void easyUpdateIP4Dom(String domName, List ips, long term, String action) throws Exception { VirtualClusterDomain dom = (VirtualClusterDomain) chooseDomMap().get(domName); if (dom == null) { throw new IllegalArgumentException("dom doesn't exist: " + domName); } - Datum datum1 = RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(dom)); - String oldJson = StringUtils.EMPTY; + try { - if (datum1 != null) { - oldJson = datum1.value; - } - - List ipAddresses; - List currentIPs = dom.allIPs(); - Map map = new ConcurrentHashMap(currentIPs.size()); - - for (IpAddress ipAddress : currentIPs) { - map.put(ipAddress.toIPAddr(), ipAddress); - } - - ipAddresses = setValid(oldJson, map); - - Map ipAddressMap = new HashMap(ipAddresses.size()); - - for (IpAddress ipAddress : ipAddresses) { - ipAddressMap.put(ipAddress.getDatumKey(), ipAddress); - } - - for (IpAddress ipAddress : ips) { - if (!dom.getClusterMap().containsKey(ipAddress.getClusterName())) { - Cluster cluster = new Cluster(ipAddress.getClusterName()); - cluster.setDom(dom); - dom.getClusterMap().put(ipAddress.getClusterName(), cluster); - Loggers.SRV_LOG.warn("cluster: " + ipAddress.getClusterName() + " not found, ip: " + ipAddress.toJSON() - + ", will create new cluster with default configuration."); + if (!dom.getEnableClientBeat()) { + getDom2LockMap().get(domName).lock(); } - ipAddressMap.put(ipAddress.getDatumKey(), ipAddress); - } + Datum datum1 = RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(dom)); + String oldJson = StringUtils.EMPTY; - if (ipAddressMap.size() <= 0) { - throw new IllegalArgumentException("ip list can not be empty, dom: " + dom.getName() + ", ip list: " + if (datum1 != null) { + oldJson = datum1.value; + } + + List ipAddresses; + List currentIPs = dom.allIPs(); + Map map = new ConcurrentHashMap(currentIPs.size()); + + for (IpAddress ipAddress : currentIPs) { + map.put(ipAddress.toIPAddr(), ipAddress); + } + + ipAddresses = setValid(oldJson, map); + + Map ipAddressMap = new HashMap(ipAddresses.size()); + + for (IpAddress ipAddress : ipAddresses) { + ipAddressMap.put(ipAddress.getDatumKey(), ipAddress); + } + + for (IpAddress ipAddress : ips) { + if (!dom.getClusterMap().containsKey(ipAddress.getClusterName())) { + Cluster cluster = new Cluster(ipAddress.getClusterName()); + cluster.setDom(dom); + dom.getClusterMap().put(ipAddress.getClusterName(), cluster); + Loggers.SRV_LOG.warn("cluster: " + ipAddress.getClusterName() + " not found, ip: " + ipAddress.toJSON() + + ", will create new cluster with default configuration."); + } + + if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { + ipAddressMap.remove(ipAddress.getDatumKey()); + } else { + ipAddressMap.put(ipAddress.getDatumKey(), ipAddress); + } + + } + + if (ipAddressMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { + throw new IllegalArgumentException("ip list can not be empty, dom: " + dom.getName() + ", ip list: " + JSON.toJSONString(ipAddressMap.values())); + } + + Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-UPDATE}" + ips + + ", action:" + action); + + String key = UtilsAndCommons.getIPListStoreKey(dom); + String value = JSON.toJSONString(ipAddressMap.values()); + + Datum datum = new Datum(); + datum.key = key; + datum.value = value; + + datum.timestamp.set(datum1 == null ? 1 : datum1.timestamp.get() + 1); + + Loggers.RAFT.info("datum " + key + " updated:" + datum.timestamp.get()); + + RaftPeer peer = new RaftPeer(); + peer.ip = RaftCore.getLeader().ip; + peer.term.set(term); + peer.voteFor = RaftCore.getLeader().voteFor; + peer.heartbeatDueMs = RaftCore.getLeader().heartbeatDueMs; + peer.leaderDueMs = RaftCore.getLeader().leaderDueMs; + peer.state = RaftCore.getLeader().state; + + boolean increaseTerm = !((VirtualClusterDomain) getDomain(domName)).getEnableClientBeat(); + + RaftCore.onPublish(datum, peer, increaseTerm); + } finally { + if (!dom.getEnableClientBeat()) { + getDom2LockMap().get(domName).unlock(); + } } - String key = UtilsAndCommons.getIPListStoreKey(dom); - String value = JSON.toJSONString(ipAddressMap.values()); - - Datum datum = new Datum(); - datum.key = key; - datum.value = value; - datum.timestamp.set(timestamp); - - RaftPeer peer = new RaftPeer(); - peer.ip = RaftCore.getLeader().ip; - peer.term.set(term); - peer.voteFor = RaftCore.getLeader().voteFor; - peer.heartbeatDueMs = RaftCore.getLeader().heartbeatDueMs; - peer.leaderDueMs = RaftCore.getLeader().leaderDueMs; - peer.state = RaftCore.getLeader().state; - - boolean increaseTerm = !((VirtualClusterDomain)getDomain(domName)).getEnableClientBeat(); - - RaftCore.onPublish(datum, peer, increaseTerm); } private List setValid(String oldJson, Map map) { @@ -380,50 +411,6 @@ public class DomainsManager { return ipAddresses; } - public void easyRemvIP4Dom(String domName, List ips) throws Exception { - Lock lock = dom2LockMap.get(domName); - if (lock == null) { - throw new IllegalStateException("no lock for " + domName + ", operation is disabled now."); - } - - try { - lock.lock(); - Domain dom = chooseDomMap().get(domName); - if (dom == null) { - throw new IllegalArgumentException("domain doesn't exist: " + domName); - } - - Datum datum = RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(dom)); - String oldJson = StringUtils.EMPTY; - List currentIPs = dom.allIPs(); - - if (currentIPs.size() <= 0) { - return; - } - - Map map = new ConcurrentHashMap(currentIPs.size()); - - for (IpAddress ipAddress : currentIPs) { - map.put(ipAddress.toIPAddr(), ipAddress); - } - - if (datum != null) { - oldJson = datum.value; - } - - List ipAddrs = setValid(oldJson, map); - - ipAddrs.removeAll(ips); - - boolean locked = !((VirtualClusterDomain)getDomain(domName)).getEnableClientBeat(); - - RaftCore.doSignalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddrs), locked); - - } finally { - lock.unlock(); - } - } - public Domain getDomain(String domName) { return chooseDomMap().get(domName); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/VirtualClusterDomain.java b/naming/src/main/java/com/alibaba/nacos/naming/core/VirtualClusterDomain.java index 1d6da8cdf..5446893e9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/VirtualClusterDomain.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/VirtualClusterDomain.java @@ -178,7 +178,9 @@ public class VirtualClusterDomain implements Domain, RaftListener { List ips = JSON.parseObject(value, new TypeReference>() { }); + for (IpAddress ip : ips) { + if (ip.getWeight() > 10000.0D) { ip.setWeight(10000.0D); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java index 58ca5d52f..24f9cfecc 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java @@ -112,7 +112,7 @@ public class HttpClient { return getResult(conn); } catch (Exception e) { - Loggers.SRV_LOG.warn("VIPSRV", "Exception while request: " + url + ", caused: " + e.getMessage()); + Loggers.SRV_LOG.warn("VIPSRV {}", "Exception while request: " + url + ", caused: " + e.getMessage()); return new HttpResult(500, e.toString(), Collections.emptyMap()); } finally { if (conn != null) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java index d5839f94f..420fba1ed 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java @@ -104,6 +104,10 @@ public class UtilsAndCommons { public static final String API_DOM = "/api/dom"; + public static final String UPDATE_INSTANCE_ACTION_ADD = "add"; + + public static final String UPDATE_INSTANCE_ACTION_REMOVE = "remove"; + public static final String INSTANCE_LIST_PERSISTED_PROPERTY_KEY = "nacos.instanceListPersisted"; public static final boolean INSTANCE_LIST_PERSISTED = Boolean.getBoolean(INSTANCE_LIST_PERSISTED_PROPERTY_KEY); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java b/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java index 78c0fe374..7f1e9a943 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java @@ -114,13 +114,13 @@ public class PushService { try { removeClientIfZombie(); } catch (Throwable e) { - Loggers.PUSH.warn("VIPSRV-PUSH", "failed to remove client zombied"); + Loggers.PUSH.warn("VIPSRV-PUSH {}", "failed to remove client zombied"); } } }, 0, 20, TimeUnit.SECONDS); } catch (SocketException e) { - Loggers.SRV_LOG.error("VIPSRV-PUSH", "failed to init push service"); + Loggers.SRV_LOG.error("VIPSRV-PUSH {}", "failed to init push service"); } } @@ -181,7 +181,7 @@ public class PushService { size += clientConcurrentMap.size(); } - Loggers.PUSH.info("VIPSRV-PUSH", "clientMap size: " + size); + Loggers.PUSH.info("VIPSRV-PUSH {}", "clientMap size: " + size); } @@ -221,7 +221,9 @@ public class PushService { @Override public void run() { try { - Loggers.PUSH.info(dom + " is changed, add it to push queue."); + if (Loggers.PUSH.isDebugEnabled()) { + Loggers.PUSH.debug(dom + " is changed, add it to push queue."); + } ConcurrentMap clients = clientMap.get(dom); if (MapUtils.isEmpty(clients)) { return; @@ -247,7 +249,7 @@ public class PushService { compressData = (byte[]) (pair.getValue0()); data = (Map) pair.getValue1(); - Loggers.PUSH.debug("PUSH-CACHE", "cache hit: " + dom + ":" + client.getAddrStr()); + Loggers.PUSH.debug("PUSH-CACHE {}", "cache hit: " + dom + ":" + client.getAddrStr()); } if (compressData != null) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java index e507ddd9f..618988d0e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java @@ -86,11 +86,11 @@ public class RaftCore { private static volatile List listeners = new CopyOnWriteArrayList<>(); - private static ConcurrentMap datums = new ConcurrentHashMap(); + private static volatile ConcurrentMap datums = new ConcurrentHashMap(); private static PeerSet peers = new PeerSet(); - private static volatile Notifier notifier = new Notifier(); + public static volatile Notifier notifier = new Notifier(); public static void init() throws Exception { @@ -186,8 +186,9 @@ public class RaftCore { } long end = System.currentTimeMillis(); - Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key); - + if (Loggers.RAFT.isDebugEnabled()) { + Loggers.RAFT.debug("signalPublish cost " + (end - start) + " ms" + " : " + key); + } } public static void doSignalPublish(String key, String value, boolean locked) throws Exception { @@ -271,7 +272,9 @@ public class RaftCore { } long end = System.currentTimeMillis(); - Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key); + if (Loggers.RAFT.isDebugEnabled()) { + Loggers.RAFT.debug("signalPublish cost " + (end - start) + " ms" + " : " + key); + } } finally { RaftCore.OPERATE_LOCK.unlock(); } @@ -351,15 +354,6 @@ public class RaftCore { local.resetLeaderDue(); - Datum datumOrigin = RaftCore.getDatum(datum.key); - - if (datumOrigin != null && datumOrigin.timestamp.get() > datum.timestamp.get()) { - // refuse operation: - Loggers.RAFT.warn("out of date publish, pub-timestamp:" - + datumOrigin.timestamp.get() + ", cur-timestamp: " + datum.timestamp.get()); - return; - } - // do apply if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID) || UtilsAndCommons.INSTANCE_LIST_PERSISTED) { RaftStore.write(datum); @@ -384,7 +378,7 @@ public class RaftCore { notifier.addTask(datum, Notifier.ApplyAction.CHANGE); - Loggers.RAFT.info("data added/updated, key=" + datum.key + ", term: " + local.term); + Loggers.RAFT.info("data added/updated, key=" + datum.key + ", term: " + local.term + ", increaseTerm:" + increaseTerm); } public static void onDelete(JSONObject params) throws Exception { @@ -989,6 +983,10 @@ public class RaftCore { tasks.add(Pair.with(datum, action)); } + public int getTaskSize() { + return tasks.size(); + } + @Override public void run() { Loggers.RAFT.info("raft notifier started"); @@ -1011,7 +1009,9 @@ public class RaftCore { for (RaftListener listener : listeners) { if (listener instanceof VirtualClusterDomain) { - Loggers.RAFT.debug("listener: " + ((VirtualClusterDomain) listener).getName()); + if (Loggers.RAFT.isDebugEnabled()) { + Loggers.RAFT.debug("listener: " + ((VirtualClusterDomain) listener).getName()); + } } if (!listener.interests(datum.key)) { @@ -1036,8 +1036,10 @@ public class RaftCore { } } - Loggers.RAFT.debug("VIPSRV-RAFT", "datum change notified" + + if (Loggers.RAFT.isDebugEnabled()) { + Loggers.RAFT.debug("VIPSRV-RAFT", "datum change notified" + ", key: " + datum.key + "; listener count: " + count); + } } catch (Throwable e) { Loggers.RAFT.error("VIPSRV-RAFT", "Error while handling notifying task", e); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/web/ApiCommands.java b/naming/src/main/java/com/alibaba/nacos/naming/web/ApiCommands.java index b1586e077..efe445c1e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/web/ApiCommands.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/web/ApiCommands.java @@ -34,7 +34,10 @@ import com.alibaba.nacos.naming.misc.*; import com.alibaba.nacos.naming.push.ClientInfo; import com.alibaba.nacos.naming.push.DataSource; import com.alibaba.nacos.naming.push.PushService; -import com.alibaba.nacos.naming.raft.*; +import com.alibaba.nacos.naming.raft.Datum; +import com.alibaba.nacos.naming.raft.RaftCore; +import com.alibaba.nacos.naming.raft.RaftPeer; +import com.alibaba.nacos.naming.raft.RaftProxy; import com.ning.http.client.AsyncCompletionHandler; import com.ning.http.client.Response; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -321,7 +324,15 @@ public class ApiCommands { doAddCluster4Dom(MockHttpRequest.buildRequest(stringMap)); } + JSONObject result = new JSONObject(); + + result.put("clientBeatInterval", Switch.getClientBeatInterval()); + if (!virtualClusterDomain.allIPs().contains(ipAddress)) { + + if (!virtualClusterDomain.getEnableClientBeat()) { + return result; + } stringMap.put("ipList", Arrays.asList(JSON.toJSONString(Arrays.asList(ipAddress))).toArray(new String[1])); stringMap.put("json", Arrays.asList("true").toArray(new String[1])); addIP4Dom(MockHttpRequest.buildRequest(stringMap)); @@ -355,10 +366,6 @@ public class ApiCommands { } } - JSONObject result = new JSONObject(); - - result.put("clientBeatInterval", Switch.getClientBeatInterval()); - return result; } @@ -851,8 +858,6 @@ public class ApiCommands { } } - long timestamp = Long.parseLong(WebUtils.required(request, "timestamp")); - if (CollectionUtils.isEmpty(newIPs)) { throw new IllegalArgumentException("Empty ip list"); } @@ -866,7 +871,7 @@ public class ApiCommands { + ", if you want to add them, remove updateOnly flag"); } } - domainsManager.easyAddIP4Dom(dom, newIPs, timestamp, term); + domainsManager.easyAddIP4Dom(dom, newIPs, term); return "ok"; } @@ -883,7 +888,9 @@ public class ApiCommands { proxyParams.put(entry.getKey(), entry.getValue()[0]); } - Loggers.DEBUG_LOG.debug("[ADD-IP] full arguments:" + proxyParams + ", client:" + request.getRemoteAddr()); + if (Loggers.DEBUG_LOG.isDebugEnabled()) { + Loggers.DEBUG_LOG.debug("[ADD-IP] full arguments:" + proxyParams + ", client:" + request.getRemoteAddr()); + } String ipListString = WebUtils.required(request, "ipList"); final List ipList; @@ -953,6 +960,7 @@ public class ApiCommands { String key = UtilsAndCommons.getIPListStoreKey(domain); Datum datum = RaftCore.getDatum(key); + if (datum == null) { try { domainsManager.getDom2LockMap().get(dom).lock(); @@ -967,16 +975,17 @@ public class ApiCommands { } } - long timestamp = RaftCore.getDatum(key).timestamp.incrementAndGet(); + long timestamp = RaftCore.getDatum(key).timestamp.get(); if (RaftCore.isLeader()) { - proxyParams.put("clientIP", NetUtils.localServer()); - proxyParams.put("notify", "true"); - proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term)); - proxyParams.put("timestamp", String.valueOf(timestamp)); - try { - domainsManager.getDom2LockMap().get(dom).lock(); + + RaftCore.OPERATE_LOCK.lock(); + + proxyParams.put("clientIP", NetUtils.localServer()); + proxyParams.put("notify", "true"); + proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term)); + proxyParams.put("timestamp", String.valueOf(timestamp)); onAddIP4Dom(MockHttpRequest.buildRequest2(proxyParams)); @@ -986,7 +995,7 @@ public class ApiCommands { asyncOnAddIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); } } finally { - domainsManager.getDom2LockMap().get(dom).unlock(); + RaftCore.OPERATE_LOCK.unlock(); } } @@ -994,24 +1003,35 @@ public class ApiCommands { return "ok"; } - private void syncOnAddIP4Dom(String dom, List ipList, Map proxyParams, String clientIP) throws InterruptedException { + private void syncOnUpdateIP4Dom(String dom, List ipList, Map proxyParams, String clientIP, String action) throws InterruptedException { String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom)); final CountDownLatch countDownLatch = new CountDownLatch(RaftCore.getPeerSet().majorityCount()); - addIpPublish(dom, ipList, proxyParams, clientIP, countDownLatch); + updateIpPublish(dom, ipList, proxyParams, clientIP, countDownLatch, action); if (!countDownLatch.await(UtilsAndCommons.MAX_PUBLISH_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) { Loggers.RAFT.info("data publish failed, key=" + key, ",notify timeout."); throw new IllegalArgumentException("data publish failed, key=" + key); } } - private void asyncOnAddIP4Dom(String dom, List ipList, Map proxyParams, String clientIP) { - addIpPublish(dom, ipList, proxyParams, clientIP, null); + private void syncOnAddIP4Dom(String dom, List ipList, Map proxyParams, String clientIP) throws InterruptedException { + syncOnUpdateIP4Dom(dom, ipList, proxyParams, clientIP, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD); } + private void asyncOnAddIP4Dom(String dom, List ipList, Map proxyParams, String clientIP) { + updateIpPublish(dom, ipList, proxyParams, clientIP, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD); + } - private void addIpPublish(String dom, List ipList, Map proxyParams, String clientIP, CountDownLatch countDownLatch) { + private void syncOnRemvIP4Dom(String dom, List ipList, Map proxyParams, String clientIP) throws InterruptedException { + syncOnUpdateIP4Dom(dom, ipList, proxyParams, clientIP, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE); + } + + private void asyncOnRemvIP4Dom(String dom, List ipList, Map proxyParams, String clientIP) { + updateIpPublish(dom, ipList, proxyParams, clientIP, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE); + } + + private void updateIpPublish(String dom, List ipList, Map proxyParams, String clientIP, CountDownLatch countDownLatch, String action) { for (final String peer : RaftCore.getPeerSet().allServersWithoutMySelf()) { @@ -1025,8 +1045,10 @@ public class ApiCommands { server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort(); } + String api = action.equals("remove") ? "onRemvIP4Dom" : "onAddIP4Dom"; + String url = "http://" + server - + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/onAddIP4Dom"; + + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api; try { HttpClient.asyncHttpPost(url, null, proxyParams, new AsyncCompletionHandler() { @@ -1045,15 +1067,11 @@ public class ApiCommands { } }); } catch (Exception e) { - Loggers.SRV_LOG.error("ADD-IP", "failed when publish to peer." + url, e); + Loggers.SRV_LOG.error(action + "-IP", "failed when publish to peer." + url, e); } } }); } - - Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-ADD}" + " new: " - + Arrays.toString(ipList.toArray()) + " operatorIP: " - + clientIP); } @NeedAuth @@ -1262,37 +1280,142 @@ public class ApiCommands { return result; } + @RequestMapping("/onRemvIP4Dom") + public void onRemvIP4Dom(HttpServletRequest request) throws Exception { + if (Switch.getDisableAddIP()) { + throw new AccessControlException("Deleting IP for dom is forbidden now."); + } + + String clientIP = WebUtils.required(request, "clientIP"); + long term = Long.parseLong(WebUtils.required(request, "term")); + + if (!RaftCore.isLeader(clientIP)) { + Loggers.RAFT.warn("peer(" + JSON.toJSONString(clientIP) + ") tried to publish " + + "data but wasn't leader, leader: " + JSON.toJSONString(RaftCore.getLeader())); + throw new IllegalStateException("peer(" + clientIP + ") tried to publish " + + "data but wasn't leader"); + } + + if (term < RaftCore.getPeerSet().local().term.get()) { + Loggers.RAFT.warn("out of date publish, pub-term: " + + JSON.toJSONString(clientIP) + ", cur-term: " + JSON.toJSONString(RaftCore.getPeerSet().local())); + throw new IllegalStateException("out of date publish, pub-term:" + + term + ", cur-term: " + RaftCore.getPeerSet().local().term); + } + + RaftCore.getPeerSet().local().resetLeaderDue(); + + final String dom = WebUtils.required(request, "dom"); + if (domainsManager.getDomain(dom) == null) { + throw new IllegalStateException("dom doesn't exist: " + dom); + } + + List removedIPs = getIpAddresses(request); + + if (CollectionUtils.isEmpty(removedIPs)) { + throw new IllegalArgumentException("Empty ip list"); + } + + domainsManager.easyRemvIP4Dom(dom, removedIPs, term); + } + @NeedAuth @RequestMapping("/remvIP4Dom") public String remvIP4Dom(HttpServletRequest request) throws Exception { String dom = WebUtils.required(request, "dom"); String ipListString = WebUtils.required(request, "ipList"); - Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: serviceName:" + dom + ", iplist:" + ipListString); - - List newIPs = new ArrayList<>(); - List ipList = new ArrayList<>(); - if (Boolean.parseBoolean(WebUtils.optional(request, SwitchEntry.PARAM_JSON, Boolean.FALSE.toString()))) { - newIPs = JSON.parseObject(ipListString, new TypeReference>() { - }); - } else { - ipList = Arrays.asList(ipListString.split(",")); + Map proxyParams = new HashMap<>(16); + for (Map.Entry entry : request.getParameterMap().entrySet()) { + proxyParams.put(entry.getKey(), entry.getValue()[0]); } + if (Loggers.DEBUG_LOG.isDebugEnabled()) { + Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: params:" + proxyParams); + } + + List ipList = new ArrayList<>(); + List ipObjList = new ArrayList<>(ipList.size()); if (Boolean.parseBoolean(WebUtils.optional(request, SwitchEntry.PARAM_JSON, Boolean.FALSE.toString()))) { - ipObjList = newIPs; + ipList = Arrays.asList(ipListString); + ipObjList = JSON.parseObject(ipListString, new TypeReference>() { + }); } else { + ipList = Arrays.asList(ipListString.split(",")); for (String ip : ipList) { ipObjList.add(IpAddress.fromJSON(ip)); } } - domainsManager.easyRemvIP4Dom(dom, ipObjList); + if (!RaftCore.isLeader()) { + Loggers.RAFT.info("I'm not leader, will proxy to leader."); + if (RaftCore.getLeader() == null) { + throw new IllegalArgumentException("no leader now."); + } - Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-REMV}" + " dead: " - + ipList + " operator: " - + WebUtils.optional(request, "clientIP", "unknown")); + RaftPeer leader = RaftCore.getLeader(); + + String server = leader.ip; + if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) { + server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort(); + } + + String url = "http://" + server + + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/remvIP4Dom"; + HttpClient.HttpResult result1 = HttpClient.httpPost(url, null, proxyParams); + + if (result1.code != HttpURLConnection.HTTP_OK) { + Loggers.SRV_LOG.warn("failed to remove ip for dom, caused " + result1.content); + throw new IllegalArgumentException("failed to remove ip for dom, caused " + result1.content); + } + + return "ok"; + } + + VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(dom); + + if (domain == null) { + throw new IllegalStateException("dom doesn't exist: " + dom); + } + + if (CollectionUtils.isEmpty(ipObjList)) { + throw new IllegalArgumentException("Empty ip list"); + } + + String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom)); + + long timestamp = 1; + if (RaftCore.getDatum(key) != null) { + timestamp = RaftCore.getDatum(key).timestamp.get(); + } + + if (RaftCore.isLeader()) { + + try { + + RaftCore.OPERATE_LOCK.lock(); + + proxyParams.put("clientIP", NetUtils.localServer()); + proxyParams.put("notify", "true"); + proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term)); + proxyParams.put("timestamp", String.valueOf(timestamp)); + + onRemvIP4Dom(MockHttpRequest.buildRequest2(proxyParams)); + + if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) { + syncOnRemvIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); + } else { + asyncOnRemvIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); + } + } finally { + RaftCore.OPERATE_LOCK.unlock(); + } + + Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-REMV}" + " new: " + + ipListString + " operatorIP: " + + WebUtils.optional(request, "clientIP", "unknown")); + } return "ok"; } @@ -2046,6 +2169,7 @@ public class ApiCommands { result.put("cpu", SystemUtils.getCPU()); result.put("load", SystemUtils.getLoad()); result.put("mem", SystemUtils.getMem()); + result.put("notifyTask", RaftCore.notifier.getTaskSize()); return result; } @@ -2392,6 +2516,29 @@ public class ApiCommands { return pac; } + private List getIpAddresses(HttpServletRequest request) { + String ipListString = WebUtils.required(request, "ipList"); + final List ipList; + List newIPs = new ArrayList<>(); + + if (Boolean.parseBoolean(WebUtils.optional(request, SwitchEntry.PARAM_JSON, Boolean.FALSE.toString()))) { + newIPs = JSON.parseObject(ipListString, new TypeReference>() { + }); + } else { + ipList = Arrays.asList(ipListString.split(",")); + for (String ip : ipList) { + IpAddress ipAddr = IpAddress.fromJSON(ip); + if (ipAddr == null) { + throw new IllegalArgumentException("malformed ip ->" + ip); + } + + newIPs.add(ipAddr); + } + } + + return newIPs; + } + public void setDomainsManager(DomainsManager domainsManager) { this.domainsManager = domainsManager; } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/DomainsManagerTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/DomainsManagerTest.java index a28c84f46..a170d18c0 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/DomainsManagerTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/DomainsManagerTest.java @@ -49,22 +49,6 @@ public class DomainsManagerTest extends BaseTest { domainsManager.easyRemoveDom("nacos.test.1"); } - @Test - public void easyRemvIP4Dom() throws Exception { - - VirtualClusterDomain domain = new VirtualClusterDomain(); - domain.setName("nacos.test.1"); - - domainsManager.chooseDomMap().put("nacos.test.1", domain); - - IpAddress ipAddress = new IpAddress(); - ipAddress.setIp("1.1.1.1"); - List ipList = new ArrayList(); - ipList.add(ipAddress); - domainsManager.addLock("nacos.test.1"); - domainsManager.easyRemvIP4Dom("nacos.test.1", ipList); - } - @Test public void searchDom() throws Exception { VirtualClusterDomain domain = new VirtualClusterDomain(); diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/AutoDeregisterInstance_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/AutoDeregisterInstance_ITCase.java index 4db794aa3..2a7c8fcc3 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/AutoDeregisterInstance_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/AutoDeregisterInstance_ITCase.java @@ -85,7 +85,6 @@ public class AutoDeregisterInstance_ITCase { namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT); - //TimeUnit.SECONDS.sleep(40); verifyInstanceList(instances, 1, serviceName); instances = naming.getAllInstances(serviceName); @@ -123,7 +122,6 @@ public class AutoDeregisterInstance_ITCase { namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT); - //TimeUnit.SECONDS.sleep(40); verifyInstanceList(instances, 1, serviceName); instances = naming.getAllInstances(serviceName); @@ -156,7 +154,6 @@ public class AutoDeregisterInstance_ITCase { namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT); - //TimeUnit.SECONDS.sleep(40); verifyInstanceList(instances, 1, serviceName); instances = naming.getAllInstances(serviceName); @@ -200,7 +197,6 @@ public class AutoDeregisterInstance_ITCase { namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT); - //TimeUnit.SECONDS.sleep(40); verifyInstanceList(instances, 1, serviceName); instances = naming.getAllInstances(serviceName); diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/SelectInstances_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/SelectInstances_ITCase.java index b2e501a3c..508efa7d3 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/SelectInstances_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/SelectInstances_ITCase.java @@ -379,7 +379,7 @@ public class SelectInstances_ITCase { TimeUnit.SECONDS.sleep(10); ExpressionSelector expressionSelector = new ExpressionSelector(); - expressionSelector.setExpression("INSTANCE.metadata.registerSource = 'dubbo'"); + expressionSelector.setExpression("INSTANCE.label.registerSource = 'dubbo'"); ListView serviceList = naming.getServicesOfServer(1, 10, expressionSelector); Assert.assertTrue(serviceList.getData().contains(serviceName));