From 2e9c09d3678b45cbca576cda87262db008fc54e3 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Sun, 24 Apr 2022 13:33:28 +0800 Subject: [PATCH] fix different client worker share same localsnapshot bug ,add properties to control whether load snaoshot content on startup in CacheData (#8202) --- .../alibaba/nacos/api/PropertyKeyConst.java | 4 + .../nacos/client/config/impl/CacheData.java | 35 ++++-- .../client/config/impl/ClientWorker.java | 2 +- .../client/config/impl/ServerListManager.java | 109 +++++++++++------- client/src/main/resources/nacos-logback.xml | 4 + .../config/http/ServerHttpAgentTest.java | 2 +- .../client/config/impl/ClientWorkerTest.java | 2 +- .../config/impl/ServerListManagerTest.java | 2 +- .../remote/client/RpcClientFactory.java | 40 ++++--- 9 files changed, 125 insertions(+), 75 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java index 70efbe77e..7cf8c8dc5 100644 --- a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java +++ b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java @@ -29,8 +29,12 @@ public class PropertyKeyConst { public static final String ENDPOINT = "endpoint"; + public static final String ENDPOINT_QUERY_PARAMS = "endpointQueryParams"; + public static final String ENDPOINT_PORT = "endpointPort"; + public static final String SERVER_NAME = "serverName"; + public static final String NAMESPACE = "namespace"; public static final String USERNAME = "username"; diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java index cf167d21a..1bc230a19 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java @@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicLong; */ public class CacheData { + private static final Logger LOGGER = LogUtils.logger(CacheData.class); + static final int CONCURRENCY = 5; static ThreadFactory internalNotifierFactory = r -> { @@ -56,11 +58,16 @@ public class CacheData { return t; }; + static boolean initSnapshot; + + static { + initSnapshot = Boolean.valueOf(System.getProperty("nacos.cache.data.init.snapshot", "true")); + LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot); + } + static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), internalNotifierFactory); - private static final Logger LOGGER = LogUtils.logger(CacheData.class); - private final String name; private final ConfigFilterChainManager configFilterChainManager; @@ -318,14 +325,14 @@ public class CacheData { } listenerWrap.lastCallMd5 = md5; - LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, - dataId, group, md5, listener, (System.currentTimeMillis() - start)); + LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId, + group, md5, listener, (System.currentTimeMillis() - start)); } catch (NacosException ex) { - LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", - name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); + LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name, + dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); } catch (Throwable t) { - LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, - group, md5, listener, t.getCause()); + LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, + md5, listener, t.getCause()); } finally { listenerWrap.inNotifying = false; Thread.currentThread().setContextClassLoader(myClassLoader); @@ -395,8 +402,10 @@ public class CacheData { this.tenant = TenantUtil.getUserTenantForAcm(); listeners = new CopyOnWriteArrayList<>(); this.isInitializing = true; - this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant); - this.md5 = getMd5String(content); + if (initSnapshot) { + this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant); + this.md5 = getMd5String(content); + } this.encryptedDataKey = loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant); } @@ -412,8 +421,10 @@ public class CacheData { this.tenant = tenant; listeners = new CopyOnWriteArrayList<>(); this.isInitializing = true; - this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant); - this.md5 = getMd5String(content); + if (initSnapshot) { + this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant); + this.md5 = getMd5String(content); + } } // ================== diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 3b1662e4c..93e48522c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -703,7 +703,7 @@ public class ClientWorker implements Closeable { @Override public String getName() { - return "config_rpc_client"; + return serverListManager.getName(); } @Override diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java index ad74a1a8f..72467fa11 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java @@ -132,28 +132,28 @@ public class ServerListManager implements Closeable { for (String serverAddr : fixed) { String[] serverAddrArr = InternetAddressUtil.splitIPPortStr(serverAddr); if (serverAddrArr.length == 1) { - serverAddrs.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort()); + serverAddrs + .add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort()); } else { serverAddrs.add(serverAddr); } } this.serverUrls = new ArrayList<>(serverAddrs); - if (StringUtils.isBlank(namespace)) { - this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])); - } else { + if (StringUtils.isNotBlank(namespace)) { this.namespace = namespace; - this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-" - + namespace; + this.tenant = namespace; } + this.name = initServerName(null); } public ServerListManager(String host, int port) { this.isFixed = false; this.isStarted = false; - this.name = CUSTOM_NAME + "-" + host + "-" + port; - this.addressServerUrl = String - .format("http://%s:%d%s/%s", host, port, ContextPathUtil.normalizeContextPath(this.contentPath), - this.serverListName); + this.endpoint = host; + this.endpointPort = port; + + this.name = initServerName(null); + initAddressServerUrl(null); } public ServerListManager(String endpoint) throws NacosException { @@ -165,25 +165,18 @@ public class ServerListManager implements Closeable { this.isStarted = false; Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.ENDPOINT, endpoint); - endpoint = initEndpoint(properties); + this.endpoint = initEndpoint(properties); if (StringUtils.isBlank(endpoint)) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank"); } - if (StringUtils.isBlank(namespace)) { - this.name = endpoint; - this.addressServerUrl = String.format("http://%s:%d%s/%s", endpoint, this.endpointPort, - ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName); - } else { - if (StringUtils.isBlank(endpoint)) { - throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank"); - } - this.name = endpoint + "-" + namespace; + if (StringUtils.isNotBlank(namespace)) { this.namespace = namespace; this.tenant = namespace; - this.addressServerUrl = String.format("http://%s:%d%s/%s?namespace=%s", endpoint, this.endpointPort, - ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace); } + + this.name = initServerName(null); + initAddressServerUrl(properties); } public ServerListManager(Properties properties) throws NacosException { @@ -192,6 +185,11 @@ public class ServerListManager implements Closeable { String namespace = properties.getProperty(PropertyKeyConst.NAMESPACE); initParam(properties); + if (StringUtils.isNotBlank(namespace)) { + this.namespace = namespace; + this.tenant = namespace; + } + if (StringUtils.isNotEmpty(serverAddrsStr)) { this.isFixed = true; List serverAddrs = new ArrayList<>(); @@ -211,33 +209,62 @@ public class ServerListManager implements Closeable { } } this.serverUrls = serverAddrs; - if (StringUtils.isBlank(namespace)) { - this.name = FIXED_NAME + "-" + getFixedNameSuffix( - this.serverUrls.toArray(new String[this.serverUrls.size()])); - } else { - this.namespace = namespace; - this.tenant = namespace; - this.name = FIXED_NAME + "-" + getFixedNameSuffix( - this.serverUrls.toArray(new String[this.serverUrls.size()])) + "-" + namespace; - } + this.name = initServerName(properties); + } else { if (StringUtils.isBlank(endpoint)) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank"); } this.isFixed = false; - if (StringUtils.isBlank(namespace)) { - this.name = endpoint; - this.addressServerUrl = String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort, - ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName); + this.name = initServerName(properties); + initAddressServerUrl(properties); + } + + } + + private String initServerName(Properties properties) { + String serverName = ""; + //1.user define server name. + if (properties != null && properties.containsKey(PropertyKeyConst.SERVER_NAME)) { + serverName = properties.get(PropertyKeyConst.SERVER_NAME).toString(); + } else { + // if fix url,use fix url join string. + if (isFixed) { + serverName = FIXED_NAME + "-" + (StringUtils.isNotBlank(namespace) ? (StringUtils.trim(namespace) + "-") + : "") + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()])); } else { - this.namespace = namespace; - this.tenant = namespace; - this.name = this.endpoint + "-" + namespace; - this.addressServerUrl = String - .format("http://%s:%d%s/%s?namespace=%s", this.endpoint, this.endpointPort, - ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace); + //if use endpoint , use endpoint ,content path ,serverlist name + serverName = CUSTOM_NAME + "-" + String + .join("_", endpoint, String.valueOf(endpointPort), contentPath, serverListName) + ( + StringUtils.isNotBlank(namespace) ? ("_" + StringUtils.trim(namespace)) : ""); } } + serverName.replaceAll("\\/", "_"); + serverName.replaceAll("\\:", "_"); + + return serverName; + } + + private void initAddressServerUrl(Properties properties) { + if (isFixed) { + return; + } + StringBuilder addressServerUrlTem = new StringBuilder( + String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort, + ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName)); + boolean hasQueryString = false; + if (StringUtils.isNotBlank(namespace)) { + addressServerUrlTem.append("?namespace=" + namespace); + hasQueryString = false; + } + if (properties != null && properties.containsKey(PropertyKeyConst.ENDPOINT_QUERY_PARAMS)) { + addressServerUrlTem + .append(hasQueryString ? "&" : "?" + properties.get(PropertyKeyConst.ENDPOINT_QUERY_PARAMS)); + + } + + this.addressServerUrl = addressServerUrlTem.toString(); + LOGGER.info("serverName = {}, address server url = {}", this.name, this.addressServerUrl); } private void initParam(Properties properties) { diff --git a/client/src/main/resources/nacos-logback.xml b/client/src/main/resources/nacos-logback.xml index c24b41534..b247069cb 100644 --- a/client/src/main/resources/nacos-logback.xml +++ b/client/src/main/resources/nacos-logback.xml @@ -81,6 +81,10 @@ + + + diff --git a/client/src/test/java/com/alibaba/nacos/client/config/http/ServerHttpAgentTest.java b/client/src/test/java/com/alibaba/nacos/client/config/http/ServerHttpAgentTest.java index 634946aeb..09b953554 100644 --- a/client/src/test/java/com/alibaba/nacos/client/config/http/ServerHttpAgentTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/config/http/ServerHttpAgentTest.java @@ -59,7 +59,7 @@ public class ServerHttpAgentTest { Assert.assertNull(encode); Assert.assertEquals("namespace1", namespace); Assert.assertEquals("namespace1", tenant); - Assert.assertEquals("aaa-namespace1", name); + Assert.assertEquals("custom-aaa_8080_nacos_serverlist_namespace1", name); } diff --git a/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java index 4c9657d59..cf99559f0 100644 --- a/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java @@ -172,7 +172,7 @@ public class ClientWorkerTest { agent1.setAccessible(false); Assert.assertTrue(clientWorker.isHealthServer()); - Assert.assertEquals("config_rpc_client", clientWorker.getAgentName()); + Assert.assertEquals(null, clientWorker.getAgentName()); } } diff --git a/client/src/test/java/com/alibaba/nacos/client/config/impl/ServerListManagerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/impl/ServerListManagerTest.java index 583a7e2af..600f725d7 100644 --- a/client/src/test/java/com/alibaba/nacos/client/config/impl/ServerListManagerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/config/impl/ServerListManagerTest.java @@ -36,7 +36,7 @@ public class ServerListManagerTest { Assert.fail(); } catch (NacosException e) { Assert.assertEquals( - "fail to get NACOS-server serverlist! env:custom-localhost-0, not connnect url:http://localhost:0/nacos/serverlist", + "fail to get NACOS-server serverlist! env:custom-localhost_0_nacos_serverlist, not connnect url:http://localhost:0/nacos/serverlist", e.getErrMsg()); } mgr.shutdown(); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java index ab28354d1..2d71b9e46 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java @@ -79,26 +79,31 @@ public class RpcClientFactory { /** * create a rpc client. * - * @param clientName client name. - * @param connectionType client type. + * @param clientName client name. + * @param connectionType client type. * @param threadPoolCoreSize grpc thread pool core size - * @param threadPoolMaxSize grpc thread pool max size + * @param threadPoolMaxSize grpc thread pool max size * @return rpc client. */ - public static RpcClient createClient(String clientName, ConnectionType connectionType, - Integer threadPoolCoreSize, Integer threadPoolMaxSize, - Map labels) { + public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize, + Integer threadPoolMaxSize, Map labels) { if (!ConnectionType.GRPC.equals(connectionType)) { throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType()); } - + return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> { LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName); - GrpcClient client = new GrpcSdkClient(clientNameInner); - client.setThreadPoolCoreSize(threadPoolCoreSize); - client.setThreadPoolMaxSize(threadPoolMaxSize); - client.labels(labels); - return client; + try { + GrpcClient client = new GrpcSdkClient(clientNameInner); + client.setThreadPoolCoreSize(threadPoolCoreSize); + client.setThreadPoolMaxSize(threadPoolMaxSize); + client.labels(labels); + return client; + } catch (Throwable throwable) { + LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable); + throw throwable; + } + }); } @@ -117,19 +122,18 @@ public class RpcClientFactory { /** * create a rpc client. * - * @param clientName client name. - * @param connectionType client type. + * @param clientName client name. + * @param connectionType client type. * @param threadPoolCoreSize grpc thread pool core size - * @param threadPoolMaxSize grpc thread pool max size + * @param threadPoolMaxSize grpc thread pool max size * @return rpc client. */ public static RpcClient createClusterClient(String clientName, ConnectionType connectionType, - Integer threadPoolCoreSize, Integer threadPoolMaxSize, - Map labels) { + Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map labels) { if (!ConnectionType.GRPC.equals(connectionType)) { throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType()); } - + return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> { GrpcClient client = new GrpcClusterClient(clientNameInner); client.setThreadPoolCoreSize(threadPoolCoreSize);