From 6fb27500219346bad0a113bdf39bee935c154d6e Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Mon, 1 Jun 2020 15:35:21 +0800 Subject: [PATCH 1/4] fix: fixed compatibility issues with older versions --- .../alibaba/nacos/common/http/HttpUtils.java | 1 - .../alibaba/nacos/common/utils/ByteUtils.java | 7 +- .../alibaba/nacos/common/utils/MapUtils.java | 1 - .../nacos/common/utils/StringUtils.java | 5 +- .../nacos/common/utils/ThreadUtils.java | 1 - .../core/cluster/ServerMemberManager.java | 7 +- .../alibaba/nacos/naming/cluster/Server.java | 229 +++++++++++ .../naming/cluster/ServerListManager.java | 354 ++++++++++++++++++ .../nacos/naming/cluster/ServerUtils.java | 54 +++ .../DelegateConsistencyServiceImpl.java | 13 +- .../ephemeral/distro/DataSyncer.java | 29 +- .../distro/DistroConsistencyServiceImpl.java | 65 ++-- .../controllers/OperatorController.java | 64 ++-- .../nacos/naming/core/DistroMapper.java | 24 +- .../nacos/naming/core/ServiceManager.java | 32 +- .../META-INF/logback/naming-included.xml | 21 ++ .../core/cluster/MemberLookup_ITCase.java | 16 +- .../cluster/ServerMemberManager_ITCase.java | 12 +- 18 files changed, 819 insertions(+), 116 deletions(-) create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java diff --git a/common/src/main/java/com/alibaba/nacos/common/http/HttpUtils.java b/common/src/main/java/com/alibaba/nacos/common/http/HttpUtils.java index b29a69bbc..2a7e91dfc 100644 --- a/common/src/main/java/com/alibaba/nacos/common/http/HttpUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/http/HttpUtils.java @@ -21,7 +21,6 @@ import com.alibaba.nacos.common.utils.StringUtils; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.List; diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java index 2e393fad1..e8ad94738 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java @@ -17,12 +17,13 @@ package com.alibaba.nacos.common.utils; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; + import org.apache.commons.lang3.StringUtils; /** * @author liaochuntao */ +@SuppressWarnings("all") public final class ByteUtils { public static final byte[] EMPTY = new byte[0]; @@ -31,7 +32,7 @@ public final class ByteUtils { if (s == null) { return EMPTY; } - return s.getBytes(Charset.forName(StandardCharsets.UTF_8.name())); + return s.getBytes(Charset.forName("UTF-8")); } public static byte[] toBytes(Object s) { @@ -45,7 +46,7 @@ public final class ByteUtils { if (bytes == null) { return StringUtils.EMPTY; } - return new String(bytes, Charset.forName(StandardCharsets.UTF_8.name())); + return new String(bytes, Charset.forName("UTF-8")); } public static boolean isEmpty(byte[] data) { diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java index 06cf34b97..de33ea779 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java @@ -21,7 +21,6 @@ package com.alibaba.nacos.common.utils; import java.util.Collection; import java.util.Dictionary; import java.util.Map; -import java.util.Objects; /** * @author liaochuntao diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java index c4b13c425..a0967877a 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java @@ -15,13 +15,10 @@ */ package com.alibaba.nacos.common.utils; -import org.apache.commons.lang3.CharSequenceUtils; - import java.io.IOException; import java.io.StringWriter; import java.io.Writer; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Locale; @@ -41,7 +38,7 @@ public class StringUtils { public static final String EMPTY = ""; public static String newString4UTF8(byte[] bytes) { - return new String(bytes, Charset.forName(StandardCharsets.UTF_8.name())); + return new String(bytes, Charset.forName("UTF-8")); } public static boolean isBlank(String str) { diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java index c43ba0336..bad5b2492 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java @@ -18,7 +18,6 @@ package com.alibaba.nacos.common.utils; import org.slf4j.Logger; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index d02d0ec2d..5f625960a 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -123,14 +123,15 @@ public class ServerMemberManager */ private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask(); - public ServerMemberManager(ServletContext servletContext) { + public ServerMemberManager(ServletContext servletContext) throws Exception { this.serverList = new ConcurrentSkipListMap(); ApplicationUtils.setContextPath(servletContext.getContextPath()); MemberUtils.setManager(this); + + init(); } - @PostConstruct - public void init() throws NacosException { + protected void init() throws NacosException { Loggers.CORE.info("Nacos-related cluster resource initialization"); this.port = ApplicationUtils.getProperty("server.port", Integer.class, 8848); this.localAddress = InetUtils.getSelfIp() + ":" + port; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java new file mode 100644 index 000000000..89644c429 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java @@ -0,0 +1,229 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster; + +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; + +/** + * Member node of Nacos cluster + * + * // TODO This object will be deleted sometime after version 1.3.0 + * + * @author nkorange + * @since 1.0.0 + * @deprecated 1.3.0 + */ +public class Server implements Comparable { + + /** + * IP of member + */ + private String ip; + + /** + * serving port of member. + */ + private int servePort; + + private String site = UtilsAndCommons.UNKNOWN_SITE; + + private int weight = 1; + + /** + * additional weight, used to adjust manually + */ + private int adWeight; + + private boolean alive = false; + + private long lastRefTime = 0L; + + private String lastRefTimeStr; + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getServePort() { + return servePort; + } + + public void setServePort(int servePort) { + this.servePort = servePort; + } + + public String getSite() { + return site; + } + + public void setSite(String site) { + this.site = site; + } + + public int getWeight() { + return weight; + } + + public void setWeight(int weight) { + this.weight = weight; + } + + public int getAdWeight() { + return adWeight; + } + + public void setAdWeight(int adWeight) { + this.adWeight = adWeight; + } + + public boolean isAlive() { + return alive; + } + + public void setAlive(boolean alive) { + this.alive = alive; + } + + public long getLastRefTime() { + return lastRefTime; + } + + public void setLastRefTime(long lastRefTime) { + this.lastRefTime = lastRefTime; + } + + public String getLastRefTimeStr() { + return lastRefTimeStr; + } + + public void setLastRefTimeStr(String lastRefTimeStr) { + this.lastRefTimeStr = lastRefTimeStr; + } + + public String getKey() { + return ip + UtilsAndCommons.IP_PORT_SPLITER + servePort; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Server server = (Server) o; + return servePort == server.servePort && ip.equals(server.ip); + } + + @Override + public int hashCode() { + int result = ip.hashCode(); + result = 31 * result + servePort; + return result; + } + + @Override + public String toString() { + return JacksonUtils.toJson(this); + } + + @Override + public int compareTo(Server server) { + if (server == null) { + return 1; + } + return this.getKey().compareTo(server.getKey()); + } + + public static ServerBuilder builder() { + return new ServerBuilder(); + } + + public static final class ServerBuilder { + private String ip; + private int servePort; + private String site = UtilsAndCommons.UNKNOWN_SITE; + private int weight = 1; + private int adWeight; + private boolean alive = false; + private long lastRefTime = 0L; + private String lastRefTimeStr; + + private ServerBuilder() { + } + + public ServerBuilder ip(String ip) { + this.ip = ip; + return this; + } + + public ServerBuilder servePort(int servePort) { + this.servePort = servePort; + return this; + } + + public ServerBuilder site(String site) { + this.site = site; + return this; + } + + public ServerBuilder weight(int weight) { + this.weight = weight; + return this; + } + + public ServerBuilder adWeight(int adWeight) { + this.adWeight = adWeight; + return this; + } + + public ServerBuilder alive(boolean alive) { + this.alive = alive; + return this; + } + + public ServerBuilder lastRefTime(long lastRefTime) { + this.lastRefTime = lastRefTime; + return this; + } + + public ServerBuilder lastRefTimeStr(String lastRefTimeStr) { + this.lastRefTimeStr = lastRefTimeStr; + return this; + } + + public Server build() { + Server server = new Server(); + server.setIp(ip); + server.setServePort(servePort); + server.setSite(site); + server.setWeight(weight); + server.setAdWeight(adWeight); + server.setAlive(alive); + server.setLastRefTime(lastRefTime); + server.setLastRefTimeStr(lastRefTimeStr); + return server; + } + } +} \ No newline at end of file diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java new file mode 100644 index 000000000..40a26271a --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java @@ -0,0 +1,354 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster; + +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MemberChangeListener; +import com.alibaba.nacos.core.cluster.ServerMemberManager; +import com.alibaba.nacos.core.notify.NotifyCenter; +import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.naming.misc.GlobalExecutor; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.Message; +import com.alibaba.nacos.naming.misc.NamingProxy; +import com.alibaba.nacos.naming.misc.NetUtils; +import com.alibaba.nacos.naming.misc.ServerStatusSynchronizer; +import com.alibaba.nacos.naming.misc.SwitchDomain; +import com.alibaba.nacos.naming.misc.Synchronizer; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The manager to globally refresh and operate server list. + * + * // TODO This object will be deleted sometime after version 1.3.0 + * + * @author nkorange + * @since 1.0.0 + * @deprecated 1.3.0 + */ +@Component("serverListManager") +public class ServerListManager implements MemberChangeListener { + + private static final int STABLE_PERIOD = 60 * 1000; + + private final SwitchDomain switchDomain; + private final ServerMemberManager memberManager; + + private volatile List servers; + + private volatile List healthyServers = Collections.emptyList(); + + private Map> distroConfig = new ConcurrentHashMap<>(16); + + private Map distroBeats = new ConcurrentHashMap<>(16); + + private Set liveSites = new HashSet<>(); + + private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; + + private long lastHealthServerMillis = 0L; + + private boolean autoDisabledHealthCheck = false; + + private Synchronizer synchronizer = new ServerStatusSynchronizer(); + + public ServerListManager(final SwitchDomain switchDomain, + final ServerMemberManager memberManager) { + this.switchDomain = switchDomain; + this.memberManager = memberManager; + NotifyCenter.registerSubscribe(this); + this.servers = ServerUtils.toServers(memberManager.allMembers()); + } + + @PostConstruct + public void init() { + GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000); + } + + public boolean contains(String s) { + for (Server server : getServers()) { + if (server.getKey().equals(s)) { + return true; + } + } + return false; + } + + public List getServers() { + return servers; + } + + public synchronized void onReceiveServerStatus(String configInfo) { + + Loggers.SRV_LOG.info("receive config info: {}", configInfo); + + String[] configs = configInfo.split("\r\n"); + if (configs.length == 0) { + return; + } + + List newHealthyList = new ArrayList<>(); + List tmpServerList = new ArrayList<>(); + + for (String config : configs) { + tmpServerList.clear(); + // site:ip:lastReportTime:weight + String[] params = config.split("#"); + if (params.length <= 3) { + Loggers.SRV_LOG.warn("received malformed distro map data: {}", config); + continue; + } + + Server server = new Server(); + + server.setSite(params[0]); + server.setIp(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0]); + server.setServePort(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1])); + server.setLastRefTime(Long.parseLong(params[2])); + + if (!contains(server.getKey())) { + throw new IllegalArgumentException("server: " + server.getKey() + " is not in serverlist"); + } + + Long lastBeat = distroBeats.get(server.getKey()); + long now = System.currentTimeMillis(); + if (null != lastBeat) { + server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); + } + distroBeats.put(server.getKey(), now); + + Date date = new Date(Long.parseLong(params[2])); + server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)); + + server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1); + List list = distroConfig.get(server.getSite()); + if (list == null || list.size() <= 0) { + list = new ArrayList<>(); + list.add(server); + distroConfig.put(server.getSite(), list); + } + + for (Server s : list) { + String serverId = s.getKey() + "_" + s.getSite(); + String newServerId = server.getKey() + "_" + server.getSite(); + + if (serverId.equals(newServerId)) { + if (s.isAlive() != server.isAlive() || s.getWeight() != server.getWeight()) { + Loggers.SRV_LOG.warn("server beat out of date, current: {}, last: {}", + JacksonUtils.toJson(server), JacksonUtils.toJson(s)); + } + tmpServerList.add(server); + continue; + } + tmpServerList.add(s); + } + + if (!tmpServerList.contains(server)) { + tmpServerList.add(server); + } + + distroConfig.put(server.getSite(), tmpServerList); + } + liveSites.addAll(distroConfig.keySet()); + } + + public void clean() { + cleanInvalidServers(); + + for (Map.Entry> entry : distroConfig.entrySet()) { + for (Server server : entry.getValue()) { + //request other server to clean invalid servers + if (!server.getKey().equals(NetUtils.localServer())) { + requestOtherServerCleanInvalidServers(server.getKey()); + } + } + + } + } + + public Set getLiveSites() { + return liveSites; + } + + private void cleanInvalidServers() { + for (Map.Entry> entry : distroConfig.entrySet()) { + List currentServers = entry.getValue(); + if (null == currentServers) { + distroConfig.remove(entry.getKey()); + continue; + } + + currentServers.removeIf(server -> !server.isAlive()); + } + } + + private void requestOtherServerCleanInvalidServers(String serverIP) { + Map params = new HashMap(1); + + params.put("action", "without-diamond-clean"); + try { + NamingProxy.reqAPI("distroStatus", params, serverIP, false); + } catch (Exception e) { + Loggers.SRV_LOG.warn("[DISTRO-STATUS-CLEAN] Failed to request to clean server status to " + serverIP, e); + } + } + + @Override + public void onEvent(MemberChangeEvent event) { + this.servers = ServerUtils.toServers(memberManager.allMembers()); + } + + private class ServerStatusReporter implements Runnable { + + @Override + public void run() { + try { + + if (ApplicationUtils.getPort() <= 0) { + return; + } + + checkDistroHeartbeat(); + + int weight = Runtime.getRuntime().availableProcessors() / 2; + if (weight <= 0) { + weight = 1; + } + + long curTime = System.currentTimeMillis(); + String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n"; + + //send status to itself + onReceiveServerStatus(status); + + List allServers = getServers(); + + if (!contains(NetUtils.localServer())) { + Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers); + return; + } + + if (allServers.size() > 0 && !ApplicationUtils.getLocalAddress().contains(UtilsAndCommons.LOCAL_HOST_IP)) { + for (Server server : allServers) { + if (server.getKey().equals(ApplicationUtils.getLocalAddress())) { + continue; + } + + Message msg = new Message(); + msg.setData(status); + + synchronizer.send(server.getKey(), msg); + + } + } + } catch (Exception e) { + Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e); + } finally { + GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis()); + } + + } + } + + private void checkDistroHeartbeat() { + + Loggers.SRV_LOG.debug("check distro heartbeat."); + + List servers = distroConfig.get(LOCALHOST_SITE); + if (CollectionUtils.isEmpty(servers)) { + return; + } + + List newHealthyList = new ArrayList<>(servers.size()); + long now = System.currentTimeMillis(); + for (Server s: servers) { + Long lastBeat = distroBeats.get(s.getKey()); + if (null == lastBeat) { + continue; + } + s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); + } + + //local site servers + List allLocalSiteSrvs = new ArrayList<>(); + for (Server server : servers) { + + if (server.getKey().endsWith(":0")) { + continue; + } + + server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey())); + + for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) { + + if (!allLocalSiteSrvs.contains(server.getKey())) { + allLocalSiteSrvs.add(server.getKey()); + } + + if (server.isAlive() && !newHealthyList.contains(server)) { + newHealthyList.add(server); + } + } + } + + Collections.sort(newHealthyList); + float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size(); + + if (autoDisabledHealthCheck + && curRatio > switchDomain.getDistroThreshold() + && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) { + Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " + + "stable now, enable health check. current ratio: {}", curRatio); + + switchDomain.setHealthCheckEnabled(true); + + // we must set this variable, otherwise it will conflict with user's action + autoDisabledHealthCheck = false; + } + + if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) { + // for every change disable healthy check for some while + Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, old: {}, new: {}", + healthyServers, newHealthyList); + if (switchDomain.isHealthCheckEnabled() && switchDomain.isAutoChangeHealthCheckEnabled()) { + Loggers.SRV_LOG.info("[NACOS-DISTRO] disable health check for {} ms from now on.", STABLE_PERIOD); + + switchDomain.setHealthCheckEnabled(false); + autoDisabledHealthCheck = true; + + lastHealthServerMillis = System.currentTimeMillis(); + } + + healthyServers = newHealthyList; + } + } +} \ No newline at end of file diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java new file mode 100644 index 000000000..f65839dc2 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java @@ -0,0 +1,54 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster; + +import com.alibaba.nacos.core.cluster.Member; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author liaochuntao + */ +public final class ServerUtils { + + public static Server memberToServer(Member member) { + return Server.builder() + .ip(member.getIp()) + .servePort(member.getPort()) + .alive(true) + .lastRefTime(System.currentTimeMillis()) + .build(); + } + + public static Member serverToMember(Server server) { + return Member.builder() + .ip(server.getIp()) + .port(server.getServePort()) + .build(); + } + + public static List toMembers(Collection servers) { + return servers.stream().map(ServerUtils::serverToMember).collect(Collectors.toList()); + } + + public static List toServers(Collection members) { + return members.stream().map(ServerUtils::memberToServer).collect(Collectors.toList()); + } + +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java index fa24eee6d..321d3e904 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java @@ -19,7 +19,6 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService; import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; import com.alibaba.nacos.naming.pojo.Record; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Service; @@ -33,11 +32,15 @@ import org.springframework.stereotype.Service; @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService { - @Autowired - private PersistentConsistencyService persistentConsistencyService; + private final PersistentConsistencyService persistentConsistencyService; + private final EphemeralConsistencyService ephemeralConsistencyService; - @Autowired - private EphemeralConsistencyService ephemeralConsistencyService; + public DelegateConsistencyServiceImpl( + PersistentConsistencyService persistentConsistencyService, + EphemeralConsistencyService ephemeralConsistencyService) { + this.persistentConsistencyService = persistentConsistencyService; + this.ephemeralConsistencyService = ephemeralConsistencyService; + } @Override public void put(String key, Record value) throws NacosException { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java index 451e942ff..a15925b1d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java @@ -45,22 +45,23 @@ import java.util.concurrent.ConcurrentHashMap; @DependsOn("ProtocolManager") public class DataSyncer { - @Autowired - private DataStore dataStore; + private final DataStore dataStore; + private final GlobalConfig partitionConfig; + private final Serializer serializer; + private final DistroMapper distroMapper; + private final ServerMemberManager memberManager; - @Autowired - private GlobalConfig partitionConfig; + private Map taskMap = new ConcurrentHashMap<>(16); - @Autowired - private Serializer serializer; - - @Autowired - private DistroMapper distroMapper; - - @Autowired - private ServerMemberManager memberManager; - - private Map taskMap = new ConcurrentHashMap<>(); + public DataSyncer(DataStore dataStore, GlobalConfig partitionConfig, + Serializer serializer, DistroMapper distroMapper, + ServerMemberManager memberManager) { + this.dataStore = dataStore; + this.partitionConfig = partitionConfig; + this.serializer = serializer; + this.distroMapper = distroMapper; + this.memberManager = memberManager; + } @PostConstruct public void init() { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index d495907d2..6943d45ba 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.utils.Objects; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -34,12 +35,10 @@ import com.alibaba.nacos.naming.misc.GlobalConfig; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.NamingProxy; -import com.alibaba.nacos.naming.misc.NetUtils; import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.pojo.Record; import org.apache.commons.lang3.StringUtils; import org.javatuples.Pair; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import javax.annotation.PostConstruct; @@ -49,7 +48,7 @@ import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentLinkedQueue; /** * A consistency protocol algorithm called Distro @@ -69,26 +68,19 @@ import java.util.concurrent.CopyOnWriteArrayList; @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { - @Autowired - private DistroMapper distroMapper; + private final DistroMapper distroMapper; - @Autowired - private DataStore dataStore; + private final DataStore dataStore; - @Autowired - private TaskDispatcher taskDispatcher; + private final TaskDispatcher taskDispatcher; - @Autowired - private Serializer serializer; + private final Serializer serializer; - @Autowired - private ServerMemberManager memberManager; + private final ServerMemberManager memberManager; - @Autowired - private SwitchDomain switchDomain; + private final SwitchDomain switchDomain; - @Autowired - private GlobalConfig globalConfig; + private final GlobalConfig globalConfig; private boolean initialized = false; @@ -96,10 +88,23 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService private LoadDataTask loadDataTask = new LoadDataTask(); - private Map> listeners = new ConcurrentHashMap<>(); + private Map> listeners = new ConcurrentHashMap<>(); private Map syncChecksumTasks = new ConcurrentHashMap<>(16); + public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore, + TaskDispatcher taskDispatcher, Serializer serializer, + ServerMemberManager memberManager, SwitchDomain switchDomain, + GlobalConfig globalConfig) { + this.distroMapper = distroMapper; + this.dataStore = dataStore; + this.taskDispatcher = taskDispatcher; + this.serializer = serializer; + this.memberManager = memberManager; + this.switchDomain = switchDomain; + this.globalConfig = globalConfig; + } + @PostConstruct public void init() { GlobalExecutor.submit(loadDataTask); @@ -115,6 +120,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService if (!initialized) { GlobalExecutor .submit(this, globalConfig.getLoadDataRetryDelayMillis()); + } else { + Loggers.DISTRO.info("load data success"); } } catch (Exception e) { @@ -136,7 +143,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService for (Map.Entry entry : memberManager.getServerList().entrySet()) { final String address = entry.getValue().getAddress(); - if (NetUtils.localServer().equals(address)) { + if (ApplicationUtils.getLocalAddress().equals(address)) { continue; } if (Loggers.DISTRO.isDebugEnabled()) { @@ -237,10 +244,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService } } - if (Loggers.DISTRO.isDebugEnabled()) { - Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", + Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server); - } for (String key : toRemoveKeys) { onRemove(key); @@ -269,8 +274,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService try { byte[] data = NamingProxy.getAllData(server); - processData(data); - return true; + return processData(data); } catch (Exception e) { Loggers.DISTRO.error("sync full data from " + server + " failed!", e); @@ -278,7 +282,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService } } - public void processData(byte[] data) throws Exception { + public boolean processData(byte[] data) throws Exception { if (data.length > 0) { Map> datumMap = serializer .deserializeMap(data, Instances.class); @@ -300,7 +304,13 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); - listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0).onChange( + + // The Listener corresponding to the key value must not be empty + RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek(); + if (Objects.isNull(listener)) { + return false; + } + listener.onChange( KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } @@ -331,12 +341,13 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService dataStore.put(entry.getKey(), entry.getValue()); } } + return true; } @Override public void listen(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { - listeners.put(key, new CopyOnWriteArrayList<>()); + listeners.put(key, new ConcurrentLinkedQueue<>()); } if (listeners.get(key).contains(listener)) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java index 70dee0529..4403df97b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java @@ -23,9 +23,9 @@ import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.NodeState; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.naming.cluster.ServerListManager; import com.alibaba.nacos.naming.cluster.ServerStatusManager; import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore; -import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.ServiceManager; @@ -36,7 +36,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; @@ -53,29 +52,30 @@ import java.util.List; @RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator", UtilsAndCommons.NACOS_NAMING_CONTEXT + "/ops"}) public class OperatorController { - @Autowired - private PushService pushService; + private final PushService pushService; + private final SwitchManager switchManager; + private final ServerListManager serverListManager; + private final ServiceManager serviceManager; + private final ServerMemberManager memberManager; + private final ServerStatusManager serverStatusManager; + private final SwitchDomain switchDomain; + private final DistroMapper distroMapper; + private final RaftCore raftCore; - @Autowired - private SwitchManager switchManager; - - @Autowired - private ServiceManager serviceManager; - - @Autowired - private ServerMemberManager memberManager; - - @Autowired - private ServerStatusManager serverStatusManager; - - @Autowired - private SwitchDomain switchDomain; - - @Autowired - private DistroMapper distroMapper; - - @Autowired - private RaftCore raftCore; + public OperatorController(PushService pushService, SwitchManager switchManager, + ServerListManager serverListManager, ServiceManager serviceManager, ServerMemberManager memberManager, + ServerStatusManager serverStatusManager, SwitchDomain switchDomain, + DistroMapper distroMapper, RaftCore raftCore) { + this.pushService = pushService; + this.switchManager = switchManager; + this.serverListManager = serverListManager; + this.serviceManager = serviceManager; + this.memberManager = memberManager; + this.serverStatusManager = serverStatusManager; + this.switchDomain = switchDomain; + this.distroMapper = distroMapper; + this.raftCore = raftCore; + } @RequestMapping("/push/state") public ObjectNode pushState(@RequestParam(required = false) boolean detail, @RequestParam(required = false) boolean reset) { @@ -200,6 +200,22 @@ public class OperatorController { return result; } + /** + * This interface will be removed in a future release + * + * // TODO This object will be deleted sometime after version 1.3.0 + * + * @deprecated 1.3.0 + * @param serverStatus server status + * @return "ok" + */ + @Deprecated + @RequestMapping("/server/status") + public String serverStatus(@RequestParam String serverStatus) { + serverListManager.onReceiveServerStatus(serverStatus); + return "ok"; + } + @PutMapping("/log") public String setLogLevel(@RequestParam String logName, @RequestParam String logLevel) { Loggers.setLogLevel(logName, logLevel); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java b/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java index 448c1f7d5..ba2d5e2b0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java @@ -22,7 +22,6 @@ import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.misc.NetUtils; import com.alibaba.nacos.naming.misc.SwitchDomain; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; @@ -31,7 +30,6 @@ import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; /** * @author nkorange @@ -85,8 +83,8 @@ public class DistroMapper implements MemberChangeListener { return false; } - int index = servers.indexOf(NetUtils.localServer()); - int lastIndex = servers.lastIndexOf(NetUtils.localServer()); + int index = servers.indexOf(ApplicationUtils.getLocalAddress()); + int lastIndex = servers.lastIndexOf(ApplicationUtils.getLocalAddress()); if (lastIndex < 0 || index < 0) { return true; } @@ -99,24 +97,30 @@ public class DistroMapper implements MemberChangeListener { final List servers = healthyList; if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) { - return NetUtils.localServer(); + return ApplicationUtils.getLocalAddress(); } try { - return servers.get(distroHash(serviceName) % servers.size()); + int index = distroHash(serviceName) % servers.size(); + return servers.get(index); } catch (Throwable e) { - Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e); - return NetUtils.localServer(); + Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + ApplicationUtils.getLocalAddress(), e); + return ApplicationUtils.getLocalAddress(); } } public int distroHash(String serviceName) { - return Math.abs(Objects.hash(serviceName) % Integer.MAX_VALUE); + return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE); } @Override public void onEvent(MemberChangeEvent event) { - healthyList = Collections.unmodifiableList(MemberUtils.simpleMembers(event.getMembers())); + // Here, the node list must be sorted to ensure that all nacos-server's + // node list is in the same order + List list = MemberUtils.simpleMembers(event.getMembers()); + Collections.sort(list); + healthyList = Collections.unmodifiableList(list); + System.out.println(healthyList); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java index 3d1179bc3..1488ac0cf 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java @@ -58,7 +58,6 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -85,34 +84,39 @@ public class ServiceManager implements RecordListener { @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; - @Autowired - private SwitchDomain switchDomain; + private final SwitchDomain switchDomain; - @Autowired - private DistroMapper distroMapper; + private final DistroMapper distroMapper; - @Autowired - private ServerMemberManager memberManager; + private final ServerMemberManager memberManager; - @Autowired - private PushService pushService; + private final PushService pushService; - @Autowired - private RaftPeerSet raftPeerSet; - - @Value("${nacos.naming.empty-service.auto-clean:false}") - private boolean emptyServiceAutoClean; + private final RaftPeerSet raftPeerSet; private int maxFinalizeCount = 3; private final Object putServiceLock = new Object(); + @Value("${nacos.naming.empty-service.auto-clean:false}") + private boolean emptyServiceAutoClean; + @Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}") private int cleanEmptyServiceDelay; @Value("${nacos.naming.empty-service.clean.period-time-ms:20000}") private int cleanEmptyServicePeriod; + public ServiceManager(SwitchDomain switchDomain, DistroMapper distroMapper, + ServerMemberManager memberManager, PushService pushService, + RaftPeerSet raftPeerSet) { + this.switchDomain = switchDomain; + this.distroMapper = distroMapper; + this.memberManager = memberManager; + this.pushService = pushService; + this.raftPeerSet = raftPeerSet; + } + @PostConstruct public void init() { diff --git a/naming/src/main/resources/META-INF/logback/naming-included.xml b/naming/src/main/resources/META-INF/logback/naming-included.xml index 9f0bfca22..81258d44f 100644 --- a/naming/src/main/resources/META-INF/logback/naming-included.xml +++ b/naming/src/main/resources/META-INF/logback/naming-included.xml @@ -187,6 +187,23 @@ + + ${LOG_HOME}/naming-distro.log + true + + ${LOG_HOME}/naming-distro.log.%d{yyyy-MM-dd}.%i + 1GB + 7 + 3GB + true + + + %date %level %msg%n%n + UTF-8 + + + @@ -195,6 +212,10 @@ + + + + diff --git a/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java index 3581ebcaf..485bd42fa 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java @@ -55,19 +55,13 @@ public class MemberLookup_ITCase extends BaseTest { static final String name = "cluster.conf"; - static final ServerMemberManager memberManager = new ServerMemberManager( - new MockServletContext()); + ServerMemberManager memberManager; @Before public void before() throws Exception { System.setProperty("nacos.home", path); ApplicationUtils.injectEnvironment(new StandardEnvironment()); ApplicationUtils.setIsStandalone(false); - try { - memberManager.init(); - } - catch (Throwable ignore) { - } System.out.println(ApplicationUtils.getStandaloneMode()); System.out.println(Arrays.toString(LookupFactory.LookupType.values())); @@ -78,6 +72,14 @@ public class MemberLookup_ITCase extends BaseTest { String ip = InetUtils.getSelfIp(); DiskUtils.writeFile(file, (ip + ":8848," + ip + ":8847," + ip + ":8849").getBytes( StandardCharsets.UTF_8), false); + + try { + memberManager = new ServerMemberManager( + new MockServletContext()); + } + catch (Exception e) { + e.printStackTrace(); + } } @After diff --git a/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java index aa6bae3b9..068168379 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java @@ -48,13 +48,21 @@ import java.util.concurrent.atomic.AtomicInteger; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ServerMemberManager_ITCase { - private ServerMemberManager memberManager = new ServerMemberManager(new MockServletContext()); + private ServerMemberManager memberManager; + + { + try { + memberManager = new ServerMemberManager(new MockServletContext()); + } + catch (Exception e) { + e.printStackTrace(); + } + } @Before public void init() throws Exception { ApplicationUtils.setIsStandalone(true); ApplicationUtils.injectEnvironment(new StandardEnvironment()); - memberManager.init(); } @After From a7556fa7762a889863379cd8ca559813d9670205 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Mon, 1 Jun 2020 16:59:01 +0800 Subject: [PATCH 2/4] fix: fix old version information not available in new version reality issue --- .../core/cluster/MemberMetaDataConstants.java | 7 +- .../nacos/core/cluster/MemberUtils.java | 4 +- .../core/cluster/ServerMemberManager.java | 6 +- .../core/distributed/raft/JRaftProtocol.java | 2 +- distribution/logs/start.out | 2 - .../alibaba/nacos/naming/cluster/Server.java | 229 --------------- .../naming/cluster/ServerListManager.java | 275 +++++------------- .../nacos/naming/cluster/ServerUtils.java | 54 ---- .../controllers/OperatorController.java | 9 + .../nacos/naming/misc/GlobalExecutor.java | 6 +- 10 files changed, 87 insertions(+), 507 deletions(-) delete mode 100644 distribution/logs/start.out delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java index a0a077f49..0a6e4c000 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java @@ -25,19 +25,22 @@ public class MemberMetaDataConstants { /** * Raft port,This parameter is dropped when GRPC is used as a whole */ - public static final String RAFT_PORT = "raft_port"; + public static final String RAFT_PORT = "raftPort"; public static final String SITE_KEY = "site"; - public static final String AD_WEIGHT = "adweight"; + public static final String AD_WEIGHT = "adWeight"; public static final String WEIGHT = "weight"; + public static final String LAST_REFRESH_TIME = "lastRefreshTime"; + public static final String[] META_KEY_LIST = new String[]{ RAFT_PORT, SITE_KEY, AD_WEIGHT, WEIGHT, + LAST_REFRESH_TIME, }; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java index 191d8edfa..449f0d687 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java @@ -134,9 +134,7 @@ public class MemberUtils { @SuppressWarnings("PMD.UndefineMagicConstantRule") public static Collection kRandom(Collection members, - Predicate filter) { - int k = ApplicationUtils - .getProperty("nacos.core.member.report.random-num", Integer.class, 3); + Predicate filter, int k) { Set kMembers = new HashSet<>(); diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index 5f625960a..0f0263af7 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -199,11 +199,7 @@ public class ServerMemberManager Loggers.CLUSTER.debug("Node information update : {}", newMember); String address = newMember.getAddress(); - - if (Objects.equals(newMember, self)) { - serverList.put(newMember.getAddress(), newMember); - return true; - } + newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis()); if (!serverList.containsKey(address)) { return false; diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java index 12ec66956..f2c3f0513 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java @@ -204,7 +204,7 @@ public class JRaftProtocol private void injectProtocolMetaData(ProtocolMetaData metaData) { Member member = memberManager.getSelf(); - member.setExtendVal("raft_meta_data", metaData); + member.setExtendVal("raftMetaData", metaData); memberManager.update(member); } diff --git a/distribution/logs/start.out b/distribution/logs/start.out deleted file mode 100644 index 015d9fdff..000000000 --- a/distribution/logs/start.out +++ /dev/null @@ -1,2 +0,0 @@ -/Users/liaochuntao/.sdkman/candidates/java/current/bin/java -DembeddedStorage=true -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Volumes/resources/github/nacos/distribution/logs/java_heapdump.hprof -XX:-UseLargePages -Dnacos.member.list=127.0.0.1:8080 -Djava.ext.dirs=/Users/liaochuntao/.sdkman/candidates/java/current/jre/lib/ext:/Users/liaochuntao/.sdkman/candidates/java/current/lib/ext -Xloggc:/Volumes/resources/github/nacos/distribution/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dloader.path=/Volumes/resources/github/nacos/distribution/plugins/health,/Volumes/resources/github/nacos/distribution/plugins/cmdb,/Volumes/resources/github/nacos/distribution/plugins/mysql -Dnacos.home=/Volumes/resources/github/nacos/distribution -jar /Volumes/resources/github/nacos/distribution/target/nacos-server.jar --spring.config.location=classpath:/,classpath:/config/,file:./,file:./config/,file:/Volumes/resources/github/nacos/distribution/conf/ --logging.config=/Volumes/resources/github/nacos/distribution/conf/nacos-logback.xml --server.max-http-header-size=524288 -Error: Unable to access jarfile /Volumes/resources/github/nacos/distribution/target/nacos-server.jar diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java deleted file mode 100644 index 89644c429..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/Server.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.cluster; - -import com.alibaba.nacos.common.utils.JacksonUtils; -import com.alibaba.nacos.naming.misc.UtilsAndCommons; - -/** - * Member node of Nacos cluster - * - * // TODO This object will be deleted sometime after version 1.3.0 - * - * @author nkorange - * @since 1.0.0 - * @deprecated 1.3.0 - */ -public class Server implements Comparable { - - /** - * IP of member - */ - private String ip; - - /** - * serving port of member. - */ - private int servePort; - - private String site = UtilsAndCommons.UNKNOWN_SITE; - - private int weight = 1; - - /** - * additional weight, used to adjust manually - */ - private int adWeight; - - private boolean alive = false; - - private long lastRefTime = 0L; - - private String lastRefTimeStr; - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public int getServePort() { - return servePort; - } - - public void setServePort(int servePort) { - this.servePort = servePort; - } - - public String getSite() { - return site; - } - - public void setSite(String site) { - this.site = site; - } - - public int getWeight() { - return weight; - } - - public void setWeight(int weight) { - this.weight = weight; - } - - public int getAdWeight() { - return adWeight; - } - - public void setAdWeight(int adWeight) { - this.adWeight = adWeight; - } - - public boolean isAlive() { - return alive; - } - - public void setAlive(boolean alive) { - this.alive = alive; - } - - public long getLastRefTime() { - return lastRefTime; - } - - public void setLastRefTime(long lastRefTime) { - this.lastRefTime = lastRefTime; - } - - public String getLastRefTimeStr() { - return lastRefTimeStr; - } - - public void setLastRefTimeStr(String lastRefTimeStr) { - this.lastRefTimeStr = lastRefTimeStr; - } - - public String getKey() { - return ip + UtilsAndCommons.IP_PORT_SPLITER + servePort; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Server server = (Server) o; - return servePort == server.servePort && ip.equals(server.ip); - } - - @Override - public int hashCode() { - int result = ip.hashCode(); - result = 31 * result + servePort; - return result; - } - - @Override - public String toString() { - return JacksonUtils.toJson(this); - } - - @Override - public int compareTo(Server server) { - if (server == null) { - return 1; - } - return this.getKey().compareTo(server.getKey()); - } - - public static ServerBuilder builder() { - return new ServerBuilder(); - } - - public static final class ServerBuilder { - private String ip; - private int servePort; - private String site = UtilsAndCommons.UNKNOWN_SITE; - private int weight = 1; - private int adWeight; - private boolean alive = false; - private long lastRefTime = 0L; - private String lastRefTimeStr; - - private ServerBuilder() { - } - - public ServerBuilder ip(String ip) { - this.ip = ip; - return this; - } - - public ServerBuilder servePort(int servePort) { - this.servePort = servePort; - return this; - } - - public ServerBuilder site(String site) { - this.site = site; - return this; - } - - public ServerBuilder weight(int weight) { - this.weight = weight; - return this; - } - - public ServerBuilder adWeight(int adWeight) { - this.adWeight = adWeight; - return this; - } - - public ServerBuilder alive(boolean alive) { - this.alive = alive; - return this; - } - - public ServerBuilder lastRefTime(long lastRefTime) { - this.lastRefTime = lastRefTime; - return this; - } - - public ServerBuilder lastRefTimeStr(String lastRefTimeStr) { - this.lastRefTimeStr = lastRefTimeStr; - return this; - } - - public Server build() { - Server server = new Server(); - server.setIp(ip); - server.setServePort(servePort); - server.setSite(site); - server.setWeight(weight); - server.setAdWeight(adWeight); - server.setAlive(alive); - server.setLastRefTime(lastRefTime); - server.setLastRefTimeStr(lastRefTimeStr); - return server; - } - } -} \ No newline at end of file diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java index 40a26271a..f3c19cd40 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java @@ -17,34 +17,33 @@ package com.alibaba.nacos.naming.cluster; import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberChangeEvent; import com.alibaba.nacos.core.cluster.MemberChangeListener; +import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; +import com.alibaba.nacos.core.cluster.NodeState; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Message; import com.alibaba.nacos.naming.misc.NamingProxy; -import com.alibaba.nacos.naming.misc.NetUtils; import com.alibaba.nacos.naming.misc.ServerStatusSynchronizer; import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.misc.Synchronizer; import com.alibaba.nacos.naming.misc.UtilsAndCommons; -import org.apache.commons.collections.CollectionUtils; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Objects; /** * The manager to globally refresh and operate server list. @@ -58,55 +57,46 @@ import java.util.concurrent.ConcurrentHashMap; @Component("serverListManager") public class ServerListManager implements MemberChangeListener { - private static final int STABLE_PERIOD = 60 * 1000; + private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; private final SwitchDomain switchDomain; private final ServerMemberManager memberManager; + private final Synchronizer synchronizer = new ServerStatusSynchronizer(); - private volatile List servers; - - private volatile List healthyServers = Collections.emptyList(); - - private Map> distroConfig = new ConcurrentHashMap<>(16); - - private Map distroBeats = new ConcurrentHashMap<>(16); - - private Set liveSites = new HashSet<>(); - - private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; - - private long lastHealthServerMillis = 0L; - - private boolean autoDisabledHealthCheck = false; - - private Synchronizer synchronizer = new ServerStatusSynchronizer(); + private volatile List servers; public ServerListManager(final SwitchDomain switchDomain, final ServerMemberManager memberManager) { this.switchDomain = switchDomain; this.memberManager = memberManager; NotifyCenter.registerSubscribe(this); - this.servers = ServerUtils.toServers(memberManager.allMembers()); + this.servers = new ArrayList<>(memberManager.allMembers()); } @PostConstruct public void init() { GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000); + GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater()); } public boolean contains(String s) { - for (Server server : getServers()) { - if (server.getKey().equals(s)) { + for (Member server : getServers()) { + if (Objects.equals(s, server.getAddress())) { return true; } } return false; } - public List getServers() { + public List getServers() { return servers; } + @Override + public void onEvent(MemberChangeEvent event) { + this.servers = new ArrayList<>(event.getMembers()); + } + public synchronized void onReceiveServerStatus(String configInfo) { Loggers.SRV_LOG.info("receive config info: {}", configInfo); @@ -116,11 +106,7 @@ public class ServerListManager implements MemberChangeListener { return; } - List newHealthyList = new ArrayList<>(); - List tmpServerList = new ArrayList<>(); - for (String config : configs) { - tmpServerList.clear(); // site:ip:lastReportTime:weight String[] params = config.split("#"); if (params.length <= 3) { @@ -128,103 +114,56 @@ public class ServerListManager implements MemberChangeListener { continue; } - Server server = new Server(); + Member server = Member.builder() + .ip(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0]) + .state(NodeState.UP) + .port(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1])) + .build(); - server.setSite(params[0]); - server.setIp(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0]); - server.setServePort(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1])); - server.setLastRefTime(Long.parseLong(params[2])); + server.setExtendVal(MemberMetaDataConstants.SITE_KEY, params[0]); + server.setExtendVal(MemberMetaDataConstants.WEIGHT, params.length == 4 ? Integer.parseInt(params[3]) : 1); + memberManager.update(server); - if (!contains(server.getKey())) { - throw new IllegalArgumentException("server: " + server.getKey() + " is not in serverlist"); + if (!contains(server.getAddress())) { + throw new IllegalArgumentException("server: " + server.getAddress() + " is not in serverlist"); + } + } + } + + private class ServerInfoUpdater implements Runnable { + + private int cursor = 0; + + @Override + public void run() { + List members = servers; + if (members.isEmpty()) { + return; } - Long lastBeat = distroBeats.get(server.getKey()); - long now = System.currentTimeMillis(); - if (null != lastBeat) { - server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); + this.cursor = (this.cursor + 1) % members.size(); + Member target = members.get(cursor); + String path = UtilsAndCommons.NACOS_NAMING_OPERATOR_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT + "/state"; + Map params = Maps.newHashMapWithExpectedSize(2); + String server = target.getAddress(); + if (Objects.equals(target.getAddress(), ApplicationUtils.getLocalAddress())) { + server = UtilsAndCommons.LOCAL_HOST_IP + ":" + target.getPort(); } - distroBeats.put(server.getKey(), now); - - Date date = new Date(Long.parseLong(params[2])); - server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)); - - server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1); - List list = distroConfig.get(server.getSite()); - if (list == null || list.size() <= 0) { - list = new ArrayList<>(); - list.add(server); - distroConfig.put(server.getSite(), list); - } - - for (Server s : list) { - String serverId = s.getKey() + "_" + s.getSite(); - String newServerId = server.getKey() + "_" + server.getSite(); - - if (serverId.equals(newServerId)) { - if (s.isAlive() != server.isAlive() || s.getWeight() != server.getWeight()) { - Loggers.SRV_LOG.warn("server beat out of date, current: {}, last: {}", - JacksonUtils.toJson(server), JacksonUtils.toJson(s)); + try { + String content = NamingProxy.reqCommon(path, params, server, false); + if (!StringUtils.EMPTY.equals(content)) { + RaftPeer raftPeer = JacksonUtils.toObj(content, RaftPeer.class); + if (null != raftPeer) { + String json = JacksonUtils.toJson(raftPeer); + Map map = JacksonUtils.toObj(json, HashMap.class); + target.setExtendVal("naming", map); + memberManager.update(target); } - tmpServerList.add(server); - continue; } - tmpServerList.add(s); + } catch (Exception ignore) { + // } - - if (!tmpServerList.contains(server)) { - tmpServerList.add(server); - } - - distroConfig.put(server.getSite(), tmpServerList); } - liveSites.addAll(distroConfig.keySet()); - } - - public void clean() { - cleanInvalidServers(); - - for (Map.Entry> entry : distroConfig.entrySet()) { - for (Server server : entry.getValue()) { - //request other server to clean invalid servers - if (!server.getKey().equals(NetUtils.localServer())) { - requestOtherServerCleanInvalidServers(server.getKey()); - } - } - - } - } - - public Set getLiveSites() { - return liveSites; - } - - private void cleanInvalidServers() { - for (Map.Entry> entry : distroConfig.entrySet()) { - List currentServers = entry.getValue(); - if (null == currentServers) { - distroConfig.remove(entry.getKey()); - continue; - } - - currentServers.removeIf(server -> !server.isAlive()); - } - } - - private void requestOtherServerCleanInvalidServers(String serverIP) { - Map params = new HashMap(1); - - params.put("action", "without-diamond-clean"); - try { - NamingProxy.reqAPI("distroStatus", params, serverIP, false); - } catch (Exception e) { - Loggers.SRV_LOG.warn("[DISTRO-STATUS-CLEAN] Failed to request to clean server status to " + serverIP, e); - } - } - - @Override - public void onEvent(MemberChangeEvent event) { - this.servers = ServerUtils.toServers(memberManager.allMembers()); } private class ServerStatusReporter implements Runnable { @@ -237,37 +176,31 @@ public class ServerListManager implements MemberChangeListener { return; } - checkDistroHeartbeat(); - int weight = Runtime.getRuntime().availableProcessors() / 2; if (weight <= 0) { weight = 1; } long curTime = System.currentTimeMillis(); - String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n"; + String status = LOCALHOST_SITE + "#" + ApplicationUtils.getLocalAddress() + "#" + curTime + "#" + weight + "\r\n"; - //send status to itself - onReceiveServerStatus(status); + List allServers = getServers(); - List allServers = getServers(); - - if (!contains(NetUtils.localServer())) { - Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers); + if (!contains(ApplicationUtils.getLocalAddress())) { + Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", ApplicationUtils.getLocalAddress(), allServers); return; } if (allServers.size() > 0 && !ApplicationUtils.getLocalAddress().contains(UtilsAndCommons.LOCAL_HOST_IP)) { - for (Server server : allServers) { - if (server.getKey().equals(ApplicationUtils.getLocalAddress())) { + for (Member server : allServers) { + if (Objects.equals(server.getAddress(), ApplicationUtils.getLocalAddress())) { continue; } Message msg = new Message(); msg.setData(status); - synchronizer.send(server.getKey(), msg); - + synchronizer.send(server.getAddress(), msg); } } } catch (Exception e) { @@ -279,76 +212,4 @@ public class ServerListManager implements MemberChangeListener { } } - private void checkDistroHeartbeat() { - - Loggers.SRV_LOG.debug("check distro heartbeat."); - - List servers = distroConfig.get(LOCALHOST_SITE); - if (CollectionUtils.isEmpty(servers)) { - return; - } - - List newHealthyList = new ArrayList<>(servers.size()); - long now = System.currentTimeMillis(); - for (Server s: servers) { - Long lastBeat = distroBeats.get(s.getKey()); - if (null == lastBeat) { - continue; - } - s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); - } - - //local site servers - List allLocalSiteSrvs = new ArrayList<>(); - for (Server server : servers) { - - if (server.getKey().endsWith(":0")) { - continue; - } - - server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey())); - - for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) { - - if (!allLocalSiteSrvs.contains(server.getKey())) { - allLocalSiteSrvs.add(server.getKey()); - } - - if (server.isAlive() && !newHealthyList.contains(server)) { - newHealthyList.add(server); - } - } - } - - Collections.sort(newHealthyList); - float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size(); - - if (autoDisabledHealthCheck - && curRatio > switchDomain.getDistroThreshold() - && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) { - Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " + - "stable now, enable health check. current ratio: {}", curRatio); - - switchDomain.setHealthCheckEnabled(true); - - // we must set this variable, otherwise it will conflict with user's action - autoDisabledHealthCheck = false; - } - - if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) { - // for every change disable healthy check for some while - Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, old: {}, new: {}", - healthyServers, newHealthyList); - if (switchDomain.isHealthCheckEnabled() && switchDomain.isAutoChangeHealthCheckEnabled()) { - Loggers.SRV_LOG.info("[NACOS-DISTRO] disable health check for {} ms from now on.", STABLE_PERIOD); - - switchDomain.setHealthCheckEnabled(false); - autoDisabledHealthCheck = true; - - lastHealthServerMillis = System.currentTimeMillis(); - } - - healthyServers = newHealthyList; - } - } } \ No newline at end of file diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java deleted file mode 100644 index f65839dc2..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.cluster; - -import com.alibaba.nacos.core.cluster.Member; - -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @author liaochuntao - */ -public final class ServerUtils { - - public static Server memberToServer(Member member) { - return Server.builder() - .ip(member.getIp()) - .servePort(member.getPort()) - .alive(true) - .lastRefTime(System.currentTimeMillis()) - .build(); - } - - public static Member serverToMember(Server server) { - return Member.builder() - .ip(server.getIp()) - .port(server.getServePort()) - .build(); - } - - public static List toMembers(Collection servers) { - return servers.stream().map(ServerUtils::serverToMember).collect(Collectors.toList()); - } - - public static List toServers(Collection members) { - return members.stream().map(ServerUtils::memberToServer).collect(Collectors.toList()); - } - -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java index 4403df97b..f5b6055d9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java @@ -222,6 +222,15 @@ public class OperatorController { return "ok"; } + /** + * This interface will be removed in a future release + * + * // TODO This object will be deleted sometime after version 1.3.0 + * + * @deprecated 1.3.0 + * @return {@link JsonNode} + */ + @Deprecated @RequestMapping(value = "/cluster/state", method = RequestMethod.GET) public JsonNode getClusterStates() { return JacksonUtils.transferToJsonNode(serviceManager.getMySelfClusterState()); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java index a87112906..32411ee73 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java @@ -30,8 +30,6 @@ public class GlobalExecutor { public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L); - private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5); - private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5); private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5); @@ -150,8 +148,8 @@ public class GlobalExecutor { executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS); } - public static void registerServerListUpdater(Runnable runnable) { - executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS); + public static void registerServerInfoUpdater(Runnable runnable) { + executorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS); } public static void registerServerStatusReporter(Runnable runnable, long delay) { From 31694246058ef6c226cba4bf7d39ad3fc4a65362 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Mon, 1 Jun 2020 21:54:29 +0800 Subject: [PATCH 3/4] refactor: optimize old version information synchronization logic --- .../nacos/common/http/BaseHttpClient.java | 1 + .../nacos/common/utils/VersionUtils.java | 4 +-- .../src/main/resources/application.properties | 2 +- .../core/cluster/MemberMetaDataConstants.java | 3 ++ .../nacos/core/cluster/MemberUtils.java | 2 ++ .../core/cluster/ServerMemberManager.java | 14 ++++++++- .../nacos/core/notify/NotifyCenter.java | 4 +-- .../naming/cluster/ServerListManager.java | 30 +++++++++++++++---- .../naming/misc/ServerStatusSynchronizer.java | 1 - .../test/core/notify/NotifyCenter_ITCase.java | 2 +- 10 files changed, 49 insertions(+), 14 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/http/BaseHttpClient.java b/common/src/main/java/com/alibaba/nacos/common/http/BaseHttpClient.java index 157adc690..ad3595e2f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/http/BaseHttpClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/http/BaseHttpClient.java @@ -65,6 +65,7 @@ public abstract class BaseHttpClient { try { final String body = EntityUtils.toString(response.getEntity()); RestResult data = ResponseHandler.convert(body, type); + data.setCode(response.getStatusLine().getStatusCode()); callback.onReceive(data); } catch (Throwable e) { diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/VersionUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/VersionUtils.java index 4fd9ef909..6dbe00987 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/VersionUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/VersionUtils.java @@ -28,7 +28,7 @@ public class VersionUtils { /** * 获取当前version */ - public static final String VERSION_DEFAULT = "${project.version}"; + public static final String VERSION_PLACEHOLDER = "${project.version}"; static { @@ -39,7 +39,7 @@ public class VersionUtils { Properties props = new Properties(); props.load(in); String val = props.getProperty("version"); - if (val != null && !VERSION_DEFAULT.equals(val)) { + if (val != null && !VERSION_PLACEHOLDER.equals(val)) { VERSION = val; } } catch (Exception e) { diff --git a/console/src/main/resources/application.properties b/console/src/main/resources/application.properties index 3883dbe39..4f011b693 100644 --- a/console/src/main/resources/application.properties +++ b/console/src/main/resources/application.properties @@ -132,7 +132,7 @@ nacos.istio.mcp.server.enabled=false ### MemberLookup ### Addressing pattern category, If set, the priority is highest -# nacos.core.member.lookup.type=[file,address-server,discovery] +# nacos.core.member.lookup.type=[file,address-server] ## Set the cluster list with a configuration file or command-line argument # nacos.member.list=192.168.16.101:8847?raft_port=8807,192.168.16.101?raft_port=8808,192.168.16.101:8849?raft_port=8809 ## for AddressServerMemberLookup diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java index 0a6e4c000..c15f1c6e6 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java @@ -35,12 +35,15 @@ public class MemberMetaDataConstants { public static final String LAST_REFRESH_TIME = "lastRefreshTime"; + public static final String VERSION = "version"; + public static final String[] META_KEY_LIST = new String[]{ RAFT_PORT, SITE_KEY, AD_WEIGHT, WEIGHT, LAST_REFRESH_TIME, + VERSION, }; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java index 449f0d687..abb515594 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.core.cluster; import com.alibaba.nacos.common.utils.ExceptionUtil; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.Loggers; import java.util.concurrent.ThreadLocalRandom; @@ -74,6 +75,7 @@ public class MemberUtils { Map extendInfo = new HashMap<>(4); // The Raft Port information needs to be set by default extendInfo.put(MemberMetaDataConstants.RAFT_PORT, String.valueOf(calculateRaftPort(target))); + extendInfo.put(MemberMetaDataConstants.VERSION, VersionUtils.VERSION); target.setExtendInfo(extendInfo); return target; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index 0f0263af7..b9905e114 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -27,12 +27,14 @@ import com.alibaba.nacos.common.http.param.Query; import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ExceptionUtil; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.cluster.lookup.LookupFactory; import com.alibaba.nacos.core.notify.Event; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.notify.listener.Subscribe; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.Commons; +import com.alibaba.nacos.core.utils.Constants; import com.alibaba.nacos.core.utils.GenericType; import com.alibaba.nacos.core.utils.GlobalExecutor; import com.alibaba.nacos.core.utils.InetUtils; @@ -40,6 +42,7 @@ import com.alibaba.nacos.core.utils.Loggers; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.web.context.WebServerInitializedEvent; import org.springframework.context.ApplicationListener; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -235,6 +238,10 @@ public class ServerMemberManager return this.self; } + public Member find(String address) { + return serverList.get(address); + } + public Collection allMembers() { // We need to do a copy to avoid affecting the real data HashSet set = new HashSet<>(serverList.values()); @@ -405,10 +412,15 @@ public class ServerMemberManager "/cluster/report"); try { - asyncHttpClient.post(url, Header.EMPTY, Query.EMPTY, getSelf(), + asyncHttpClient.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, + VersionUtils.VERSION), Query.EMPTY, getSelf(), reference.getType(), new Callback() { @Override public void onReceive(RestResult result) { + if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() || result.getCode() == HttpStatus.NOT_FOUND.value()) { + Loggers.CLUSTER.warn("{} version is too low, it is recommended to upgrade the version : {}", target, VersionUtils.VERSION); + return; + } if (result.ok()) { MemberUtils.onSuccess(target); } diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java index d74a383a7..6c45e7980 100644 --- a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java @@ -67,11 +67,11 @@ public class NotifyCenter { static { // Internal ArrayBlockingQueue buffer size. For applications with high write throughput, // this value needs to be increased appropriately. default value is 16384 - String ringBufferSizeProperty = "com.alibaba.nacos.core.notify.ringBufferSize"; + String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size"; RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384); // The size of the public publisher's message staging queue buffer - String shareBufferSizeProperty = "com.alibaba.nacos.core.notify.shareBufferSize"; + String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size"; SHATE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024); ServiceLoader loader = ServiceLoader.load(EventPublisher.class); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java index f3c19cd40..080833539 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** * The manager to globally refresh and operate server list. @@ -97,6 +98,11 @@ public class ServerListManager implements MemberChangeListener { this.servers = new ArrayList<>(event.getMembers()); } + /** + * Compatible with older version logic, In version 1.2.1 and before + * + * @param configInfo site:ip:lastReportTime:weight + */ public synchronized void onReceiveServerStatus(String configInfo) { Loggers.SRV_LOG.info("receive config info: {}", configInfo); @@ -114,11 +120,11 @@ public class ServerListManager implements MemberChangeListener { continue; } - Member server = Member.builder() + Member server = Optional.ofNullable(memberManager.find(params[1])).orElse(Member.builder() .ip(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0]) .state(NodeState.UP) .port(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1])) - .build(); + .build()); server.setExtendVal(MemberMetaDataConstants.SITE_KEY, params[0]); server.setExtendVal(MemberMetaDataConstants.WEIGHT, params.length == 4 ? Integer.parseInt(params[3]) : 1); @@ -143,12 +149,19 @@ public class ServerListManager implements MemberChangeListener { this.cursor = (this.cursor + 1) % members.size(); Member target = members.get(cursor); - String path = UtilsAndCommons.NACOS_NAMING_OPERATOR_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT + "/state"; - Map params = Maps.newHashMapWithExpectedSize(2); - String server = target.getAddress(); if (Objects.equals(target.getAddress(), ApplicationUtils.getLocalAddress())) { - server = UtilsAndCommons.LOCAL_HOST_IP + ":" + target.getPort(); + return; } + + // This metadata information exists from 1.3.0 onwards "version" + if (target.getExtendVal(MemberMetaDataConstants.VERSION) != null) { + return; + } + + final String path = UtilsAndCommons.NACOS_NAMING_OPERATOR_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT + "/state"; + final Map params = Maps.newHashMapWithExpectedSize(2); + final String server = target.getAddress(); + try { String content = NamingProxy.reqCommon(path, params, server, false); if (!StringUtils.EMPTY.equals(content)) { @@ -197,6 +210,11 @@ public class ServerListManager implements MemberChangeListener { continue; } + // This metadata information exists from 1.3.0 onwards "version" + if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) { + return; + } + Message msg = new Message(); msg.setData(status); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java index 13a82dba5..5e93881b3 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java @@ -55,7 +55,6 @@ public class ServerStatusSynchronizer implements Synchronizer { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP); - return 1; } return 0; diff --git a/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java index 18ed6826a..fb49b4d96 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java @@ -47,7 +47,7 @@ public class NotifyCenter_ITCase { } static { - System.setProperty("com.alibaba.nacos.core.notify.shareBufferSize", "8"); + System.setProperty("nacos.core.notify.share-buffer-size", "8"); } @Test From 01bb02b943ce75441d57ab8f72fe2bbca5ead868 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 2 Jun 2020 10:32:18 +0800 Subject: [PATCH 4/4] refactor: long volume, annotation optimization --- .../java/com/alibaba/nacos/common/utils/ByteUtils.java | 5 +++-- .../java/com/alibaba/nacos/common/utils/StringUtils.java | 4 +++- .../alibaba/nacos/naming/cluster/ServerListManager.java | 4 +--- .../nacos/naming/controllers/OperatorController.java | 8 ++------ .../nacos/naming/misc/ServerStatusSynchronizer.java | 1 + 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java index e8ad94738..343354aac 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ByteUtils.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.common.utils; import java.nio.charset.Charset; +import com.alibaba.nacos.api.common.Constants; import org.apache.commons.lang3.StringUtils; /** @@ -32,7 +33,7 @@ public final class ByteUtils { if (s == null) { return EMPTY; } - return s.getBytes(Charset.forName("UTF-8")); + return s.getBytes(Charset.forName(Constants.ENCODE)); } public static byte[] toBytes(Object s) { @@ -46,7 +47,7 @@ public final class ByteUtils { if (bytes == null) { return StringUtils.EMPTY; } - return new String(bytes, Charset.forName("UTF-8")); + return new String(bytes, Charset.forName(Constants.ENCODE)); } public static boolean isEmpty(byte[] data) { diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java index a0967877a..94f50ad02 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/StringUtils.java @@ -15,6 +15,8 @@ */ package com.alibaba.nacos.common.utils; +import com.alibaba.nacos.api.common.Constants; + import java.io.IOException; import java.io.StringWriter; import java.io.Writer; @@ -38,7 +40,7 @@ public class StringUtils { public static final String EMPTY = ""; public static String newString4UTF8(byte[] bytes) { - return new String(bytes, Charset.forName("UTF-8")); + return new String(bytes, Charset.forName(Constants.ENCODE)); } public static boolean isBlank(String str) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java index 080833539..ed1adf622 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java @@ -49,11 +49,9 @@ import java.util.Optional; /** * The manager to globally refresh and operate server list. * - * // TODO This object will be deleted sometime after version 1.3.0 - * * @author nkorange * @since 1.0.0 - * @deprecated 1.3.0 + * @deprecated 1.3.0 This object will be deleted sometime after version 1.3.0 */ @Component("serverListManager") public class ServerListManager implements MemberChangeListener { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java index f5b6055d9..09af7fa68 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java @@ -203,9 +203,7 @@ public class OperatorController { /** * This interface will be removed in a future release * - * // TODO This object will be deleted sometime after version 1.3.0 - * - * @deprecated 1.3.0 + * @deprecated 1.3.0 This function will be deleted sometime after version 1.3.0 * @param serverStatus server status * @return "ok" */ @@ -225,9 +223,7 @@ public class OperatorController { /** * This interface will be removed in a future release * - * // TODO This object will be deleted sometime after version 1.3.0 - * - * @deprecated 1.3.0 + * @deprecated 1.3.0 This function will be deleted sometime after version 1.3.0 * @return {@link JsonNode} */ @Deprecated diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java index 5e93881b3..e5708d357 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/ServerStatusSynchronizer.java @@ -27,6 +27,7 @@ import java.util.Map; /** * Report local server status to other server * + * @deprecated 1.3.0 This object will be deleted sometime after version 1.3.0 * @author nacos */ public class ServerStatusSynchronizer implements Synchronizer {