fix query sla multi connection model ;add rpc client error log; optimize client shutdown (#5949)

* fix multi connection model ,query sla

* add rpc client error log

* optimize client shutdown

* optimize client log
This commit is contained in:
nov.lzf 2021-06-04 15:27:32 +08:00 committed by GitHub
parent 0e4d13e789
commit 55c6f44413
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 32 deletions

View File

@ -68,6 +68,7 @@ import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -519,7 +520,9 @@ public class ClientWorker implements Closeable {
public void shutdown() throws NacosException {
String className = this.getClass().getName();
LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(agent.executor, LOGGER);
if (agent != null) {
agent.shutdown();
}
LOGGER.info("{} do shutdown stop", className);
}
@ -573,6 +576,37 @@ public class ClientWorker implements Closeable {
}
@Override
public void shutdown() {
synchronized (RpcClientFactory.getAllClientEntries()) {
LOGGER.info("Trying to shutdown transport client " + this);
Set<Map.Entry<String, RpcClient>> allClientEntries = RpcClientFactory.getAllClientEntries();
Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntries.iterator();
while (iterator.hasNext()) {
Map.Entry<String, RpcClient> entry = iterator.next();
if (entry.getKey().startsWith(uuid)) {
LOGGER.info("Trying to shutdown rpc client " + entry.getKey());
try {
entry.getValue().shutdown();
} catch (NacosException nacosException) {
nacosException.printStackTrace();
}
LOGGER.info("Remove rpc client " + entry.getKey());
iterator.remove();
}
}
LOGGER.info("Shutdown executor " + executor);
executor.shutdown();
Map<String, CacheData> stringCacheDataMap = cacheMap.get();
for (Map.Entry<String, CacheData> entry : stringCacheDataMap.entrySet()) {
entry.getValue().setSyncWithServer(false);
}
}
}
private Map<String, String> getLabels() {
Map<String, String> labels = new HashMap<String, String>(2, 1);
@ -845,21 +879,25 @@ public class ClientWorker implements Closeable {
}
}
private synchronized RpcClient ensureRpcClient(String taskId) throws NacosException {
Map<String, String> labels = getLabels();
Map<String, String> newLabels = new HashMap<String, String>(labels);
newLabels.put("taskId", taskId);
RpcClient rpcClient = RpcClientFactory
.createClient("config-" + taskId + "-" + uuid, getConnectionType(), newLabels);
if (rpcClient.isWaitInitiated()) {
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.clientAbilities(initAbilities());
rpcClient.start();
private RpcClient ensureRpcClient(String taskId) throws NacosException {
synchronized (ClientWorker.this) {
Map<String, String> labels = getLabels();
Map<String, String> newLabels = new HashMap<String, String>(labels);
newLabels.put("taskId", taskId);
RpcClient rpcClient = RpcClientFactory
.createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels);
if (rpcClient.isWaitInitiated()) {
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.clientAbilities(initAbilities());
rpcClient.start();
}
return rpcClient;
}
return rpcClient;
}
private ClientAbilities initAbilities() {
@ -909,8 +947,14 @@ public class ClientWorker implements Closeable {
throws NacosException {
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
request.putHeader("notify", String.valueOf(notify));
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(getOneRunningClient(), request,
readTimeouts);
RpcClient rpcClient = getOneRunningClient();
if (notify) {
CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
if (cacheData != null) {
rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
}
}
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
ConfigResponse configResponse = new ConfigResponse();
if (response.isSuccess()) {

View File

@ -73,6 +73,10 @@ public abstract class ConfigTransportClient {
private volatile StsCredential stsCredential;
public void shutdown() {
}
public ConfigTransportClient(Properties properties, ServerListManager serverListManager) {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);

View File

@ -401,18 +401,6 @@ public abstract class RpcClient implements Closeable {
return null;
}
});
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
RpcClient.this.shutdown();
} catch (NacosException e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]RpcClient shutdown exception, errorMessage ={}", name,
e.getMessage());
}
}
});
}

View File

@ -135,7 +135,7 @@ public abstract class GrpcClient extends RpcClient {
* @param requestBlockingStub requestBlockingStub used to check server.
* @return success or not
*/
private Response serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) {
private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
try {
if (requestBlockingStub == null) {
return null;
@ -147,6 +147,8 @@ public abstract class GrpcClient extends RpcClient {
//receive connection unregister response here,not check response is success.
return (Response) GrpcUtils.parse(response);
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
return null;
}
}
@ -259,11 +261,11 @@ public abstract class GrpcClient extends RpcClient {
grpcExecutor.allowCoreThreadTimeOut(true);
}
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(),
serverInfo.getServerPort() + rpcPortOffset());
int port = serverInfo.getServerPort() + rpcPortOffset();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
if (newChannelStubTemp != null) {
Response response = serverCheck(newChannelStubTemp);
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
return null;