only load grpc config from system one time.

This commit is contained in:
KomachiSion 2022-09-07 11:53:15 +08:00
parent 3d7bda84f7
commit 1e9b18cd75
4 changed files with 113 additions and 114 deletions

View File

@ -32,7 +32,6 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.PayloadRegistry;
import com.alibaba.nacos.common.remote.client.grpc.DefaultGrpcClientConfig;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.NumberUtils;
@ -279,8 +278,8 @@ public abstract class RpcClient implements Closeable {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal.poll(rpcClientConfig.connectionKeepAlive(),
TimeUnit.MILLISECONDS);
ReconnectContext reconnectContext = reconnectionSignal
.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
@ -298,8 +297,8 @@ public abstract class RpcClient implements Closeable {
break;
}
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(
rpcClientStatus, RpcClientStatus.UNHEALTHY);
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
reconnectContext = new ReconnectContext(null, false);
} else {
@ -366,9 +365,10 @@ public abstract class RpcClient implements Closeable {
}
if (connectToServer != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect to server [{}] on start up, connectionId = {}", rpcClientConfig.name(),
connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
@ -402,8 +402,8 @@ public abstract class RpcClient implements Closeable {
ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(connectResetRequest.getServerIp())) {
ServerInfo serverInfo = resolveServerInfo(
connectResetRequest.getServerIp() + Constants.COLON
+ connectResetRequest.getServerPort());
connectResetRequest.getServerIp() + Constants.COLON + connectResetRequest
.getServerPort());
switchServerAsync(serverInfo, false);
} else {
switchServerAsync();
@ -439,8 +439,8 @@ public abstract class RpcClient implements Closeable {
while (reTryTimes >= 0) {
reTryTimes--;
try {
Response response = this.currentConnection.request(healthCheckRequest,
rpcClientConfig.healthCheckTimeOut());
Response response = this.currentConnection
.request(healthCheckRequest, rpcClientConfig.healthCheckTimeOut());
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
@ -496,9 +496,10 @@ public abstract class RpcClient implements Closeable {
// 2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect a server [{}], connectionId = {}", rpcClientConfig.name(),
serverInfo.getAddress(), connectionNew.getConnectionId());
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
rpcClientConfig.name(), serverInfo.getAddress(),
connectionNew.getConnectionId());
// successfully create a new connect.
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
@ -567,8 +568,9 @@ public abstract class RpcClient implements Closeable {
}
} catch (Exception e) {
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}",
rpcClientConfig.name(), e);
LoggerUtils
.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", rpcClientConfig.name(),
e);
}
}
@ -701,8 +703,8 @@ public abstract class RpcClient implements Closeable {
Exception exceptionToThrow = null;
long start = System.currentTimeMillis();
while (retryTimes < rpcClientConfig.retryTimes()
&& System.currentTimeMillis() < start + callback.getTimeout()) {
while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < start + callback
.getTimeout()) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
@ -751,8 +753,8 @@ public abstract class RpcClient implements Closeable {
int retryTimes = 0;
long start = System.currentTimeMillis();
Exception exceptionToThrow = null;
while (retryTimes < rpcClientConfig.retryTimes()
&& System.currentTimeMillis() < start + rpcClientConfig.timeOutMills()) {
while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < start + rpcClientConfig
.timeOutMills()) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {

View File

@ -94,8 +94,7 @@ public class RpcClientFactory {
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
try {
GrpcClient client = new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels);
return client;
return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels);
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
@ -131,10 +130,9 @@ public class RpcClientFactory {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
GrpcClient client = new GrpcClusterClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels);
return client;
});
return CLIENT_MAP.computeIfAbsent(clientName,
clientNameInner -> new GrpcClusterClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize,
labels));
}
}

View File

