From 2526c5d8f253909880d7680c794ba18b9ddad65f Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Sun, 3 May 2020 20:26:35 +0800 Subject: [PATCH] fix: fix naming request redirect --- .gitignore | 1 + .../nacos/api/naming/pojo/Instance.java | 3 +- .../nacos/common/http/HttpClientManager.java | 119 +-- .../nacos/common/model/RestResultUtils.java | 19 +- .../config/server/filter/NacosWebFilter.java | 10 +- .../server/filter/TransferToLeaderFilter.java | 5 +- .../config/server/utils/PropertyUtil.java | 19 +- .../src/main/resources/application.properties | 2 +- .../alibaba/nacos/core/cluster/Member.java | 7 +- .../nacos/core/cluster/MemberChangeEvent.java | 14 +- .../core/cluster/ServerMemberManager.java | 31 +- .../lookup/AddressServerMemberLookup.java | 2 +- .../cluster/lookup/DiscoveryMemberLookup.java | 10 +- .../controller/NacosClusterController.java | 21 +- .../core/distributed/ProtocolManager.java | 2 +- .../nacos/core/notify/NotifyCenter.java | 26 +- .../com/alibaba/nacos/core/utils/Loggers.java | 2 + .../nacos/core}/utils/TimerContext.java | 28 +- .../main/resources/META-INF/logback/nacos.xml | 22 + distribution/conf/application.properties | 2 +- distribution/conf/nacos-logback.xml | 22 + .../distro/DistroConsistencyServiceImpl.java | 759 +++++++++--------- .../persistent/raft/RaftPeerSet.java | 21 +- .../controllers/InstanceController.java | 77 +- .../nacos/naming/core/DistroMapper.java | 54 +- .../alibaba/nacos/naming/misc/NetUtils.java | 13 +- .../nacos/naming/web/DistroFilter.java | 59 +- pom.xml | 25 + ...Case.java => ConfigDerbyRaft_DITCase.java} | 197 +---- .../nacos/test/core/BaseClusterTest.java | 236 ++++++ .../cluster/ServerMemberManager_ITCase.java | 31 + .../test/core/notify/NotifyCenter_ITCase.java | 3 +- .../nacos/test/naming/NamingRaft_DITCase.java | 68 ++ 33 files changed, 1064 insertions(+), 846 deletions(-) rename {common/src/main/java/com/alibaba/nacos/common => core/src/main/java/com/alibaba/nacos/core}/utils/TimerContext.java (63%) rename test/src/test/java/com/alibaba/nacos/test/config/{ConfigDerbyRaft_ITCase.java => ConfigDerbyRaft_DITCase.java} (69%) create mode 100644 test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java create mode 100644 test/src/test/java/com/alibaba/nacos/test/naming/NamingRaft_DITCase.java diff --git a/.gitignore b/.gitignore index beab5c80f..00989f876 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ target .DS_Store .factorypath /logs +/lib *.iml node_modules test/derby.log diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java b/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java index fb57b6396..ba6c7dd2c 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java @@ -187,8 +187,7 @@ public class Instance { } Instance host = (Instance) obj; - - return strEquals(toString(), host.toString()); + return strEquals(host.toString(), toString()); } @Override diff --git a/common/src/main/java/com/alibaba/nacos/common/http/HttpClientManager.java b/common/src/main/java/com/alibaba/nacos/common/http/HttpClientManager.java index 93cdaa43b..317249fa6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/http/HttpClientManager.java +++ b/common/src/main/java/com/alibaba/nacos/common/http/HttpClientManager.java @@ -23,9 +23,6 @@ import org.apache.http.impl.nio.client.HttpAsyncClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - /** * Use the same HttpClient object in the same space * @@ -41,131 +38,35 @@ public class HttpClientManager { private static final RequestConfig DEFAULT_CONFIG = RequestConfig.custom() .setConnectTimeout(TIMEOUT).setSocketTimeout(TIMEOUT << 1).build(); - private static final Object SYNC_MONITOR = new Object(); - - private static final Object ASYNC_MONITOR = new Object(); - - private static final Map HTTP_SYNC_CLIENT_MAP = new HashMap( - 8); - - private static final Map HTTP_ASYNC_CLIENT_MAP = new HashMap( - 8); - - private static final NSyncHttpClient SHARE_SYNC_HTTP_CLIENT = new NacosSyncHttpClient( + private static final NSyncHttpClient SYNC_HTTP_CLIENT = new NacosSyncHttpClient( HttpClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG).build()); - private static final NAsyncHttpClient SHARE_ASYNC_HTTP_CLIENT = new NacosAsyncHttpClient( + private static final NAsyncHttpClient ASYNC_HTTP_CLIENT = new NacosAsyncHttpClient( HttpAsyncClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG).build()); static { ShutdownUtils.addShutdownHook(new Runnable() { @Override public void run() { - logger.warn("[NSyncHttpClient] Start destroying HttpClient"); + logger.warn("[HttpClientManager] Start destroying HttpClient"); try { - for (Map.Entry entry : HTTP_SYNC_CLIENT_MAP - .entrySet()) { - entry.getValue().close(); - } - SHARE_SYNC_HTTP_CLIENT.close(); - HTTP_SYNC_CLIENT_MAP.clear(); + SYNC_HTTP_CLIENT.close(); + ASYNC_HTTP_CLIENT.close(); } catch (Exception ignore) { } - logger.warn("[NSyncHttpClient] Destruction of the end"); - } - }); - - ShutdownUtils.addShutdownHook(new Runnable() { - @Override - public void run() { - logger.warn("[NAsyncHttpClient] Start destroying HttpClient"); - try { - for (Map.Entry entry : HTTP_ASYNC_CLIENT_MAP - .entrySet()) { - entry.getValue().close(); - } - SHARE_ASYNC_HTTP_CLIENT.close(); - HTTP_ASYNC_CLIENT_MAP.clear(); - } - catch (Exception ignore) { - } - logger.warn("[NAsyncHttpClient] Destruction of the end"); + logger.warn("[HttpClientManager] Destruction of the end"); } }); } - public static NSyncHttpClient getShareSyncHttpClient() { - return SHARE_SYNC_HTTP_CLIENT; + public static NSyncHttpClient getSyncHttpClient() { + return SYNC_HTTP_CLIENT; } - public static NAsyncHttpClient getShareAsyncHttpClient() { - return SHARE_ASYNC_HTTP_CLIENT; - } - - public static NSyncHttpClient newSyncHttpClient(String namespace) { - synchronized (SYNC_MONITOR) { - NSyncHttpClient nSyncHttpClient = HTTP_SYNC_CLIENT_MAP.get(namespace); - - if (nSyncHttpClient != null) { - return nSyncHttpClient; - } - - nSyncHttpClient = new NacosSyncHttpClient( - HttpClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG).build()); - HTTP_SYNC_CLIENT_MAP.put(namespace, nSyncHttpClient); - return nSyncHttpClient; - } - } - - public static NAsyncHttpClient newAsyncHttpClient(String namespace) { - synchronized (ASYNC_MONITOR) { - NAsyncHttpClient nAsyncHttpClient = HTTP_ASYNC_CLIENT_MAP.get(namespace); - - if (nAsyncHttpClient != null) { - return nAsyncHttpClient; - } - - nAsyncHttpClient = new NacosAsyncHttpClient( - HttpAsyncClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG) - .build()); - HTTP_ASYNC_CLIENT_MAP.put(namespace, nAsyncHttpClient); - return nAsyncHttpClient; - } - } - - public static NSyncHttpClient newSyncHttpClient(String namespace, - RequestConfig requestConfig) { - synchronized (SYNC_MONITOR) { - NSyncHttpClient nSyncHttpClient = HTTP_SYNC_CLIENT_MAP.get(namespace); - - if (nSyncHttpClient != null) { - return nSyncHttpClient; - } - - nSyncHttpClient = new NacosSyncHttpClient( - HttpClients.custom().setDefaultRequestConfig(requestConfig).build()); - HTTP_SYNC_CLIENT_MAP.put(namespace, nSyncHttpClient); - return nSyncHttpClient; - } - } - - public static NAsyncHttpClient newAsyncHttpClient(String namespace, - RequestConfig requestConfig) { - synchronized (ASYNC_MONITOR) { - NAsyncHttpClient nAsyncHttpClient = HTTP_ASYNC_CLIENT_MAP.get(namespace); - - if (nAsyncHttpClient != null) { - return nAsyncHttpClient; - } - - nAsyncHttpClient = new NacosAsyncHttpClient( - HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig) - .build()); - HTTP_ASYNC_CLIENT_MAP.put(namespace, nAsyncHttpClient); - return nAsyncHttpClient; - } + public static NAsyncHttpClient getAsyncHttpClient() { + return ASYNC_HTTP_CLIENT; } } diff --git a/common/src/main/java/com/alibaba/nacos/common/model/RestResultUtils.java b/common/src/main/java/com/alibaba/nacos/common/model/RestResultUtils.java index 14e0861d9..f75a13b7c 100644 --- a/common/src/main/java/com/alibaba/nacos/common/model/RestResultUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/model/RestResultUtils.java @@ -41,6 +41,12 @@ public class RestResultUtils { .build(); } + public static RestResult failed() { + return RestResult.builder() + .withCode(500) + .build(); + } + public static RestResult failed(String errMsg) { return RestResult.builder() .withCode(500) @@ -48,12 +54,6 @@ public class RestResultUtils { .build(); } - public static RestResult failed() { - return RestResult.builder() - .withCode(500) - .build(); - } - public static RestResult failed(int code, T data) { return RestResult.builder() .withCode(code) @@ -69,6 +69,13 @@ public class RestResultUtils { .build(); } + public static RestResult failedWithMsg(int code, String errMsg) { + return RestResult.builder() + .withCode(code) + .withMsg(errMsg) + .build(); + } + public static RestResult failedWithData(T data) { return RestResult.builder() .withCode(500) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/filter/NacosWebFilter.java b/config/src/main/java/com/alibaba/nacos/config/server/filter/NacosWebFilter.java index 8c91f28e0..42d8b880c 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/filter/NacosWebFilter.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/filter/NacosWebFilter.java @@ -16,10 +16,14 @@ package com.alibaba.nacos.config.server.filter; import com.alibaba.nacos.config.server.constant.Constants; -import org.springframework.core.annotation.Order; -import javax.servlet.*; -import javax.servlet.annotation.WebFilter; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; import java.io.IOException; import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/filter/TransferToLeaderFilter.java b/config/src/main/java/com/alibaba/nacos/config/server/filter/TransferToLeaderFilter.java index 5ceb9ddde..0f664b8f1 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/filter/TransferToLeaderFilter.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/filter/TransferToLeaderFilter.java @@ -102,6 +102,8 @@ public class TransferToLeaderFilter implements Filter { ReuseHttpRequest req = null; HttpServletResponse resp = (HttpServletResponse) response; + String urlString = ((HttpServletRequest) request).getRequestURI(); + if (StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)) { req = new ReuseUploadFileHttpServletRequest((HttpServletRequest) request); @@ -110,12 +112,9 @@ public class TransferToLeaderFilter implements Filter { req = new ReuseHttpServletRequest((HttpServletRequest) request); } - String urlString = req.getRequestURI(); - if (StringUtils.isNotBlank(req.getQueryString())) { urlString += "?" + req.getQueryString(); } - try { String path = new URI(req.getRequestURI()).getPath(); Method method = controllerMethodsCache.getMethod(req.getMethod(), path); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java b/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java index 0010b8bb9..b4dc2dbad 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java @@ -226,13 +226,7 @@ public class PropertyUtil implements ApplicationContextInitializerliaochuntao */ -public class Member implements Cloneable { +public class Member implements Comparable, Cloneable { private String ip; @@ -144,6 +144,11 @@ public class Member implements Cloneable { return Objects.hash(ip, port); } + @Override + public int compareTo(Member o) { + return getAddress().compareTo(o.getAddress()); + } + public static final class MemberBuilder { private String ip; private int port; diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java index 40b48f540..06bf9567c 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java @@ -38,7 +38,7 @@ public class MemberChangeEvent implements Event { private static final long serialVersionUID = 7308126651076668976L; - private Collection allMembers; + private Collection members; private long no = SEQUENCE.getAndIncrement(); @@ -46,12 +46,12 @@ public class MemberChangeEvent implements Event { return new MemberChangeEventBuilder(); } - public Collection getAllMembers() { - return allMembers; + public Collection getMembers() { + return members; } - public void setAllMembers(Collection allMembers) { - this.allMembers = allMembers; + public void setMembers(Collection members) { + this.members = members; } @Override @@ -65,14 +65,14 @@ public class MemberChangeEvent implements Event { private MemberChangeEventBuilder() { } - public MemberChangeEventBuilder allNodes(Collection allMembers) { + public MemberChangeEventBuilder members(Collection allMembers) { this.allMembers = allMembers; return this; } public MemberChangeEvent build() { MemberChangeEvent memberChangeEvent = new MemberChangeEvent(); - memberChangeEvent.setAllMembers(allMembers); + memberChangeEvent.setMembers(allMembers); return memberChangeEvent; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index 913dfada5..39830abc6 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -80,13 +80,12 @@ import java.util.function.BiFunction; public class ServerMemberManager implements ApplicationListener { - private final NAsyncHttpClient asyncHttpClient = HttpClientManager - .newAsyncHttpClient(ServerMemberManager.class.getCanonicalName()); + private final NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient(); /** * Cluster node list */ - private volatile ConcurrentSkipListMap serverList = new ConcurrentSkipListMap<>(); + private volatile ConcurrentSkipListMap serverList; /** * Is this node in the cluster list @@ -108,6 +107,9 @@ public class ServerMemberManager */ private MemberLookup lookup; + /** + * self member obj + */ private Member self; /** @@ -121,6 +123,7 @@ public class ServerMemberManager private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask(); public ServerMemberManager(ServletContext servletContext) { + this.serverList = new ConcurrentSkipListMap(); ApplicationUtils.setContextPath(servletContext.getContextPath()); MemberUtils.setManager(this); } @@ -226,7 +229,9 @@ public class ServerMemberManager public Collection allMembers() { // We need to do a copy to avoid affecting the real data - return new ArrayList<>(serverList.values()); + List list = new ArrayList<>(serverList.values()); + Collections.sort(list); + return list; } public List allMembersWithoutSelf() { @@ -255,7 +260,7 @@ public class ServerMemberManager } boolean hasChange = false; - ConcurrentSkipListMap tmpMap = new ConcurrentSkipListMap<>(); + ConcurrentSkipListMap tmpMap = new ConcurrentSkipListMap(); Set tmpAddressInfo = new ConcurrentHashSet<>(); for (Member member : members) { final String address = member.getAddress(); @@ -284,7 +289,7 @@ public class ServerMemberManager if (hasChange) { MemberUtils.syncToFile(members); Loggers.CLUSTER.warn("member has changed : {}", members); - Event event = MemberChangeEvent.builder().allNodes(members).build(); + Event event = MemberChangeEvent.builder().members(allMembers()).build(); NotifyCenter.publishEvent(event); } @@ -292,15 +297,13 @@ public class ServerMemberManager } public synchronized boolean memberJoin(Collection members) { - Set set = new HashSet<>(); - set.addAll(members); + Set set = new HashSet<>(members); set.addAll(allMembers()); return memberChange(set); } public synchronized boolean memberLeave(Collection members) { - Set set = new HashSet<>(); - set.addAll(allMembers()); + Set set = new HashSet<>(allMembers()); set.removeAll(members); return memberChange(set); } @@ -385,7 +388,7 @@ public class ServerMemberManager this.cursor = (this.cursor + 1) % members.size(); Member target = members.get(cursor); - Loggers.CLUSTER.debug("report the metadata to the node : {}", target); + Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress()); final String url = HttpUtils.buildUrl(false, target.getAddress(), ApplicationUtils.getContextPath(), Commons.NACOS_CORE_CONTEXT, @@ -402,7 +405,7 @@ public class ServerMemberManager else { Loggers.CLUSTER .warn("failed to report new info to target node : {}, result : {}", - target, result); + target.getAddress(), result); MemberUtils.onFail(target); } } @@ -411,7 +414,7 @@ public class ServerMemberManager public void onError(Throwable throwable) { Loggers.CLUSTER .error("failed to report new info to target node : {}, error : {}", - target, throwable); + target.getAddress(), throwable); MemberUtils.onFail(target); } }); @@ -419,7 +422,7 @@ public class ServerMemberManager catch (Throwable ex) { Loggers.CLUSTER .error("failed to report new info to target node : {}, error : {}", - target, ex); + target.getAddress(), ex); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/AddressServerMemberLookup.java b/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/AddressServerMemberLookup.java index 680f1943c..7cb08eef5 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/AddressServerMemberLookup.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/AddressServerMemberLookup.java @@ -54,7 +54,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup { private volatile boolean isAddressServerHealth = true; private int addressServerFailCount = 0; private int maxFailCount = 12; - private NSyncHttpClient syncHttpClient = HttpClientManager.getShareSyncHttpClient(); + private NSyncHttpClient syncHttpClient = HttpClientManager.getSyncHttpClient(); private volatile boolean shutdown = false; @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/DiscoveryMemberLookup.java b/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/DiscoveryMemberLookup.java index 4c529c31c..5d19813a9 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/DiscoveryMemberLookup.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/DiscoveryMemberLookup.java @@ -23,13 +23,11 @@ import com.alibaba.nacos.common.http.NAsyncHttpClient; import com.alibaba.nacos.common.http.param.Header; import com.alibaba.nacos.common.http.param.Query; import com.alibaba.nacos.common.model.RestResult; -import com.alibaba.nacos.common.utils.ExceptionUtil; -import com.alibaba.nacos.common.utils.TimerContext; +import com.alibaba.nacos.core.utils.TimerContext; import com.alibaba.nacos.core.cluster.AbstractMemberLookup; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberUtils; import com.alibaba.nacos.core.cluster.NodeState; -import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.cluster.Task; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.Commons; @@ -40,7 +38,6 @@ import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -83,8 +80,7 @@ import java.util.Set; */ public class DiscoveryMemberLookup extends AbstractMemberLookup { - NAsyncHttpClient asyncHttpClient = HttpClientManager - .newAsyncHttpClient(ServerMemberManager.class.getCanonicalName()); + NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient(); MemberListSyncTask syncTask; @@ -186,7 +182,7 @@ public class DiscoveryMemberLookup extends AbstractMemberLookup { Loggers.CLUSTER.error("node state report task has error : {}", e); } finally { - TimerContext.end(Loggers.CLUSTER); + TimerContext.end(); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java b/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java index a9c49a65a..60c95764b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java +++ b/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java @@ -28,7 +28,6 @@ import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberUtils; import com.alibaba.nacos.core.cluster.NodeState; import com.alibaba.nacos.core.cluster.ServerMemberManager; -import com.alibaba.nacos.core.cluster.lookup.LookupFactory; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.Commons; import com.alibaba.nacos.core.utils.GenericType; @@ -36,12 +35,10 @@ import com.alibaba.nacos.core.utils.Loggers; import java.util.ArrayList; import java.util.Collection; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -49,8 +46,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import javax.annotation.PostConstruct; - /** * @author liaochuntao */ @@ -58,8 +53,11 @@ import javax.annotation.PostConstruct; @RequestMapping(Commons.NACOS_CORE_CONTEXT + "/cluster") public class NacosClusterController { - @Autowired - private ServerMemberManager memberManager; + private final ServerMemberManager memberManager; + + public NacosClusterController(ServerMemberManager memberManager) { + this.memberManager = memberManager; + } @GetMapping(value = "/self") public RestResult self() { @@ -67,7 +65,7 @@ public class NacosClusterController { } @GetMapping(value = "/nodes") - public RestResult> listAllNode(@RequestParam(value = "keyword", required = false) String ipKeyWord) { + public RestResult> listNodes(@RequestParam(value = "keyword", required = false) String ipKeyWord) { Collection members = memberManager.allMembers(); Collection result = new ArrayList<>(); @@ -90,7 +88,7 @@ public class NacosClusterController { @GetMapping(value = "/simple/nodes") public RestResult> listSimpleNodes() { - return RestResultUtils.success(MemberUtils.simpleMembers(memberManager.allMembers())); + return RestResultUtils.success(memberManager.getMemberAddressInfos()); } @GetMapping("/health") @@ -101,7 +99,7 @@ public class NacosClusterController { @PostMapping(value = {"/report"}) public RestResult report(@RequestBody Member node) { if (!node.check()) { - return RestResultUtils.failedWithData("Node information is illegal"); + return RestResultUtils.failedWithMsg(400, "Node information is illegal"); } Loggers.CLUSTER.debug("node state report, receive info : {}", node); node.setState(NodeState.UP); @@ -125,8 +123,7 @@ public class NacosClusterController { public RestResult leave(@RequestBody Collection params) throws Exception { Collection memberList = MemberUtils.multiParse(params); memberManager.memberLeave(memberList); - final NAsyncHttpClient asyncHttpClient = HttpClientManager - .newAsyncHttpClient(ServerMemberManager.class.getCanonicalName()); + final NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient(); final GenericType> genericType = new GenericType>() {}; final Collection notifyList = memberManager.allMembersWithoutSelf(); notifyList.removeAll(memberList); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java index 4aee5bf63..9a55e4aff 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java @@ -171,7 +171,7 @@ public class ProtocolManager // time T2 after a period of time. // (T1 < T2) - Set copy = new HashSet<>(event.getAllMembers()); + Set copy = new HashSet<>(event.getMembers()); // Node change events between different protocols should not block each other if (Objects.nonNull(apProtocol)) { diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java index 4df8b75ad..e7b9fb773 100644 --- a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java @@ -280,33 +280,32 @@ public class NotifyCenter { public static class Publisher extends Thread { - private final Class eventType; - private final CopyOnWriteArraySet subscribes = new CopyOnWriteArraySet<>(); - private int queueMaxSize = -1; private volatile boolean initialized = false; private volatile boolean canOpen = false; private volatile boolean shutdown = false; + + private final Class eventType; + private final CopyOnWriteArraySet subscribes = new CopyOnWriteArraySet<>(); + private int queueMaxSize = -1; private BlockingQueue queue; private Supplier supplier; private long lastEventSequence = -1L; // judge the subscribe can deal Event - private BiPredicate filter = new BiPredicate() { - @Override - public boolean test(Event event, Subscribe subscribe) { - return true; - } - }; + private BiPredicate filter; Publisher(final Class eventType) { this(eventType, RING_BUFFER_SIZE); } Publisher(final Class eventType, final int queueMaxSize) { - this.eventType = eventType; - this.queueMaxSize = queueMaxSize; - this.queue = new ArrayBlockingQueue<>(queueMaxSize); + this(eventType, new BiPredicate() { + @Override + public boolean test(Event event, Subscribe subscribe) { + return true; + } + }, queueMaxSize); } Publisher(final Class eventType, @@ -399,9 +398,6 @@ public class NotifyCenter { if (!initialized) { throw new IllegalStateException("Publisher does not start"); } - if (shutdown) { - throw new IllegalStateException("Publisher already shutdown"); - } } void shutdown() { diff --git a/core/src/main/java/com/alibaba/nacos/core/utils/Loggers.java b/core/src/main/java/com/alibaba/nacos/core/utils/Loggers.java index 104e85f39..e69cdddc8 100644 --- a/core/src/main/java/com/alibaba/nacos/core/utils/Loggers.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/Loggers.java @@ -33,4 +33,6 @@ public class Loggers { public static final Logger RAFT = LoggerFactory.getLogger("com.alibaba.nacos.core.protocol.raft"); public static final Logger CLUSTER = LoggerFactory.getLogger("com.alibaba.nacos.core.cluster"); + + public static final Logger JOB_TIMER = LoggerFactory.getLogger("com.alibaba.nacos.core.job.timer"); } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/TimerContext.java b/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java similarity index 63% rename from common/src/main/java/com/alibaba/nacos/common/utils/TimerContext.java rename to core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java index 63289452c..f53c76e35 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/TimerContext.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java @@ -14,9 +14,11 @@ * limitations under the License. */ -package com.alibaba.nacos.common.utils; +package com.alibaba.nacos.core.utils; -import org.slf4j.Logger; +import com.alibaba.nacos.common.utils.Pair; + +import java.util.concurrent.Callable; /** * @author liaochuntao @@ -30,11 +32,29 @@ public class TimerContext { TIME_RECORD.set(Pair.with(name, startTime)); } - public static void end(Logger logger) { + public static void end() { long endTime = System.currentTimeMillis(); Pair record = TIME_RECORD.get(); - logger.debug("{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond())); + Loggers.JOB_TIMER.info("{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond())); TIME_RECORD.remove(); } + public static void run(Runnable job, String name) { + start(name); + try { + job.run(); + } finally { + end(); + } + } + + public static V run(Callable job, String name) throws Exception { + start(name); + try { + return job.call(); + } finally { + end(); + } + } + } diff --git a/core/src/main/resources/META-INF/logback/nacos.xml b/core/src/main/resources/META-INF/logback/nacos.xml index 8313602b7..9ca4512f3 100644 --- a/core/src/main/resources/META-INF/logback/nacos.xml +++ b/core/src/main/resources/META-INF/logback/nacos.xml @@ -137,6 +137,23 @@ + + ${LOG_HOME}/nacos-job-timer.log + true + + ${LOG_HOME}/nacos-job-timer.log.%d{yyyy-MM-dd}.%i + 2GB + 7 + 7GB + true + + + %date %level %msg%n%n + UTF-8 + + + @@ -172,6 +189,11 @@ + + + + + diff --git a/distribution/conf/application.properties b/distribution/conf/application.properties index 2d98cfdc2..88ee10835 100644 --- a/distribution/conf/application.properties +++ b/distribution/conf/application.properties @@ -99,7 +99,7 @@ nacos.security.ignore.urls=/,/error,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/ nacos.core.auth.system.type=nacos ### If turn on auth system: -nacos.core.auth.enabled=true +nacos.core.auth.enabled=false ### The token expiration in seconds: nacos.core.auth.default.token.expire.seconds=18000 diff --git a/distribution/conf/nacos-logback.xml b/distribution/conf/nacos-logback.xml index ea8674dd2..9d6452a79 100644 --- a/distribution/conf/nacos-logback.xml +++ b/distribution/conf/nacos-logback.xml @@ -478,6 +478,23 @@ + + ${LOG_HOME}/nacos-job-timer.log + true + + ${LOG_HOME}/nacos-job-timer.log.%d{yyyy-MM-dd}.%i + 2GB + 7 + 7GB + true + + + %date %level %msg%n%n + UTF-8 + + + @@ -579,6 +596,11 @@ + + + + + diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index 4c8f0e403..558e75c80 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -30,7 +30,12 @@ import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyServic import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.Service; -import com.alibaba.nacos.naming.misc.*; +import com.alibaba.nacos.naming.misc.GlobalConfig; +import com.alibaba.nacos.naming.misc.GlobalExecutor; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.NamingProxy; +import com.alibaba.nacos.naming.misc.NetUtils; +import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.pojo.Record; import org.apache.commons.lang3.StringUtils; import org.javatuples.Pair; @@ -41,10 +46,11 @@ import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * A consistency protocol algorithm called Distro @@ -64,366 +70,391 @@ import java.util.concurrent.LinkedBlockingQueue; @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { - @Autowired - private DistroMapper distroMapper; - - @Autowired - private DataStore dataStore; - - @Autowired - private TaskDispatcher taskDispatcher; - - @Autowired - private Serializer serializer; - - @Autowired - private ServerMemberManager memberManager; - - @Autowired - private SwitchDomain switchDomain; - - @Autowired - private GlobalConfig globalConfig; - - private boolean initialized = false; - - private volatile Notifier notifier = new Notifier(); - - private LoadDataTask loadDataTask = new LoadDataTask(); - - private Map> listeners = new ConcurrentHashMap<>(); - - private Map syncChecksumTasks = new ConcurrentHashMap<>(16); - - @PostConstruct - public void init() { - GlobalExecutor.submit(loadDataTask); - GlobalExecutor.submitDistroNotifyTask(notifier); - } - - private class LoadDataTask implements Runnable { - - @Override - public void run() { - try { - load(); - if (!initialized) { - GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis()); - } - } catch (Exception e) { - Loggers.DISTRO.error("load data failed.", e); - } - } - } - - public void load() throws Exception { - if (ApplicationUtils.getStandaloneMode()) { - initialized = true; - return; - } - // size = 1 means only myself in the list, we need at least one another server alive: - while (memberManager.getServerList().size() <= 1) { - Thread.sleep(1000L); - Loggers.DISTRO.info("waiting server list init..."); - } - - for (Map.Entry entry : memberManager.getServerList().entrySet()) { - final String address = entry.getValue().getAddress(); - if (NetUtils.localServer().equals(address)) { - continue; - } - if (Loggers.DISTRO.isDebugEnabled()) { - Loggers.DISTRO.debug("sync from " + address); - } - // try sync data from remote server: - if (syncAllDataFromRemote(address)) { - initialized = true; - return; - } - } - } - - @Override - public void put(String key, Record value) throws NacosException { - onPut(key, value); - taskDispatcher.addTask(key); - } - - @Override - public void remove(String key) throws NacosException { - onRemove(key); - listeners.remove(key); - } - - @Override - public Datum get(String key) throws NacosException { - return dataStore.get(key); - } - - public void onPut(String key, Record value) { - - if (KeyBuilder.matchEphemeralInstanceListKey(key)) { - Datum datum = new Datum<>(); - datum.value = (Instances) value; - datum.key = key; - datum.timestamp.incrementAndGet(); - dataStore.put(key, datum); - } - - if (!listeners.containsKey(key)) { - return; - } - - notifier.addTask(key, ApplyAction.CHANGE); - } - - public void onRemove(String key) { - - dataStore.remove(key); - - if (!listeners.containsKey(key)) { - return; - } - - notifier.addTask(key, ApplyAction.DELETE); - } - - public void onReceiveChecksums(Map checksumMap, String server) { - - if (syncChecksumTasks.containsKey(server)) { - // Already in process of this server: - Loggers.DISTRO.warn("sync checksum task already in process with {}", server); - return; - } - - syncChecksumTasks.put(server, "1"); - - try { - - List toUpdateKeys = new ArrayList<>(); - List toRemoveKeys = new ArrayList<>(); - for (Map.Entry entry : checksumMap.entrySet()) { - if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) { - // this key should not be sent from remote server: - Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server); - // abort the procedure: - return; - } - - if (!dataStore.contains(entry.getKey()) || - dataStore.get(entry.getKey()).value == null || - !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) { - toUpdateKeys.add(entry.getKey()); - } - } - - for (String key : dataStore.keys()) { - - if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) { - continue; - } - - if (!checksumMap.containsKey(key)) { - toRemoveKeys.add(key); - } - } - - if (Loggers.DISTRO.isDebugEnabled()) { - Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server); - } - - for (String key : toRemoveKeys) { - onRemove(key); - } - - if (toUpdateKeys.isEmpty()) { - return; - } - - try { - byte[] result = NamingProxy.getData(toUpdateKeys, server); - processData(result); - } catch (Exception e) { - Loggers.DISTRO.error("get data from " + server + " failed!", e); - } - } finally { - // Remove this 'in process' flag: - syncChecksumTasks.remove(server); - } - - } - - public boolean syncAllDataFromRemote(String server) { - - try { - byte[] data = NamingProxy.getAllData(server); - processData(data); - return true; - } catch (Exception e) { - Loggers.DISTRO.error("sync full data from " + server + " failed!", e); - return false; - } - } - - public void processData(byte[] data) throws Exception { - if (data.length > 0) { - Map> datumMap = - serializer.deserializeMap(data, Instances.class); - - - for (Map.Entry> entry : datumMap.entrySet()) { - dataStore.put(entry.getKey(), entry.getValue()); - - if (!listeners.containsKey(entry.getKey())) { - // pretty sure the service not exist: - if (switchDomain.isDefaultInstanceEphemeral()) { - // create empty service - Loggers.DISTRO.info("creating service {}", entry.getKey()); - Service service = new Service(); - String serviceName = KeyBuilder.getServiceName(entry.getKey()); - String namespaceId = KeyBuilder.getNamespace(entry.getKey()); - service.setName(serviceName); - service.setNamespaceId(namespaceId); - service.setGroupName(Constants.DEFAULT_GROUP); - // now validate the service. if failed, exception will be thrown - service.setLastModifiedMillis(System.currentTimeMillis()); - service.recalculateChecksum(); - listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0) - .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); - } - } - } - - for (Map.Entry> entry : datumMap.entrySet()) { - - if (!listeners.containsKey(entry.getKey())) { - // Should not happen: - Loggers.DISTRO.warn("listener of {} not found.", entry.getKey()); - continue; - } - - try { - for (RecordListener listener : listeners.get(entry.getKey())) { - listener.onChange(entry.getKey(), entry.getValue().value); - } - } catch (Exception e) { - Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e); - continue; - } - - // Update data store if listener executed successfully: - dataStore.put(entry.getKey(), entry.getValue()); - } - } - } - - @Override - public void listen(String key, RecordListener listener) throws NacosException { - if (!listeners.containsKey(key)) { - listeners.put(key, new CopyOnWriteArrayList<>()); - } - - if (listeners.get(key).contains(listener)) { - return; - } - - listeners.get(key).add(listener); - } - - @Override - public void unlisten(String key, RecordListener listener) throws NacosException { - if (!listeners.containsKey(key)) { - return; - } - for (RecordListener recordListener : listeners.get(key)) { - if (recordListener.equals(listener)) { - listeners.get(key).remove(listener); - break; - } - } - } - - @Override - public boolean isAvailable() { - return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus()); - } - - public boolean isInitialized() { - return initialized || !globalConfig.isDataWarmup(); - } - - public class Notifier implements Runnable { - - private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024); - - private BlockingQueue tasks = new LinkedBlockingQueue(1024 * 1024); - - public void addTask(String datumKey, ApplyAction action) { - - if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { - return; - } - if (action == ApplyAction.CHANGE) { - services.put(datumKey, StringUtils.EMPTY); - } - tasks.add(Pair.with(datumKey, action)); - } - - public int getTaskSize() { - return tasks.size(); - } - - @Override - public void run() { - Loggers.DISTRO.info("distro notifier started"); - - while (true) { - try { - - Pair pair = tasks.take(); - - if (pair == null) { - continue; - } - - String datumKey = (String) pair.getValue0(); - ApplyAction action = (ApplyAction) pair.getValue1(); - - services.remove(datumKey); - - int count = 0; - - if (!listeners.containsKey(datumKey)) { - continue; - } - - for (RecordListener listener : listeners.get(datumKey)) { - - count++; - - try { - if (action == ApplyAction.CHANGE) { - listener.onChange(datumKey, dataStore.get(datumKey).value); - continue; - } - - if (action == ApplyAction.DELETE) { - listener.onDelete(datumKey); - continue; - } - } catch (Throwable e) { - Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); - } - } - - if (Loggers.DISTRO.isDebugEnabled()) { - Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", - datumKey, count, action.name()); - } - } catch (Throwable e) { - Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); - } - } - } - } + @Autowired + private DistroMapper distroMapper; + + @Autowired + private DataStore dataStore; + + @Autowired + private TaskDispatcher taskDispatcher; + + @Autowired + private Serializer serializer; + + @Autowired + private ServerMemberManager memberManager; + + @Autowired + private SwitchDomain switchDomain; + + @Autowired + private GlobalConfig globalConfig; + + private boolean initialized = false; + + private volatile Notifier notifier = new Notifier(); + + private LoadDataTask loadDataTask = new LoadDataTask(); + + private Map> listeners = new ConcurrentHashMap<>(); + + private Map syncChecksumTasks = new ConcurrentHashMap<>(16); + + @PostConstruct + public void init() { + GlobalExecutor.submit(loadDataTask); + GlobalExecutor.submitDistroNotifyTask(notifier); + } + + private class LoadDataTask implements Runnable { + + @Override + public void run() { + try { + load(); + if (!initialized) { + GlobalExecutor + .submit(this, globalConfig.getLoadDataRetryDelayMillis()); + } + } + catch (Exception e) { + Loggers.DISTRO.error("load data failed.", e); + } + } + } + + public void load() throws Exception { + if (ApplicationUtils.getStandaloneMode()) { + initialized = true; + return; + } + // size = 1 means only myself in the list, we need at least one another server alive: + while (memberManager.getServerList().size() <= 1) { + Thread.sleep(1000L); + Loggers.DISTRO.info("waiting server list init..."); + } + + for (Map.Entry entry : memberManager.getServerList().entrySet()) { + final String address = entry.getValue().getAddress(); + if (NetUtils.localServer().equals(address)) { + continue; + } + if (Loggers.DISTRO.isDebugEnabled()) { + Loggers.DISTRO.debug("sync from " + address); + } + // try sync data from remote server: + if (syncAllDataFromRemote(address)) { + initialized = true; + return; + } + } + } + + @Override + public void put(String key, Record value) throws NacosException { + onPut(key, value); + taskDispatcher.addTask(key); + } + + @Override + public void remove(String key) throws NacosException { + onRemove(key); + listeners.remove(key); + } + + @Override + public Datum get(String key) throws NacosException { + return dataStore.get(key); + } + + public void onPut(String key, Record value) { + + if (KeyBuilder.matchEphemeralInstanceListKey(key)) { + Datum datum = new Datum<>(); + datum.value = (Instances) value; + datum.key = key; + datum.timestamp.incrementAndGet(); + dataStore.put(key, datum); + } + + if (!listeners.containsKey(key)) { + return; + } + + notifier.addTask(key, ApplyAction.CHANGE); + } + + public void onRemove(String key) { + + dataStore.remove(key); + + if (!listeners.containsKey(key)) { + return; + } + + notifier.addTask(key, ApplyAction.DELETE); + } + + public void onReceiveChecksums(Map checksumMap, String server) { + + if (syncChecksumTasks.containsKey(server)) { + // Already in process of this server: + Loggers.DISTRO.warn("sync checksum task already in process with {}", server); + return; + } + + syncChecksumTasks.put(server, "1"); + + try { + + List toUpdateKeys = new ArrayList<>(); + List toRemoveKeys = new ArrayList<>(); + for (Map.Entry entry : checksumMap.entrySet()) { + if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) { + // this key should not be sent from remote server: + Loggers.DISTRO.error("receive responsible key timestamp of " + entry + .getKey() + " from " + server); + // abort the procedure: + return; + } + + if (!dataStore.contains(entry.getKey()) + || dataStore.get(entry.getKey()).value == null || !dataStore + .get(entry.getKey()).value.getChecksum() + .equals(entry.getValue())) { + toUpdateKeys.add(entry.getKey()); + } + } + + for (String key : dataStore.keys()) { + + if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) { + continue; + } + + if (!checksumMap.containsKey(key)) { + toRemoveKeys.add(key); + } + } + + if (Loggers.DISTRO.isDebugEnabled()) { + Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", + toRemoveKeys, toUpdateKeys, server); + } + + for (String key : toRemoveKeys) { + onRemove(key); + } + + if (toUpdateKeys.isEmpty()) { + return; + } + + try { + byte[] result = NamingProxy.getData(toUpdateKeys, server); + processData(result); + } + catch (Exception e) { + Loggers.DISTRO.error("get data from " + server + " failed!", e); + } + } + finally { + // Remove this 'in process' flag: + syncChecksumTasks.remove(server); + } + + } + + public boolean syncAllDataFromRemote(String server) { + + try { + byte[] data = NamingProxy.getAllData(server); + processData(data); + return true; + } + catch (Exception e) { + Loggers.DISTRO.error("sync full data from " + server + " failed!", e); + return false; + } + } + + public void processData(byte[] data) throws Exception { + if (data.length > 0) { + Map> datumMap = serializer + .deserializeMap(data, Instances.class); + + for (Map.Entry> entry : datumMap.entrySet()) { + dataStore.put(entry.getKey(), entry.getValue()); + + if (!listeners.containsKey(entry.getKey())) { + // pretty sure the service not exist: + if (switchDomain.isDefaultInstanceEphemeral()) { + // create empty service + Loggers.DISTRO.info("creating service {}", entry.getKey()); + Service service = new Service(); + String serviceName = KeyBuilder.getServiceName(entry.getKey()); + String namespaceId = KeyBuilder.getNamespace(entry.getKey()); + service.setName(serviceName); + service.setNamespaceId(namespaceId); + service.setGroupName(Constants.DEFAULT_GROUP); + // now validate the service. if failed, exception will be thrown + service.setLastModifiedMillis(System.currentTimeMillis()); + service.recalculateChecksum(); + listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0).onChange( + KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), + service); + } + } + } + + for (Map.Entry> entry : datumMap.entrySet()) { + + if (!listeners.containsKey(entry.getKey())) { + // Should not happen: + Loggers.DISTRO.warn("listener of {} not found.", entry.getKey()); + continue; + } + + try { + for (RecordListener listener : listeners.get(entry.getKey())) { + listener.onChange(entry.getKey(), entry.getValue().value); + } + } + catch (Exception e) { + Loggers.DISTRO + .error("[NACOS-DISTRO] error while execute listener of key: {}", + entry.getKey(), e); + continue; + } + + // Update data store if listener executed successfully: + dataStore.put(entry.getKey(), entry.getValue()); + } + } + } + + @Override + public void listen(String key, RecordListener listener) throws NacosException { + if (!listeners.containsKey(key)) { + listeners.put(key, new CopyOnWriteArrayList<>()); + } + + if (listeners.get(key).contains(listener)) { + return; + } + + listeners.get(key).add(listener); + } + + @Override + public void unlisten(String key, RecordListener listener) throws NacosException { + if (!listeners.containsKey(key)) { + return; + } + for (RecordListener recordListener : listeners.get(key)) { + if (recordListener.equals(listener)) { + listeners.get(key).remove(listener); + break; + } + } + } + + @Override + public boolean isAvailable() { + return isInitialized() || ServerStatus.UP.name() + .equals(switchDomain.getOverriddenServerStatus()); + } + + public boolean isInitialized() { + return initialized || !globalConfig.isDataWarmup(); + } + + public class Notifier implements Runnable { + + private ConcurrentHashMap services = new ConcurrentHashMap<>( + 10 * 1024); + + private BlockingQueue> tasks = new ArrayBlockingQueue<>( + 1024 * 1024); + + public void addTask(String datumKey, ApplyAction action) { + + if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { + return; + } + if (action == ApplyAction.CHANGE) { + services.put(datumKey, StringUtils.EMPTY); + } + tasks.offer(Pair.with(datumKey, action)); + } + + public int getTaskSize() { + return tasks.size(); + } + + @Override + public void run() { + Loggers.DISTRO.info("distro notifier started"); + + for ( ; ; ) { + try { + Pair pair = tasks.take(); + handle(pair); + } + catch (Throwable e) { + Loggers.DISTRO + .error("[NACOS-DISTRO] Error while handling notifying task", + e); + } + } + } + + private void handle(Pair pair) { + try { + String datumKey = pair.getValue0(); + ApplyAction action = pair.getValue1(); + + services.remove(datumKey); + + int count = 0; + + if (!listeners.containsKey(datumKey)) { + return; + } + + for (RecordListener listener : listeners.get(datumKey)) { + + count++; + + try { + if (action == ApplyAction.CHANGE) { + listener.onChange(datumKey, dataStore.get(datumKey).value); + continue; + } + + if (action == ApplyAction.DELETE) { + listener.onDelete(datumKey); + continue; + } + } + catch (Throwable e) { + Loggers.DISTRO + .error("[NACOS-DISTRO] error while notifying listener of key: {}", + datumKey, e); + } + } + + if (Loggers.DISTRO.isDebugEnabled()) { + Loggers.DISTRO + .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", + datumKey, count, action.name()); + } + } + catch (Throwable e) { + Loggers.DISTRO + .error("[NACOS-DISTRO] Error while handling notifying task", e); + } + } + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java index 9ce602a62..42945572c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java @@ -30,7 +30,6 @@ import com.ning.http.client.Response; import org.apache.commons.collections.SortedBag; import org.apache.commons.collections.bag.TreeBag; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; @@ -44,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** @@ -54,20 +52,20 @@ import java.util.concurrent.atomic.AtomicLong; @DependsOn("ProtocolManager") public class RaftPeerSet implements MemberChangeListener { - @Autowired - private ServerMemberManager memberManager; + private final ServerMemberManager memberManager; private AtomicLong localTerm = new AtomicLong(0L); private RaftPeer leader = null; - private volatile Map peers = new ConcurrentHashMap<>(); + private volatile Map peers = new HashMap<>(8); private Set sites = new HashSet<>(); private volatile boolean ready = false; - public RaftPeerSet() { + public RaftPeerSet(ServerMemberManager memberManager) { + this.memberManager = memberManager; } @PostConstruct @@ -254,15 +252,18 @@ public class RaftPeerSet implements MemberChangeListener { @Override public void onEvent(MemberChangeEvent event) { - changePeers(event.getAllMembers()); + changePeers(event.getMembers()); } private void changePeers(Collection members) { + Map tmpPeers = new HashMap<>(members.size()); + for (Member member : members) { + final String address = member.getAddress(); if (peers.containsKey(address)) { - peers.put(address, peers.get(address)); + tmpPeers.put(address, peers.get(address)); continue; } @@ -274,10 +275,12 @@ public class RaftPeerSet implements MemberChangeListener { raftPeer.term.set(localTerm.get()); } - peers.put(address, raftPeer); + tmpPeers.put(address, raftPeer); } // replace raft peer set: + peers = tmpPeers; + ready = true; Loggers.RAFT.info("raft peers changed: " + members); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java index cb8baa71b..9387a1a18 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java @@ -23,6 +23,7 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.CommonParams; import com.alibaba.nacos.api.naming.NamingResponseCode; import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.core.utils.TimerContext; import com.alibaba.nacos.core.auth.ActionTypes; import com.alibaba.nacos.core.auth.Secured; import com.alibaba.nacos.core.utils.WebUtils; @@ -95,10 +96,15 @@ public class InstanceController { @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { - String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); - String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); + final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); + final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); - serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); + final Instance instance = parseInstance(request); + + TimerContext.run(() -> { + serviceManager.registerInstance(namespaceId, serviceName, instance); + return true; + }, instance.generateInstanceId() + " register, ephemeral : " + instance.isEphemeral()); return "ok"; } @@ -117,7 +123,10 @@ public class InstanceController { return "ok"; } - serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance); + TimerContext.run(() -> { + serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance); + return true; + }, instance.getInstanceId() + " deregister, ephemeral : " + instance.isEphemeral()); return "ok"; } @@ -126,19 +135,24 @@ public class InstanceController { @PutMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String update(HttpServletRequest request) throws Exception { - String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); - String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); + final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); + final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); + final Instance instance = parseInstance(request); String agent = WebUtils.getUserAgent(request); ClientInfo clientInfo = new ClientInfo(agent); - if (clientInfo.type == ClientInfo.ClientType.JAVA && - clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { - serviceManager.updateInstance(namespaceId, serviceName, parseInstance(request)); - } else { - serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); - } + TimerContext.run(() -> { + if (clientInfo.type == ClientInfo.ClientType.JAVA && + clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { + serviceManager.updateInstance(namespaceId, serviceName, instance); + } else { + serviceManager.registerInstance(namespaceId, serviceName, instance); + } + return true; + }, instance.generateInstanceId() + " update, ephemeral : " + instance.isEphemeral()); + return "ok"; } @@ -182,8 +196,10 @@ public class InstanceController { } instance.setLastBeat(System.currentTimeMillis()); instance.validate(); - - serviceManager.updateInstance(namespaceId, serviceName, instance); + TimerContext.run(() -> { + serviceManager.updateInstance(namespaceId, serviceName, instance); + return true; + }, instance.generateInstanceId() + " patch, ephemeral : " + instance.isEphemeral()); return "ok"; } @@ -288,9 +304,7 @@ public class InstanceController { port = clientBeat.getPort(); } - if (Loggers.SRV_LOG.isDebugEnabled()) { - Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); - } + Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); @@ -299,17 +313,26 @@ public class InstanceController { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } - instance = new Instance(); - instance.setPort(clientBeat.getPort()); - instance.setIp(clientBeat.getIp()); - instance.setWeight(clientBeat.getWeight()); - instance.setMetadata(clientBeat.getMetadata()); - instance.setClusterName(clusterName); - instance.setServiceName(serviceName); - instance.setInstanceId(instance.getInstanceId()); - instance.setEphemeral(clientBeat.isEphemeral()); - serviceManager.registerInstance(namespaceId, serviceName, instance); + TimerContext.start("[CLIENT-BEAT] : " + serviceName + "@@" + ip + ":" + port + ", ephemeral : " + clientBeat.isEphemeral()); + try { + Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, perform data compensation operations, beat: {}, serviceName: {}", + clientBeat, serviceName); + + instance = new Instance(); + instance.setPort(clientBeat.getPort()); + instance.setIp(clientBeat.getIp()); + instance.setWeight(clientBeat.getWeight()); + instance.setMetadata(clientBeat.getMetadata()); + instance.setClusterName(clusterName); + instance.setServiceName(serviceName); + instance.setInstanceId(instance.getInstanceId()); + instance.setEphemeral(clientBeat.isEphemeral()); + + serviceManager.registerInstance(namespaceId, serviceName, instance); + } finally { + TimerContext.end(); + } } Service service = serviceManager.getService(namespaceId, serviceName); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java b/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java index f9dc71234..10bf5da8b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java @@ -15,9 +15,9 @@ */ package com.alibaba.nacos.naming.core; -import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberChangeListener; import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MemberUtils; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -25,12 +25,13 @@ import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.NetUtils; import com.alibaba.nacos.naming.misc.SwitchDomain; import org.apache.commons.collections.CollectionUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Objects; /** * @author nkorange @@ -38,24 +39,28 @@ import java.util.List; @Component("distroMapper") public class DistroMapper implements MemberChangeListener { - private List healthyList = new ArrayList<>(); + private volatile List healthyList = new ArrayList<>(); + + private final SwitchDomain switchDomain; + + private final ServerMemberManager memberManager; + + public DistroMapper(ServerMemberManager memberManager, SwitchDomain switchDomain) { + this.memberManager = memberManager; + this.switchDomain = switchDomain; + } public List getHealthyList() { return healthyList; } - @Autowired - private SwitchDomain switchDomain; - - @Autowired - private ServerMemberManager memberManager; - /** * init server list */ @PostConstruct public void init() { NotifyCenter.registerSubscribe(this); + this.healthyList = MemberUtils.simpleMembers(memberManager.allMembers()); } public boolean responsible(Cluster cluster, Instance instance) { @@ -66,50 +71,53 @@ public class DistroMapper implements MemberChangeListener { } public boolean responsible(String serviceName) { + final List servers = healthyList; + if (!switchDomain.isDistroEnabled() || ApplicationUtils.getStandaloneMode()) { return true; } - if (CollectionUtils.isEmpty(healthyList)) { + if (CollectionUtils.isEmpty(servers)) { // means distro config is not ready yet return false; } - int index = healthyList.indexOf(NetUtils.localServer()); - int lastIndex = healthyList.lastIndexOf(NetUtils.localServer()); + int index = servers.indexOf(NetUtils.localServer()); + int lastIndex = servers.lastIndexOf(NetUtils.localServer()); if (lastIndex < 0 || index < 0) { return true; } - int target = distroHash(serviceName) % healthyList.size(); + int target = distroHash(serviceName) % servers.size(); return target >= index && target <= lastIndex; } public String mapSrv(String serviceName) { - if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) { + final List servers = healthyList; + + if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) { return NetUtils.localServer(); } try { - return healthyList.get(distroHash(serviceName) % healthyList.size()); - } catch (Exception e) { + return servers.get(distroHash(serviceName) % servers.size()); + } catch (Throwable e) { Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e); - return NetUtils.localServer(); } } public int distroHash(String serviceName) { - return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE); + return Math.abs(Objects.hash(serviceName) % Integer.MAX_VALUE); } @Override public void onEvent(MemberChangeEvent event) { - List newHealthyList = new ArrayList<>(); - for (Member server : event.getAllMembers()) { - newHealthyList.add(server.getAddress()); - } - healthyList = newHealthyList; + healthyList = Collections.unmodifiableList(MemberUtils.simpleMembers(event.getMembers())); } + @Override + public boolean ignoreExpireEvent() { + return true; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java index 92f98b3b7..cd26375bc 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java @@ -25,19 +25,8 @@ import org.apache.commons.lang3.StringUtils; */ public class NetUtils { - private static String serverAddress = null; - public static String localServer() { - return getLocalAddress() + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort(); - } - - public static String getLocalAddress() { - if (StringUtils.isNotBlank(serverAddress)) { - return serverAddress; - } - - serverAddress = InetUtils.getSelfIp(); - return serverAddress; + return InetUtils.getSelfIp() + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort(); } public static String num2ip(int ip) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java b/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java index 38d29d3fa..7d5212dd3 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java @@ -19,9 +19,12 @@ import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.naming.CommonParams; import com.alibaba.nacos.common.constant.HttpHeaderConsts; import com.alibaba.nacos.common.utils.IoUtils; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.code.ControllerMethodsCache; import com.alibaba.nacos.common.utils.ExceptionUtil; import com.alibaba.nacos.core.utils.OverrideParameterRequestWrapper; +import com.alibaba.nacos.core.utils.ReuseHttpRequest; +import com.alibaba.nacos.core.utils.ReuseHttpServletRequest; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.misc.HttpClient; import com.alibaba.nacos.naming.misc.Loggers; @@ -29,6 +32,11 @@ import com.alibaba.nacos.naming.misc.UtilsAndCommons; import org.apache.commons.codec.Charsets; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; import javax.servlet.*; import javax.servlet.http.HttpServletRequest; @@ -39,15 +47,18 @@ import java.net.URI; import java.security.AccessControlException; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; /** * @author nacos */ public class DistroFilter implements Filter { - private static final int PROXY_CONNECT_TIMEOUT = 2000; - private static final int PROXY_READ_TIMEOUT = 2000; + private static final String SLASH= "/"; + private static final String DOUBLE_SLASH = "://"; @Autowired private DistroMapper distroMapper; @@ -55,6 +66,8 @@ public class DistroFilter implements Filter { @Autowired private ControllerMethodsCache controllerMethodsCache; + private final RestTemplate restTemplate = new RestTemplate(); + @Override public void init(FilterConfig filterConfig) throws ServletException { @@ -62,7 +75,7 @@ public class DistroFilter implements Filter { @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { - HttpServletRequest req = (HttpServletRequest) servletRequest; + ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest); HttpServletResponse resp = (HttpServletResponse) servletResponse; String urlString = req.getRequestURI(); @@ -112,25 +125,37 @@ public class DistroFilter implements Filter { return; } - List headerList = new ArrayList<>(16); - Enumeration headers = req.getHeaderNames(); - while (headers.hasMoreElements()) { - String headerName = headers.nextElement(); - headerList.add(headerName); - headerList.add(req.getHeader(headerName)); + if (urlString.startsWith(SLASH)) { + urlString = urlString.substring(1); } - String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name()); + final String targetServer = distroMapper.mapSrv(groupedServiceName); - HttpClient.HttpResult result = - HttpClient.request("http://" + distroMapper.mapSrv(groupedServiceName) + req.getRequestURI(), headerList, - HttpClient.translateParameterMap(req.getParameterMap()), - body, PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod()); + final String reqUrl = + req.getScheme() + DOUBLE_SLASH + targetServer + SLASH + urlString; + + HttpHeaders headers = new HttpHeaders(); + Enumeration headerNames = req.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + headers.set(headerName, req.getHeader(headerName)); + } + + headers.set("Content-Type", "application/x-www-form-urlencoded;charset=" + + Charsets.UTF_8.name()); + headers.set("Accept-Charset", Charsets.UTF_8.name()); + headers.set(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.VERSION); + headers.set(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION); try { + HttpEntity httpEntity = new HttpEntity<>(req.getBody(), headers); + ResponseEntity result = restTemplate + .exchange(reqUrl, Objects.requireNonNull( + HttpMethod.resolve(req.getMethod()), "req.getMethod() is null"), httpEntity, + String.class); resp.setCharacterEncoding("UTF-8"); - resp.getWriter().write(result.content); - resp.setStatus(result.code); + resp.getWriter().write(Objects.requireNonNull(result.getBody(), "result.getBody() is null")); + resp.setStatus(result.getStatusCodeValue()); } catch (Exception ignore) { Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName) + urlString); } @@ -147,7 +172,7 @@ public class DistroFilter implements Filter { resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, "no such api:" + req.getMethod() + ":" + req.getRequestURI()); return; - } catch (Exception e) { + } catch (Throwable e) { resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Server failed," + ExceptionUtil.getAllExceptionMsg(e)); return; diff --git a/pom.xml b/pom.xml index 14ea51a17..3e6b6d406 100644 --- a/pom.xml +++ b/pom.xml @@ -417,6 +417,31 @@ + + cluster-test + + + + maven-failsafe-plugin + 2.19.1 + + @{failsafeArgLine} + + **/*DITCase.java + + + + + + integration-test + verify + + + + + + + sonar-apache diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java similarity index 69% rename from test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_ITCase.java rename to test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java index 35266a841..5d1c74a76 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java @@ -16,12 +16,7 @@ package com.alibaba.nacos.test.config; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.AbstractListener; -import com.alibaba.nacos.common.http.HttpClientManager; -import com.alibaba.nacos.common.http.NSyncHttpClient; import com.alibaba.nacos.common.http.param.Header; import com.alibaba.nacos.common.http.param.Query; import com.alibaba.nacos.common.model.RestResult; @@ -29,46 +24,27 @@ import com.alibaba.nacos.config.server.model.event.RaftDBErrorEvent; import com.alibaba.nacos.config.server.model.event.RaftDBErrorRecoverEvent; import com.alibaba.nacos.config.server.service.repository.EmbeddedStoragePersistServiceImpl; import com.alibaba.nacos.config.server.service.repository.PersistService; -import com.alibaba.nacos.config.server.service.repository.DistributedDatabaseOperateImpl; import com.alibaba.nacos.consistency.cp.CPProtocol; -import com.alibaba.nacos.consistency.cp.Constants; import com.alibaba.nacos.core.distributed.id.IdGeneratorManager; import com.alibaba.nacos.core.distributed.raft.utils.JRaftConstants; import com.alibaba.nacos.core.notify.Event; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.notify.listener.Subscribe; -import com.alibaba.nacos.common.utils.DiskUtils; -import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.GenericType; import com.alibaba.nacos.core.utils.InetUtils; import com.alibaba.nacos.common.utils.ThreadUtils; -import com.alibaba.nacos.test.base.HttpClient4Test; -import org.junit.AfterClass; +import com.alibaba.nacos.test.core.BaseClusterTest; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.FixMethodOrder; -import org.junit.Ignore; import org.junit.Test; import org.junit.runners.MethodSorters; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.core.env.MapPropertySource; -import org.springframework.web.context.support.StandardServletEnvironment; -import java.io.File; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -76,111 +52,9 @@ import java.util.concurrent.atomic.AtomicReference; * @author liaochuntao */ @SuppressWarnings("all") -@Ignore @FixMethodOrder(value = MethodSorters.NAME_ASCENDING) -public class ConfigDerbyRaft_ITCase - extends HttpClient4Test { - - private static final String CONFIG_INFO_ID = "config-info-id"; - - private static ConfigService iconfig7; - private static ConfigService iconfig8; - private static ConfigService iconfig9; - - private static final NSyncHttpClient httpClient = HttpClientManager.getShareSyncHttpClient(); - - private static final AtomicBoolean[] finished = new AtomicBoolean[]{new AtomicBoolean(false), new AtomicBoolean(false), new AtomicBoolean(false)}; - - private static Map applications = new HashMap<>(); - - private static String clusterInfo; - - static { - System.getProperties().setProperty("nacos.core.auth.enabled", "false"); - System.getProperties().setProperty("embeddedStorage", "true"); - String ip = InetUtils.getSelfIp(); - clusterInfo = "nacos.member.list=" + ip + ":8847," + ip - + ":8848," + ip + ":8849"; - - NotifyCenter.registerSubscribe(new Subscribe() { - @Override - public void onEvent(RaftDBErrorEvent event) { - System.out.print(event.getEx()); - } - - @Override - public Class subscribeType() { - return RaftDBErrorEvent.class; - } - }); - } - - @BeforeClass - public static void before() throws Exception { - - CountDownLatch latch = new CountDownLatch(3); - - Runnable runnable = () -> { - for (int i = 0; i < 3; i++) { - try { - URL runnerUrl = new File("../console/target/classes").toURI().toURL(); - URL[] urls = new URL[] { runnerUrl }; - URLClassLoader cl = new URLClassLoader(urls); - Class runnerClass = cl.loadClass("com.alibaba.nacos.Nacos"); - run(i, latch, runnerClass); - } catch (Exception e) { - latch.countDown(); - } - } - }; - - new Thread(runnable).start(); - - latch.await(); - - System.out.println("The cluster node initialization is complete"); - - Properties setting7 = new Properties(); - String serverIp7 = "127.0.0.1:8847"; - setting7.put(PropertyKeyConst.SERVER_ADDR, serverIp7); - setting7.put(PropertyKeyConst.USERNAME, "nacos"); - setting7.put(PropertyKeyConst.PASSWORD, "nacos"); - iconfig7 = NacosFactory.createConfigService(setting7); - - Properties setting8 = new Properties(); - String serverIp8 = "127.0.0.1:8848"; - setting8.put(PropertyKeyConst.SERVER_ADDR, serverIp8); - setting8.put(PropertyKeyConst.USERNAME, "nacos"); - setting8.put(PropertyKeyConst.PASSWORD, "nacos"); - iconfig8 = NacosFactory.createConfigService(setting8); - - Properties setting9 = new Properties(); - String serverIp9 = "127.0.0.1:8849"; - setting9.put(PropertyKeyConst.SERVER_ADDR, serverIp9); - setting9.put(PropertyKeyConst.USERNAME, "nacos"); - setting9.put(PropertyKeyConst.PASSWORD, "nacos"); - iconfig9 = NacosFactory.createConfigService(setting9); - - TimeUnit.SECONDS.sleep(20L); - } - - @AfterClass - public static void after() throws Exception { - CountDownLatch latch = new CountDownLatch(applications.size()); - for (ConfigurableApplicationContext context : applications.values()) { - new Thread(() -> { - try { - System.out.println("start close : " + context); - context.close(); - } catch (Exception ignore) { - } finally { - System.out.println("finished close : " + context); - latch.countDown(); - } - }).start(); - } - latch.await(); - } +public class ConfigDerbyRaft_DITCase + extends BaseClusterTest { @Test public void test_a_publish_config() throws Exception { @@ -535,69 +409,4 @@ public class ConfigDerbyRaft_ITCase } - private static void run(final int index, final CountDownLatch latch, final Class cls) { - Runnable runnable = () -> { - try { - ApplicationUtils.setIsStandalone(false); - - final String path = Paths.get(System.getProperty("user.home"), "/nacos-" + index + "/").toString(); - DiskUtils.deleteDirectory(path); - - System.setProperty("nacos.home", path); - System.out.println("nacos.home is : [" + path + "]"); - - Map properties = new HashMap<>(); - properties.put("server.port", "884" + (7 + index)); - properties.put("nacos.home", path); - properties.put("nacos.logs.path", - Paths.get(System.getProperty("user.home"), "nacos-" + index, "/logs/").toString()); - properties.put("spring.jmx.enabled", false); - properties.put("nacos.core.snowflake.worker-id", index + 1); - MapPropertySource propertySource = new MapPropertySource( - "nacos_cluster_test", properties); - ConfigurableEnvironment environment = new StandardServletEnvironment(); - environment.getPropertySources().addFirst(propertySource); - SpringApplication cluster = new SpringApplicationBuilder(cls).web( - WebApplicationType.SERVLET).environment(environment) - .properties(clusterInfo).properties("embeddedStorage=true").build(); - - ConfigurableApplicationContext context = cluster.run(); - - context.stop(); - - DistributedDatabaseOperateImpl operate = context.getBean(DistributedDatabaseOperateImpl.class); - CPProtocol protocol = context.getBean(CPProtocol.class); - - protocol.protocolMetaData() - .subscribe(operate.group(), Constants.LEADER_META_DATA, - (o, arg) -> { - System.out.println("node : 884" + (7 + index) + "-> select leader is : " + arg); - if (finished[index].compareAndSet(false, true)) { - latch.countDown(); - } - }); - - new Thread(() -> { - try { - Thread.sleep(5000L); - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (finished[index].compareAndSet(false, true)) { - latch.countDown(); - } - } - }); - - applications.put(String.valueOf(properties.get("server.port")), context); - } - catch (Exception e) { - e.printStackTrace(); - latch.countDown(); - } - }; - - runnable.run(); - } - } \ No newline at end of file diff --git a/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java b/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java new file mode 100644 index 000000000..844f07f25 --- /dev/null +++ b/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java @@ -0,0 +1,236 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.test.core; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.common.http.HttpClientManager; +import com.alibaba.nacos.common.http.NSyncHttpClient; +import com.alibaba.nacos.common.utils.DiskUtils; +import com.alibaba.nacos.config.server.model.event.RaftDBErrorEvent; +import com.alibaba.nacos.config.server.service.repository.DistributedDatabaseOperateImpl; +import com.alibaba.nacos.consistency.cp.CPProtocol; +import com.alibaba.nacos.consistency.cp.Constants; +import com.alibaba.nacos.core.notify.Event; +import com.alibaba.nacos.core.notify.NotifyCenter; +import com.alibaba.nacos.core.notify.listener.Subscribe; +import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.core.utils.InetUtils; +import com.alibaba.nacos.test.base.HttpClient4Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.web.context.support.StandardServletEnvironment; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author liaochuntao + */ +public class BaseClusterTest extends HttpClient4Test { + + protected static final String CONFIG_INFO_ID = "config-info-id"; + + protected static ConfigService iconfig7; + protected static ConfigService iconfig8; + protected static ConfigService iconfig9; + + protected static NamingService inaming7; + protected static NamingService inaming8; + protected static NamingService inaming9; + + protected static final NSyncHttpClient httpClient = HttpClientManager.getSyncHttpClient(); + + protected static final AtomicBoolean[] finished = new AtomicBoolean[]{new AtomicBoolean(false), new AtomicBoolean(false), new AtomicBoolean(false)}; + + protected static Map applications = new HashMap<>(); + + protected static String clusterInfo; + + static { + System.getProperties().setProperty("nacos.core.auth.enabled", "false"); + System.getProperties().setProperty("embeddedStorage", "true"); + String ip = InetUtils.getSelfIp(); + clusterInfo = "nacos.member.list=" + ip + ":8847," + ip + + ":8848," + ip + ":8849"; + + NotifyCenter.registerSubscribe(new Subscribe() { + @Override + public void onEvent(RaftDBErrorEvent event) { + System.out.print(event.getEx()); + } + + @Override + public Class subscribeType() { + return RaftDBErrorEvent.class; + } + }); + } + + @BeforeClass + public static void before() throws Exception { + + CountDownLatch latch = new CountDownLatch(3); + + Runnable runnable = () -> { + for (int i = 0; i < 3; i++) { + try { + URL runnerUrl = new File("../console/target/classes").toURI().toURL(); + URL[] urls = new URL[] { runnerUrl }; + URLClassLoader cl = new URLClassLoader(urls); + Class runnerClass = cl.loadClass("com.alibaba.nacos.Nacos"); + run(i, latch, runnerClass); + } catch (Exception e) { + latch.countDown(); + } + } + }; + + new Thread(runnable).start(); + + latch.await(); + + System.out.println("The cluster node initialization is complete"); + + Properties setting7 = new Properties(); + String serverIp7 = "127.0.0.1:8847"; + setting7.put(PropertyKeyConst.SERVER_ADDR, serverIp7); + setting7.put(PropertyKeyConst.USERNAME, "nacos"); + setting7.put(PropertyKeyConst.PASSWORD, "nacos"); + iconfig7 = NacosFactory.createConfigService(setting7); + inaming7 = NacosFactory.createNamingService(setting7); + + Properties setting8 = new Properties(); + String serverIp8 = "127.0.0.1:8848"; + setting8.put(PropertyKeyConst.SERVER_ADDR, serverIp8); + setting8.put(PropertyKeyConst.USERNAME, "nacos"); + setting8.put(PropertyKeyConst.PASSWORD, "nacos"); + iconfig8 = NacosFactory.createConfigService(setting8); + inaming8 = NacosFactory.createNamingService(setting7); + + Properties setting9 = new Properties(); + String serverIp9 = "127.0.0.1:8849"; + setting9.put(PropertyKeyConst.SERVER_ADDR, serverIp9); + setting9.put(PropertyKeyConst.USERNAME, "nacos"); + setting9.put(PropertyKeyConst.PASSWORD, "nacos"); + iconfig9 = NacosFactory.createConfigService(setting9); + inaming9 = NacosFactory.createNamingService(setting7); + + TimeUnit.SECONDS.sleep(20L); + } + + @AfterClass + public static void after() throws Exception { + CountDownLatch latch = new CountDownLatch(applications.size()); + for (ConfigurableApplicationContext context : applications.values()) { + new Thread(() -> { + try { + System.out.println("start close : " + context); + context.close(); + } catch (Exception ignore) { + } finally { + System.out.println("finished close : " + context); + latch.countDown(); + } + }).start(); + } + latch.await(); + } + + private static void run(final int index, final CountDownLatch latch, final Class cls) { + Runnable runnable = () -> { + try { + ApplicationUtils.setIsStandalone(false); + + final String path = Paths + .get(System.getProperty("user.home"), "/nacos-" + index + "/").toString(); + DiskUtils.deleteDirectory(path); + + System.setProperty("nacos.home", path); + System.out.println("nacos.home is : [" + path + "]"); + + Map properties = new HashMap<>(); + properties.put("server.port", "884" + (7 + index)); + properties.put("nacos.home", path); + properties.put("nacos.logs.path", + Paths.get(System.getProperty("user.home"), "nacos-" + index, "/logs/").toString()); + properties.put("spring.jmx.enabled", false); + properties.put("nacos.core.snowflake.worker-id", index + 1); + MapPropertySource propertySource = new MapPropertySource( + "nacos_cluster_test", properties); + ConfigurableEnvironment environment = new StandardServletEnvironment(); + environment.getPropertySources().addFirst(propertySource); + SpringApplication cluster = new SpringApplicationBuilder(cls).web( + WebApplicationType.SERVLET).environment(environment) + .properties(clusterInfo).properties("embeddedStorage=true") + .build(); + + ConfigurableApplicationContext context = cluster.run(); + + DistributedDatabaseOperateImpl operate = context.getBean(DistributedDatabaseOperateImpl.class); + CPProtocol protocol = context.getBean(CPProtocol.class); + + protocol.protocolMetaData() + .subscribe(operate.group(), Constants.LEADER_META_DATA, + (o, arg) -> { + System.out.println("node : 884" + (7 + index) + "-> select leader is : " + arg); + if (finished[index].compareAndSet(false, true)) { + latch.countDown(); + } + }); + + new Thread(() -> { + try { + Thread.sleep(5000L); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (finished[index].compareAndSet(false, true)) { + latch.countDown(); + } + } + }); + + applications.put(String.valueOf(properties.get("server.port")), context); + } + catch (Throwable e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + }; + + runnable.run(); + } + +} diff --git a/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java index be3b7edf4..325c283d9 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.test.core.cluster; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MemberUtils; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.Event; import com.alibaba.nacos.core.notify.NotifyCenter; @@ -32,7 +33,11 @@ import org.junit.runners.MethodSorters; import org.springframework.core.env.StandardEnvironment; import org.springframework.mock.web.MockServletContext; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +61,32 @@ public class ServerMemberManager_ITCase { memberManager.shutdown(); } + @Test + public void test_k_isFirst() { + String firstIp = "127.0.0.1:8847"; + String secondIp = "127.0.0.1:8847"; + String thirdIp = "127.0.0.1:8847"; + ConcurrentSkipListMap map = new ConcurrentSkipListMap<>(); + map.put(secondIp, Member.builder() + .ip("127.0.0.1") + .port(8847) + .build()); + map.put(firstIp, Member.builder() + .ip("127.0.0.1") + .port(8848) + .build()); + map.put(thirdIp, Member.builder() + .ip("127.0.0.1") + .port(8849) + .build()); + + List members = new ArrayList(map.values()); + Collections.sort(members); + List ss = MemberUtils.simpleMembers(members); + + Assert.assertEquals(ss.get(0), members.get(0).getAddress()); + } + @Test public void test_a_member_change() throws Exception { diff --git a/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java index d2cedd3c2..73f78ebf3 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/notify/NotifyCenter_ITCase.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLong; /** * @author liaochuntao */ -@Ignore @FixMethodOrder(value = MethodSorters.NAME_ASCENDING) public class NotifyCenter_ITCase { @@ -103,7 +102,7 @@ public class NotifyCenter_ITCase { System.out.println("TestEvent event num : " + NotifyCenter.getPublisher(TestEvent.class).currentEventSize()); System.out.println("TestSlowEvent event num : " + NotifyCenter.getPublisher(TestSlowEvent.class).currentEventSize()); - latch.await(); + latch.await(5_000L, TimeUnit.MILLISECONDS); Assert.assertEquals(2, count.get()); } diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/NamingRaft_DITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/NamingRaft_DITCase.java new file mode 100644 index 000000000..69ed03783 --- /dev/null +++ b/test/src/test/java/com/alibaba/nacos/test/naming/NamingRaft_DITCase.java @@ -0,0 +1,68 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.test.naming; + +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.common.utils.ThreadUtils; +import com.alibaba.nacos.test.core.BaseClusterTest; +import org.junit.Assert; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author liaochuntao + */ +@FixMethodOrder(value = MethodSorters.NAME_ASCENDING) +public class NamingRaft_DITCase extends BaseClusterTest { + + @Test + public void test_register_instance() throws Exception { + String serviceName = NamingBase.randomDomainName(); + Instance instance = new Instance(); + instance.setEphemeral(true); //是否临时实例 + instance.setServiceName(serviceName); + instance.setClusterName("c1"); + instance.setIp("11.11.11.11"); + instance.setPort(80); + + try { + inaming7.registerInstance(serviceName, instance); + } catch (Throwable ex) { + ex.printStackTrace(); + CountDownLatch latch = new CountDownLatch(1); + latch.await(60_000L, TimeUnit.MILLISECONDS); + } + + ThreadUtils.sleep(5_000L); + List list = inaming8.getAllInstances(serviceName); + Assert.assertEquals(1, list.size()); + + Instance host = list.get(0); + + Assert.assertEquals(host.getIp(), instance.getIp()); + Assert.assertEquals(host.getPort(), instance.getPort()); + Assert.assertEquals(host.getServiceName(), NamingUtils.getGroupedName(instance.getServiceName(), "DEFAULT_GROUP")); + Assert.assertEquals(host.getClusterName(), instance.getClusterName()); + } + +}