From e259dc34e895acb66b91af4d0eb712fecddbb211 Mon Sep 17 00:00:00 2001 From: rushsky518 Date: Mon, 22 Jul 2019 19:24:46 +0800 Subject: [PATCH] =?UTF-8?q?#1529=20distro=20=E4=BD=BF=E7=94=A8=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E6=97=B6=E9=97=B4=E8=AE=B0=E5=BD=95=E5=BF=83=E8=B7=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../naming/cluster/ServerListManager.java | 177 +++++++++--------- 1 file changed, 91 insertions(+), 86 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java index f29c5a2d8..5e53acec6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java @@ -54,6 +54,8 @@ public class ServerListManager { private Map> distroConfig = new ConcurrentHashMap<>(); + private Map distroBeats = new ConcurrentHashMap<>(16); + private Set liveSites = new HashSet<>(); private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; @@ -196,11 +198,17 @@ public class ServerListManager { throw new IllegalArgumentException("server: " + server.getKey() + " is not in serverlist"); } + Long lastBeat = distroBeats.get(server.getKey()); + long now = System.currentTimeMillis(); + if (null != lastBeat) { + server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); + } + distroBeats.put(server.getKey(), now); + Date date = new Date(Long.parseLong(params[2])); server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)); server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1); - server.setAlive(System.currentTimeMillis() - server.getLastRefTime() < switchDomain.getDistroServerExpiredMillis()); List list = distroConfig.get(server.getSite()); if (list == null || list.size() <= 0) { list = new ArrayList<>(); @@ -230,65 +238,6 @@ public class ServerListManager { distroConfig.put(server.getSite(), tmpServerList); } liveSites.addAll(distroConfig.keySet()); - - List servers = distroConfig.get(LOCALHOST_SITE); - if (CollectionUtils.isEmpty(servers)) { - return; - } - - //local site servers - List allLocalSiteSrvs = new ArrayList<>(); - for (Server server : servers) { - - if (server.getKey().endsWith(":0")) { - continue; - } - - server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey())); - - for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) { - - if (!allLocalSiteSrvs.contains(server.getKey())) { - allLocalSiteSrvs.add(server.getKey()); - } - - if (server.isAlive() && !newHealthyList.contains(server)) { - newHealthyList.add(server); - } - } - } - - Collections.sort(newHealthyList); - float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size(); - - if (autoDisabledHealthCheck - && curRatio > switchDomain.getDistroThreshold() - && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) { - Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " + - "stable now, enable health check. current ratio: {}", curRatio); - - switchDomain.setHealthCheckEnabled(true); - - // we must set this variable, otherwise it will conflict with user's action - autoDisabledHealthCheck = false; - } - - if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) { - // for every change disable healthy check for some while - if (switchDomain.isHealthCheckEnabled()) { - Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, " + - "disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD, - healthyServers, newHealthyList); - - switchDomain.setHealthCheckEnabled(false); - autoDisabledHealthCheck = true; - - lastHealthServerMillis = System.currentTimeMillis(); - } - - healthyServers = newHealthyList; - notifyListeners(); - } } public void clean() { @@ -310,29 +259,14 @@ public class ServerListManager { } private void cleanInvalidServers() { - for (Map.Entry> entry : distroConfig.entrySet()) { - List tmpServers = null; - List currentServerList = entry.getValue(); - - for (Server server : entry.getValue()) { - if (!server.isAlive()) { - - tmpServers = new ArrayList<>(); - - for (Server server1 : currentServerList) { - String serverKey1 = server1.getKey() + "_" + server1.getSite(); - String serverKey = server.getKey() + "_" + server.getSite(); - - if (!serverKey.equals(serverKey1) && !tmpServers.contains(server1)) { - tmpServers.add(server1); - } - } - } - } - if (tmpServers != null) { - distroConfig.put(entry.getKey(), tmpServers); + List currentServers = entry.getValue(); + if (null == currentServers) { + distroConfig.remove(entry.getKey()); + continue; } + + currentServers.removeIf(server -> !server.isAlive()); } } @@ -397,11 +331,7 @@ public class ServerListManager { return; } - for (String key : distroConfig.keySet()) { - for (Server server : distroConfig.get(key)) { - server.setAlive(System.currentTimeMillis() - server.getLastRefTime() < switchDomain.getDistroServerExpiredMillis()); - } - } + checkDistroHeartbeat(); int weight = Runtime.getRuntime().availableProcessors() / 2; if (weight <= 0) { @@ -442,4 +372,79 @@ public class ServerListManager { } } + + private void checkDistroHeartbeat() { + + Loggers.EPHEMERAL.debug("check distro heartbeat."); + + List servers = distroConfig.get(LOCALHOST_SITE); + if (CollectionUtils.isEmpty(servers)) { + return; + } + + List newHealthyList = new ArrayList<>(servers.size()); + long now = System.currentTimeMillis(); + for (Server s: servers) { + Long lastBeat = distroBeats.get(s.getKey()); + if (null == lastBeat) { + continue; + } + s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); + } + + //local site servers + List allLocalSiteSrvs = new ArrayList<>(); + for (Server server : servers) { + + if (server.getKey().endsWith(":0")) { + continue; + } + + server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey())); + + for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) { + + if (!allLocalSiteSrvs.contains(server.getKey())) { + allLocalSiteSrvs.add(server.getKey()); + } + + if (server.isAlive() && !newHealthyList.contains(server)) { + newHealthyList.add(server); + } + } + } + + Collections.sort(newHealthyList); + float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size(); + + if (autoDisabledHealthCheck + && curRatio > switchDomain.getDistroThreshold() + && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) { + Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " + + "stable now, enable health check. current ratio: {}", curRatio); + + switchDomain.setHealthCheckEnabled(true); + + // we must set this variable, otherwise it will conflict with user's action + autoDisabledHealthCheck = false; + } + + if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) { + // for every change disable healthy check for some while + if (switchDomain.isHealthCheckEnabled()) { + Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, " + + "disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD, + healthyServers, newHealthyList); + + switchDomain.setHealthCheckEnabled(false); + autoDisabledHealthCheck = true; + + lastHealthServerMillis = System.currentTimeMillis(); + } + + healthyServers = newHealthyList; + notifyListeners(); + } + } + }