@ -65,20 +65,32 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
*/
private DefaultGrpcClientConfig(Builder builder) {
this.name = builder.name;
this.retryTimes = builder.retryTimes;
this.timeOutMills = builder.timeOutMills;
this.connectionKeepAlive = builder.connectionKeepAlive;
this.threadPoolKeepAlive = builder.threadPoolKeepAlive;
this.threadPoolCoreSize = builder.threadPoolCoreSize;
this.threadPoolMaxSize = builder.threadPoolMaxSize;
this.serverCheckTimeOut = builder.serverCheckTimeOut;
this.threadPoolQueueSize = builder.threadPoolQueueSize;
this.maxInboundMessageSize = builder.maxInboundMessageSize;
this.channelKeepAlive = builder.channelKeepAlive;
this.healthCheckRetryTimes = builder.healthCheckRetryTimes;
this.healthCheckTimeOut = builder.healthCheckTimeOut;
this.retryTimes = loadIntegerConfig(GrpcConstants.GRPC_RETRY_TIMES, builder.retryTimes);
this.timeOutMills = loadLongConfig(GrpcConstants.GRPC_TIMEOUT_MILLS, builder.timeOutMills);
this.connectionKeepAlive = loadLongConfig(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME,
builder.connectionKeepAlive);
this.threadPoolKeepAlive = loadLongConfig(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME,
builder.threadPoolKeepAlive);
this.threadPoolCoreSize = loadIntegerConfig(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE,
builder.threadPoolCoreSize);
this.threadPoolMaxSize = loadIntegerConfig(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE, builder.threadPoolMaxSize);
this.serverCheckTimeOut = loadLongConfig(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT, builder.serverCheckTimeOut);
this.threadPoolQueueSize = loadIntegerConfig(GrpcConstants.GRPC_QUEUESIZE, builder.threadPoolQueueSize);
this.maxInboundMessageSize = loadIntegerConfig(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE,
builder.maxInboundMessageSize);
this.channelKeepAlive = loadIntegerConfig(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME, builder.channelKeepAlive);
this.healthCheckRetryTimes = loadIntegerConfig(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES,
builder.healthCheckRetryTimes);
this.healthCheckTimeOut = loadLongConfig(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT, builder.healthCheckTimeOut);
this.labels = builder.labels;
}
private int loadIntegerConfig(String key, int builderValue) {
return Integer.getInteger(key, builderValue);
}
private long loadLongConfig(String key, long builderValue) {
return Long.getLong(key, builderValue);
}
@Override
@ -88,74 +100,62 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
@Override
public int retryTimes() {
return Integer.parseInt(
System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_RETRY_TIMES, String.valueOf(this.retryTimes)));
return retryTimes;
}
@Override
public long timeOutMills() {
return Long.parseLong(
System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_TIMEOUT_MILLS, String.valueOf(this.timeOutMills)));
return timeOutMills;
}
@Override
public long connectionKeepAlive() {
return Long.parseLong(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_CONNECT_KEEP_ALIVE_TIME,
String.valueOf(this.connectionKeepAlive)));
return connectionKeepAlive;
}
@Override
public int threadPoolCoreSize() {
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_CORE_SIZE,
String.valueOf(this.threadPoolCoreSize)));
return threadPoolCoreSize;
}
@Override
public int threadPoolMaxSize() {
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_MAX_SIZE,
String.valueOf(this.threadPoolMaxSize)));
return threadPoolMaxSize;
}
@Override
public long threadPoolKeepAlive() {
return Long.parseLong(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME,
String.valueOf(this.threadPoolKeepAlive)));
return threadPoolKeepAlive;
}
@Override
public long serverCheckTimeOut() {
return Long.parseLong(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_SERVER_CHECK_TIMEOUT,
String.valueOf(this.serverCheckTimeOut)));
return serverCheckTimeOut;
}
@Override
public int threadPoolQueueSize() {
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_QUEUESIZE,
String.valueOf(this.threadPoolQueueSize)));
return threadPoolQueueSize;
}
@Override
public int maxInboundMessageSize() {
return Integer.parseInt(
System.getProperty(GrpcConstants.MAX_INBOUND_MESSAGE_SIZE, String.valueOf(this.maxInboundMessageSize)));
return maxInboundMessageSize;
}
@Override
public int channelKeepAlive() {
return Integer.parseInt(
System.getProperty(GrpcConstants.CHANNEL_KEEP_ALIVE_TIME, String.valueOf(this.channelKeepAlive)));
return channelKeepAlive;
}
@Override
public int healthCheckRetryTimes() {
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES,
String.valueOf(this.healthCheckRetryTimes)));
return healthCheckRetryTimes;
}
@Override
public long healthCheckTimeOut() {
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT,
String.valueOf(this.healthCheckTimeOut)));
return healthCheckTimeOut;
}
@Override
@ -199,7 +199,7 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
private Builder() {
}
/**
* Set config from properties.
*
@ -207,54 +207,53 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
* @return Builder
*/
public Builder fromProperties(Properties properties) {
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_NAME)) {
this.name = properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_NAME);
if (properties.contains(GrpcConstants.GRPC_NAME)) {
this.name = properties.getProperty(GrpcConstants.GRPC_NAME);
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_RETRY_TIMES)) {
this.retryTimes = Integer.parseInt(properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_RETRY_TIMES));
if (properties.contains(GrpcConstants.GRPC_RETRY_TIMES)) {
this.retryTimes = Integer.parseInt(properties.getProperty(GrpcConstants.GRPC_RETRY_TIMES));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_TIMEOUT_MILLS)) {
this.timeOutMills = Long.parseLong(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_TIMEOUT_MILLS));
if (properties.contains(GrpcConstants.GRPC_TIMEOUT_MILLS)) {
this.timeOutMills = Long.parseLong(properties.getProperty(GrpcConstants.GRPC_TIMEOUT_MILLS));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_CONNECT_KEEP_ALIVE_TIME)) {
this.connectionKeepAlive = Long.parseLong(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_CONNECT_KEEP_ALIVE_TIME));
if (properties.contains(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)) {
this.connectionKeepAlive = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME)) {
this.threadPoolKeepAlive = Long.parseLong(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)) {
this.threadPoolKeepAlive = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_CORE_SIZE)) {
this.threadPoolCoreSize = Integer.parseInt(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_CORE_SIZE));
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)) {
this.threadPoolCoreSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_MAX_SIZE)) {
this.threadPoolMaxSize = Integer.parseInt(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_THREADPOOL_MAX_SIZE));
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)) {
this.threadPoolMaxSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_SERVER_CHECK_TIMEOUT)) {
this.serverCheckTimeOut = Long.parseLong(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_SERVER_CHECK_TIMEOUT));
if (properties.contains(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)) {
this.serverCheckTimeOut = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_QUEUESIZE)) {
this.threadPoolQueueSize = Integer.parseInt(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_QUEUESIZE));
if (properties.contains(GrpcConstants.GRPC_QUEUESIZE)) {
this.threadPoolQueueSize = Integer.parseInt(properties.getProperty(GrpcConstants.GRPC_QUEUESIZE));
}
if (properties.contains(GrpcConstants.MAX_INBOUND_MESSAGE_SIZE)) {
this.maxInboundMessageSize = Integer.parseInt(
properties.getProperty(GrpcConstants.MAX_INBOUND_MESSAGE_SIZE));
if (properties.contains(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)) {
this.maxInboundMessageSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE));
}
if (properties.contains(GrpcConstants.CHANNEL_KEEP_ALIVE_TIME)) {
this.channelKeepAlive = Integer.parseInt(properties.getProperty(GrpcConstants.CHANNEL_KEEP_ALIVE_TIME));
if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)) {
this.channelKeepAlive = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES)) {
this.healthCheckRetryTimes = Integer.parseInt(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES));
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) {
this.healthCheckRetryTimes = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES));
}
if (properties.contains(GrpcConstants.NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT)) {
this.healthCheckTimeOut = Long.parseLong(
properties.getProperty(GrpcConstants.NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT));
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)) {
this.healthCheckTimeOut = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT));
}
return this;
}

