Merge pull request #1587 from rushsky518/develop

distro use local time to record heartbeat
This commit is contained in:
Fury Zhu 2019-07-30 16:35:16 +08:00 committed by GitHub
commit 1c68568f4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -54,6 +54,8 @@ public class ServerListManager {
private Map<String, List<Server>> distroConfig = new ConcurrentHashMap<>();
private Map<String, Long> distroBeats = new ConcurrentHashMap<>(16);
private Set<String> liveSites = new HashSet<>();
private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;
@ -196,11 +198,17 @@ public class ServerListManager {
throw new IllegalArgumentException("server: " + server.getKey() + " is not in serverlist");
}
Long lastBeat = distroBeats.get(server.getKey());
long now = System.currentTimeMillis();
if (null != lastBeat) {
server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
}
distroBeats.put(server.getKey(), now);
Date date = new Date(Long.parseLong(params[2]));
server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));
server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1);
server.setAlive(System.currentTimeMillis() - server.getLastRefTime() < switchDomain.getDistroServerExpiredMillis());
List<Server> list = distroConfig.get(server.getSite());
if (list == null || list.size() <= 0) {
list = new ArrayList<>();
@ -230,65 +238,6 @@ public class ServerListManager {
distroConfig.put(server.getSite(), tmpServerList);
}
liveSites.addAll(distroConfig.keySet());
List<Server> servers = distroConfig.get(LOCALHOST_SITE);
if (CollectionUtils.isEmpty(servers)) {
return;
}
//local site servers
List<String> allLocalSiteSrvs = new ArrayList<>();
for (Server server : servers) {
if (server.getKey().endsWith(":0")) {
continue;
}
server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey()));
for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) {
if (!allLocalSiteSrvs.contains(server.getKey())) {
allLocalSiteSrvs.add(server.getKey());
}
if (server.isAlive() && !newHealthyList.contains(server)) {
newHealthyList.add(server);
}
}
}
Collections.sort(newHealthyList);
float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size();
if (autoDisabledHealthCheck
&& curRatio > switchDomain.getDistroThreshold()
&& System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) {
Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " +
"stable now, enable health check. current ratio: {}", curRatio);
switchDomain.setHealthCheckEnabled(true);
// we must set this variable, otherwise it will conflict with user's action
autoDisabledHealthCheck = false;
}
if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
// for every change disable healthy check for some while
if (switchDomain.isHealthCheckEnabled()) {
Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, " +
"disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD,
healthyServers, newHealthyList);
switchDomain.setHealthCheckEnabled(false);
autoDisabledHealthCheck = true;
lastHealthServerMillis = System.currentTimeMillis();
}
healthyServers = newHealthyList;
notifyListeners();
}
}
public void clean() {
@ -310,29 +259,14 @@ public class ServerListManager {
}
private void cleanInvalidServers() {
for (Map.Entry<String, List<Server>> entry : distroConfig.entrySet()) {
List<Server> tmpServers = null;
List<Server> currentServerList = entry.getValue();
for (Server server : entry.getValue()) {
if (!server.isAlive()) {
tmpServers = new ArrayList<>();
for (Server server1 : currentServerList) {
String serverKey1 = server1.getKey() + "_" + server1.getSite();
String serverKey = server.getKey() + "_" + server.getSite();
if (!serverKey.equals(serverKey1) && !tmpServers.contains(server1)) {
tmpServers.add(server1);
}
}
}
}
if (tmpServers != null) {
distroConfig.put(entry.getKey(), tmpServers);
List<Server> currentServers = entry.getValue();
if (null == currentServers) {
distroConfig.remove(entry.getKey());
continue;
}
currentServers.removeIf(server -> !server.isAlive());
}
}
@ -397,11 +331,7 @@ public class ServerListManager {
return;
}
for (String key : distroConfig.keySet()) {
for (Server server : distroConfig.get(key)) {
server.setAlive(System.currentTimeMillis() - server.getLastRefTime() < switchDomain.getDistroServerExpiredMillis());
}
}
checkDistroHeartbeat();
int weight = Runtime.getRuntime().availableProcessors() / 2;
if (weight <= 0) {
@ -442,4 +372,79 @@ public class ServerListManager {
}
}
private void checkDistroHeartbeat() {
Loggers.EPHEMERAL.debug("check distro heartbeat.");
List<Server> servers = distroConfig.get(LOCALHOST_SITE);
if (CollectionUtils.isEmpty(servers)) {
return;
}
List<Server> newHealthyList = new ArrayList<>(servers.size());
long now = System.currentTimeMillis();
for (Server s: servers) {
Long lastBeat = distroBeats.get(s.getKey());
if (null == lastBeat) {
continue;
}
s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
}
//local site servers
List<String> allLocalSiteSrvs = new ArrayList<>();
for (Server server : servers) {
if (server.getKey().endsWith(":0")) {
continue;
}
server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey()));
for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) {
if (!allLocalSiteSrvs.contains(server.getKey())) {
allLocalSiteSrvs.add(server.getKey());
}
if (server.isAlive() && !newHealthyList.contains(server)) {
newHealthyList.add(server);
}
}
}
Collections.sort(newHealthyList);
float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size();
if (autoDisabledHealthCheck
&& curRatio > switchDomain.getDistroThreshold()
&& System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) {
Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " +
"stable now, enable health check. current ratio: {}", curRatio);
switchDomain.setHealthCheckEnabled(true);
// we must set this variable, otherwise it will conflict with user's action
autoDisabledHealthCheck = false;
}
if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
// for every change disable healthy check for some while
if (switchDomain.isHealthCheckEnabled()) {
Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, " +
"disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD,
healthyServers, newHealthyList);
switchDomain.setHealthCheckEnabled(false);
autoDisabledHealthCheck = true;
lastHealthServerMillis = System.currentTimeMillis();
}
healthyServers = newHealthyList;
notifyListeners();
}
}
}