fix different client worker share same localsnapshot bug ,add properties to control whether load snaoshot content on startup in CacheData (#8202)

This commit is contained in:
nov.lzf 2022-04-24 13:33:28 +08:00 committed by GitHub
parent 028075138c
commit 2e9c09d367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 125 additions and 75 deletions

View File

@ -29,8 +29,12 @@ public class PropertyKeyConst {
public static final String ENDPOINT = "endpoint";
public static final String ENDPOINT_QUERY_PARAMS = "endpointQueryParams";
public static final String ENDPOINT_PORT = "endpointPort";
public static final String SERVER_NAME = "serverName";
public static final String NAMESPACE = "namespace";
public static final String USERNAME = "username";

View File

@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class CacheData {
private static final Logger LOGGER = LogUtils.logger(CacheData.class);
static final int CONCURRENCY = 5;
static ThreadFactory internalNotifierFactory = r -> {
@ -56,11 +58,16 @@ public class CacheData {
return t;
};
static boolean initSnapshot;
static {
initSnapshot = Boolean.valueOf(System.getProperty("nacos.cache.data.init.snapshot", "true"));
LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot);
}
static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), internalNotifierFactory);
private static final Logger LOGGER = LogUtils.logger(CacheData.class);
private final String name;
private final ConfigFilterChainManager configFilterChainManager;
@ -318,14 +325,14 @@ public class CacheData {
}
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
dataId, group, md5, listener, (System.currentTimeMillis() - start));
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,
group, md5, listener, (System.currentTimeMillis() - start));
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
@ -395,8 +402,10 @@ public class CacheData {
this.tenant = TenantUtil.getUserTenantForAcm();
listeners = new CopyOnWriteArrayList<>();
this.isInitializing = true;
if (initSnapshot) {
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
this.encryptedDataKey = loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant);
}
@ -412,9 +421,11 @@ public class CacheData {
this.tenant = tenant;
listeners = new CopyOnWriteArrayList<>();
this.isInitializing = true;
if (initSnapshot) {
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
}
// ==================

View File

@ -703,7 +703,7 @@ public class ClientWorker implements Closeable {
@Override
public String getName() {
return "config_rpc_client";
return serverListManager.getName();
}
@Override

View File

@ -132,28 +132,28 @@ public class ServerListManager implements Closeable {
for (String serverAddr : fixed) {
String[] serverAddrArr = InternetAddressUtil.splitIPPortStr(serverAddr);
if (serverAddrArr.length == 1) {
serverAddrs.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());
serverAddrs
.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());
} else {
serverAddrs.add(serverAddr);
}
}
this.serverUrls = new ArrayList<>(serverAddrs);
if (StringUtils.isBlank(namespace)) {
this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()]));
} else {
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-"
+ namespace;
this.tenant = namespace;
}
this.name = initServerName(null);
}
public ServerListManager(String host, int port) {
this.isFixed = false;
this.isStarted = false;
this.name = CUSTOM_NAME + "-" + host + "-" + port;
this.addressServerUrl = String
.format("http://%s:%d%s/%s", host, port, ContextPathUtil.normalizeContextPath(this.contentPath),
this.serverListName);
this.endpoint = host;
this.endpointPort = port;
this.name = initServerName(null);
initAddressServerUrl(null);
}
public ServerListManager(String endpoint) throws NacosException {
@ -165,25 +165,18 @@ public class ServerListManager implements Closeable {
this.isStarted = false;
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ENDPOINT, endpoint);
endpoint = initEndpoint(properties);
this.endpoint = initEndpoint(properties);
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
if (StringUtils.isBlank(namespace)) {
this.name = endpoint;
this.addressServerUrl = String.format("http://%s:%d%s/%s", endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);
} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
this.name = endpoint + "-" + namespace;
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.tenant = namespace;
this.addressServerUrl = String.format("http://%s:%d%s/%s?namespace=%s", endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);
}
this.name = initServerName(null);
initAddressServerUrl(properties);
}
public ServerListManager(Properties properties) throws NacosException {
@ -192,6 +185,11 @@ public class ServerListManager implements Closeable {
String namespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
initParam(properties);
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.tenant = namespace;
}
if (StringUtils.isNotEmpty(serverAddrsStr)) {
this.isFixed = true;
List<String> serverAddrs = new ArrayList<>();
@ -211,33 +209,62 @@ public class ServerListManager implements Closeable {
}
}
this.serverUrls = serverAddrs;
if (StringUtils.isBlank(namespace)) {
this.name = FIXED_NAME + "-" + getFixedNameSuffix(
this.serverUrls.toArray(new String[this.serverUrls.size()]));
} else {
this.namespace = namespace;
this.tenant = namespace;
this.name = FIXED_NAME + "-" + getFixedNameSuffix(
this.serverUrls.toArray(new String[this.serverUrls.size()])) + "-" + namespace;
}
this.name = initServerName(properties);
} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
this.isFixed = false;
if (StringUtils.isBlank(namespace)) {
this.name = endpoint;
this.addressServerUrl = String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);
this.name = initServerName(properties);
initAddressServerUrl(properties);
}
}
private String initServerName(Properties properties) {
String serverName = "";
//1.user define server name.
if (properties != null && properties.containsKey(PropertyKeyConst.SERVER_NAME)) {
serverName = properties.get(PropertyKeyConst.SERVER_NAME).toString();
} else {
this.namespace = namespace;
this.tenant = namespace;
this.name = this.endpoint + "-" + namespace;
this.addressServerUrl = String
.format("http://%s:%d%s/%s?namespace=%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);
// if fix url,use fix url join string.
if (isFixed) {
serverName = FIXED_NAME + "-" + (StringUtils.isNotBlank(namespace) ? (StringUtils.trim(namespace) + "-")
: "") + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()]));
} else {
//if use endpoint , use endpoint ,content path ,serverlist name
serverName = CUSTOM_NAME + "-" + String
.join("_", endpoint, String.valueOf(endpointPort), contentPath, serverListName) + (
StringUtils.isNotBlank(namespace) ? ("_" + StringUtils.trim(namespace)) : "");
}
}
serverName.replaceAll("\\/", "_");
serverName.replaceAll("\\:", "_");
return serverName;
}
private void initAddressServerUrl(Properties properties) {
if (isFixed) {
return;
}
StringBuilder addressServerUrlTem = new StringBuilder(
String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName));
boolean hasQueryString = false;
if (StringUtils.isNotBlank(namespace)) {
addressServerUrlTem.append("?namespace=" + namespace);
hasQueryString = false;
}
if (properties != null && properties.containsKey(PropertyKeyConst.ENDPOINT_QUERY_PARAMS)) {
addressServerUrlTem
.append(hasQueryString ? "&" : "?" + properties.get(PropertyKeyConst.ENDPOINT_QUERY_PARAMS));
}
this.addressServerUrl = addressServerUrlTem.toString();
LOGGER.info("serverName = {}, address server url = {}", this.name, this.addressServerUrl);
}
private void initParam(Properties properties) {

View File

@ -81,6 +81,10 @@
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>
<Logger name="com.alibaba.nacos.shaded.io.grpc" level="${com.alibaba.nacos.log.level:-info}"
additivity="false">
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>
<logger name="com.alibaba.nacos.client.config" level="${com.alibaba.nacos.config.log.level:-info}"
additivity="false">

View File

@ -59,7 +59,7 @@ public class ServerHttpAgentTest {
Assert.assertNull(encode);
Assert.assertEquals("namespace1", namespace);
Assert.assertEquals("namespace1", tenant);
Assert.assertEquals("aaa-namespace1", name);
Assert.assertEquals("custom-aaa_8080_nacos_serverlist_namespace1", name);
}

View File

@ -172,7 +172,7 @@ public class ClientWorkerTest {
agent1.setAccessible(false);
Assert.assertTrue(clientWorker.isHealthServer());
Assert.assertEquals("config_rpc_client", clientWorker.getAgentName());
Assert.assertEquals(null, clientWorker.getAgentName());
}
}

View File

@ -36,7 +36,7 @@ public class ServerListManagerTest {
Assert.fail();
} catch (NacosException e) {
Assert.assertEquals(
"fail to get NACOS-server serverlist! env:custom-localhost-0, not connnect url:http://localhost:0/nacos/serverlist",
"fail to get NACOS-server serverlist! env:custom-localhost_0_nacos_serverlist, not connnect url:http://localhost:0/nacos/serverlist",
e.getErrMsg());
}
mgr.shutdown();

View File

@ -85,20 +85,25 @@ public class RpcClientFactory {
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize,
Map<String, String> labels) {
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
try {
GrpcClient client = new GrpcSdkClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
client.setThreadPoolMaxSize(threadPoolMaxSize);
client.labels(labels);
return client;
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}
});
}
@ -124,8 +129,7 @@ public class RpcClientFactory {
* @return rpc client.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize,
Map<String, String> labels) {
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}