View File

@ -38,43 +38,43 @@ public class GrpcConstants {
public static final String NACOS_CLIENT_GRPC = "nacos.remote.client.grpc";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_NAME = NACOS_CLIENT_GRPC + ".name";
public static final String GRPC_NAME = NACOS_CLIENT_GRPC + ".name";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME = NACOS_CLIENT_GRPC + ".pool.alive";
public static final String GRPC_THREADPOOL_KEEPALIVETIME = NACOS_CLIENT_GRPC + ".pool.alive";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_THREADPOOL_CORE_SIZE = NACOS_CLIENT_GRPC + ".pool.core.size";
public static final String GRPC_THREADPOOL_CORE_SIZE = NACOS_CLIENT_GRPC + ".pool.core.size";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_RETRY_TIMES = NACOS_CLIENT_GRPC + ".retry.times";
public static final String GRPC_RETRY_TIMES = NACOS_CLIENT_GRPC + ".retry.times";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_TIMEOUT_MILLS = NACOS_CLIENT_GRPC + ".timeout";
public static final String GRPC_TIMEOUT_MILLS = NACOS_CLIENT_GRPC + ".timeout";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_CONNECT_KEEP_ALIVE_TIME = NACOS_CLIENT_GRPC + ".connect.keep.alive";
public static final String GRPC_CONNECT_KEEP_ALIVE_TIME = NACOS_CLIENT_GRPC + ".connect.keep.alive";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_THREADPOOL_MAX_SIZE = NACOS_CLIENT_GRPC + ".pool.max.size";
public static final String GRPC_THREADPOOL_MAX_SIZE = NACOS_CLIENT_GRPC + ".pool.max.size";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_SERVER_CHECK_TIMEOUT = NACOS_CLIENT_GRPC + ".server.check.timeout";
public static final String GRPC_SERVER_CHECK_TIMEOUT = NACOS_CLIENT_GRPC + ".server.check.timeout";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_QUEUESIZE = NACOS_CLIENT_GRPC + ".queue.size";
public static final String GRPC_QUEUESIZE = NACOS_CLIENT_GRPC + ".queue.size";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES = NACOS_CLIENT_GRPC + ".health.retry";
public static final String GRPC_HEALTHCHECK_RETRY_TIMES = NACOS_CLIENT_GRPC + ".health.retry";
@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT = NACOS_CLIENT_GRPC + ".health.timeout";
public static final String GRPC_HEALTHCHECK_TIMEOUT = NACOS_CLIENT_GRPC + ".health.timeout";
@GRpcConfigLabel
public static final String MAX_INBOUND_MESSAGE_SIZE = NACOS_CLIENT_GRPC + ".maxinbound.message.size";
public static final String GRPC_MAX_INBOUND_MESSAGE_SIZE = NACOS_CLIENT_GRPC + ".maxinbound.message.size";
@GRpcConfigLabel
public static final String CHANNEL_KEEP_ALIVE_TIME = NACOS_CLIENT_GRPC + ".channel.keep.alive";
public static final String GRPC_CHANNEL_KEEP_ALIVE_TIME = NACOS_CLIENT_GRPC + ".channel.keep.alive";
private static final Set<String> CONFIG_NAMES = new HashSet<>();