From afb1a2813546d2b90649301f1cbddab87883a921 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Wed, 6 Jan 2021 21:25:14 +0800 Subject: [PATCH] log optimize; config re sync request (#4638) --- .../request/ConfigChangeNotifyRequest.java | 192 +----------------- .../remote/request/ConfigReSyncRequest.java | 109 ++++++++++ .../remote/response/ConfigReSyncResponse.java | 28 +++ .../request/NotifySubscriberRequest.java | 4 +- .../remote/request/ConnectResetRequest.java | 2 +- .../remote/request/ServerReloadRequest.java | 10 + ...verPushRequest.java => ServerRequest.java} | 2 +- .../client/config/impl/ClientWorker.java | 30 ++- .../nacos/common/remote/client/RpcClient.java | 77 ++++--- .../common/remote/client/grpc/GrpcClient.java | 38 ++-- .../controller/CommunicationController.java | 46 ++++- .../remote/RpcConfigChangeNotifier.java | 24 +-- .../config/server/utils/PropertyUtil.java | 23 --- .../controller/ServerLoaderController.java | 118 +++++++++-- .../nacos/core/remote/ConnectionManager.java | 12 +- .../nacos/core/remote/RpcPushService.java | 6 +- .../core/ServerReloaderRequestHandler.java | 5 +- 17 files changed, 393 insertions(+), 333 deletions(-) create mode 100644 api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigReSyncRequest.java create mode 100644 api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigReSyncResponse.java rename api/src/main/java/com/alibaba/nacos/api/remote/request/{ServerPushRequest.java => ServerRequest.java} (93%) diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java index 12fd140f0..0377ce2c9 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java @@ -16,75 +16,13 @@ package com.alibaba.nacos.api.config.remote.request; -import com.alibaba.nacos.api.remote.request.ServerPushRequest; - -import java.util.List; - /** * ConfigChangeNotifyRequest. + * * @author liuzunfei * @version $Id: ConfigChangeNotifyRequest.java, v 0.1 2020年07月14日 3:20 PM liuzunfei Exp $ */ -public class ConfigChangeNotifyRequest extends ServerPushRequest { - - private String dataId; - - private String group; - - private String tenant; - - private boolean beta; - - private List betaIps; - - private String content; - - private String type; - - private boolean contentPush; - - public long lastModifiedTs; - - @Override - public String getModule() { - return "config"; - } - - /** - * Getter method for property contentPush. - * - * @return property value of contentPush - */ - public boolean isContentPush() { - return contentPush; - } - - /** - * Setter method for property contentPush. - * - * @param contentPush value to be assigned to property contentPush - */ - public void setContentPush(boolean contentPush) { - this.contentPush = contentPush; - } - - /** - * Getter method for property lastModifiedTs. - * - * @return property value of lastModifiedTs - */ - public long getLastModifiedTs() { - return lastModifiedTs; - } - - /** - * Setter method for property lastModifiedTs. - * - * @param lastModifiedTs value to be assigned to property lastModifiedTs - */ - public void setLastModifiedTs(long lastModifiedTs) { - this.lastModifiedTs = lastModifiedTs; - } +public class ConfigChangeNotifyRequest extends ConfigReSyncRequest { /** * build success response. @@ -101,130 +39,4 @@ public class ConfigChangeNotifyRequest extends ServerPushRequest { response.setTenant(tenant); return response; } - - /** - * Getter method for property dataId. - * - * @return property value of dataId - */ - public String getDataId() { - return dataId; - } - - /** - * Setter method for property dataId. - * - * @param dataId value to be assigned to property dataId - */ - public void setDataId(String dataId) { - this.dataId = dataId; - } - - /** - * Getter method for property group. - * - * @return property value of group - */ - public String getGroup() { - return group; - } - - /** - * Setter method for property group. - * - * @param group value to be assigned to property group - */ - public void setGroup(String group) { - this.group = group; - } - - /** - * Getter method for property tenant. - * - * @return property value of tenant - */ - public String getTenant() { - return tenant; - } - - /** - * Setter method for property tenant. - * - * @param tenant value to be assigned to property tenant - */ - public void setTenant(String tenant) { - this.tenant = tenant; - } - - /** - * Getter method for property beta. - * - * @return property value of beta - */ - public boolean isBeta() { - return beta; - } - - /** - * Setter method for property beta. - * - * @param beta value to be assigned to property beta - */ - public void setBeta(boolean beta) { - this.beta = beta; - } - - /** - * Getter method for property betaIps. - * - * @return property value of betaIps - */ - public List getBetaIps() { - return betaIps; - } - - /** - * Setter method for property betaIps. - * - * @param betaIps value to be assigned to property betaIps - */ - public void setBetaIps(List betaIps) { - this.betaIps = betaIps; - } - - /** - * Getter method for property content. - * - * @return property value of content - */ - public String getContent() { - return content; - } - - /** - * Setter method for property content. - * - * @param content value to be assigned to property content - */ - public void setContent(String content) { - this.content = content; - } - - /** - * Getter method for property type. - * - * @return property value of type - */ - public String getType() { - return type; - } - - /** - * Setter method for property type. - * - * @param type value to be assigned to property type - */ - public void setType(String type) { - this.type = type; - } } diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigReSyncRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigReSyncRequest.java new file mode 100644 index 000000000..d0d3b9322 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigReSyncRequest.java @@ -0,0 +1,109 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.config.remote.request; + +import com.alibaba.nacos.api.remote.request.ServerRequest; + +/** + * ConfigReSyncRequest. + * + * @author liuzunfei + * @version $Id: ConfigReSyncRequest.java, v 0.1 2020年07月14日 3:20 PM liuzunfei Exp $ + */ +public class ConfigReSyncRequest extends ServerRequest { + + private String dataId; + + private String group; + + private String tenant; + + @Override + public String getModule() { + return "config"; + } + + /** + * build success response. + * + * @param dataId dataId + * @param group group + * @param tenant tenant + * @return ConfigReSyncRequest + */ + public static ConfigReSyncRequest build(String dataId, String group, String tenant) { + ConfigReSyncRequest response = new ConfigReSyncRequest(); + response.setDataId(dataId); + response.setGroup(group); + response.setTenant(tenant); + return response; + } + + /** + * Getter method for property dataId. + * + * @return property value of dataId + */ + public String getDataId() { + return dataId; + } + + /** + * Setter method for property dataId. + * + * @param dataId value to be assigned to property dataId + */ + public void setDataId(String dataId) { + this.dataId = dataId; + } + + /** + * Getter method for property group. + * + * @return property value of group + */ + public String getGroup() { + return group; + } + + /** + * Setter method for property group. + * + * @param group value to be assigned to property group + */ + public void setGroup(String group) { + this.group = group; + } + + /** + * Getter method for property tenant. + * + * @return property value of tenant + */ + public String getTenant() { + return tenant; + } + + /** + * Setter method for property tenant. + * + * @param tenant value to be assigned to property tenant + */ + public void setTenant(String tenant) { + this.tenant = tenant; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigReSyncResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigReSyncResponse.java new file mode 100644 index 000000000..9989ec482 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigReSyncResponse.java @@ -0,0 +1,28 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.config.remote.response; + +import com.alibaba.nacos.api.remote.response.Response; + +/** + * config change notify response from client. + * @author liuzunfei + * @version $Id: ConfigChangeNotifyResponse.java, v 0.1 2020年09月01日 2:59 PM liuzunfei Exp $ + */ +public class ConfigReSyncResponse extends Response { + +} diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NotifySubscriberRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NotifySubscriberRequest.java index 9196cefc5..537c5099a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NotifySubscriberRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NotifySubscriberRequest.java @@ -17,14 +17,14 @@ package com.alibaba.nacos.api.naming.remote.request; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; -import com.alibaba.nacos.api.remote.request.ServerPushRequest; +import com.alibaba.nacos.api.remote.request.ServerRequest; /** * Notify subscriber response. * * @author xiweng.yy */ -public class NotifySubscriberRequest extends ServerPushRequest { +public class NotifySubscriberRequest extends ServerRequest { private String namespace; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java index 46909b9f5..99bd52e10 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java @@ -22,7 +22,7 @@ package com.alibaba.nacos.api.remote.request; * @author liuzunfei * @version $Id: ConnectResetResponse.java, v 0.1 2020年07月15日 11:11 AM liuzunfei Exp $ */ -public class ConnectResetRequest extends ServerPushRequest { +public class ConnectResetRequest extends ServerRequest { String serverIp; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerReloadRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerReloadRequest.java index 6754ab4b7..96be66dcd 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerReloadRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerReloadRequest.java @@ -26,6 +26,8 @@ public class ServerReloadRequest extends InternalRequest { int reloadCount = 0; + String reloadServer; + /** * Getter method for property reloadCount. * @@ -43,4 +45,12 @@ public class ServerReloadRequest extends InternalRequest { public void setReloadCount(int reloadCount) { this.reloadCount = reloadCount; } + + public String getReloadServer() { + return reloadServer; + } + + public void setReloadServer(String reloadServer) { + this.reloadServer = reloadServer; + } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerPushRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerRequest.java similarity index 93% rename from api/src/main/java/com/alibaba/nacos/api/remote/request/ServerPushRequest.java rename to api/src/main/java/com/alibaba/nacos/api/remote/request/ServerRequest.java index 4501f6649..447aa9c3e 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerPushRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/ServerRequest.java @@ -23,6 +23,6 @@ package com.alibaba.nacos.api.remote.request; * @version $Id: ServerPushResponse.java, v 0.1 2020年07月20日 1:21 PM liuzunfei Exp $ */ @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") -public abstract class ServerPushRequest extends Request { +public abstract class ServerRequest extends Request { } diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 24bb2c6be..d70b963eb 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -24,11 +24,13 @@ import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest; import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest; import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest; import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest; +import com.alibaba.nacos.api.config.remote.request.ConfigReSyncRequest; import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest; import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse; import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse; import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse; import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse; +import com.alibaba.nacos.api.config.remote.response.ConfigReSyncResponse; import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RemoteConstants; @@ -525,7 +527,7 @@ public class ClientWorker implements Closeable { public class ConfigRpcTransportClient extends ConfigTransportClient { - private BlockingQueue listenExecutebell = new ArrayBlockingQueue(1); + private final BlockingQueue listenExecutebell = new ArrayBlockingQueue(1); private Object bellItem = new Object(); @@ -557,29 +559,25 @@ public class ClientWorker implements Closeable { private void initRpcClientHandler(final RpcClient rpcClientInner) { /* - * Register Listen Change Handler + * Register Config Change /Config ReSync Handler */ rpcClientInner.registerServerRequestHandler((request, requestMeta) -> { - if (request instanceof ConfigChangeNotifyRequest) { - ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request; - LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}", getName(), - configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup()); + if (request instanceof ConfigChangeNotifyRequest || request instanceof ConfigReSyncRequest) { + ConfigReSyncRequest configReSyncRequest = (ConfigReSyncRequest) request; + LOGGER.info("[{}] [server-push] config {}. dataId={}, group={}", rpcClientInner.getName(), + (request instanceof ConfigChangeNotifyRequest) ? "changed" : "re sync", + configReSyncRequest.getDataId(), configReSyncRequest.getGroup()); String groupKey = GroupKey - .getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(), - configChangeNotifyRequest.getTenant()); + .getKeyTenant(configReSyncRequest.getDataId(), configReSyncRequest.getGroup(), + configReSyncRequest.getTenant()); CacheData cacheData = cacheMap.get().get(groupKey); if (cacheData != null) { - if (configChangeNotifyRequest.isContentPush() - && cacheData.getLastModifiedTs() < configChangeNotifyRequest.getLastModifiedTs()) { - cacheData.setContent(configChangeNotifyRequest.getContent()); - cacheData.setType(configChangeNotifyRequest.getType()); - cacheData.checkListenerMd5(); - } cacheData.setSync(false); notifyListenConfig(); } - return new ConfigChangeNotifyResponse(); + return (request instanceof ConfigChangeNotifyRequest) ? new ConfigChangeNotifyResponse() + : new ConfigReSyncResponse(); } return null; }); @@ -746,7 +744,7 @@ public class ClientWorker implements Closeable { changeConfig.getTenant()); changeKeys.add(changeKey); boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); - this.executor.execute(() -> refreshContentAndCheck(changeKey, !isInitializing)); + refreshContentAndCheck(changeKey, !isInitializing); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 121150913..1e4b279c5 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -130,12 +130,12 @@ public abstract class RpcClient implements Closeable { if (connectionEventListeners.isEmpty()) { return; } - LoggerUtils.printIfInfoEnabled(LOGGER, "Notify disconnected event to listeners"); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify disconnected event to listeners", name); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { connectionEventListener.onDisConnect(); } catch (Throwable throwable) { - LoggerUtils.printIfErrorEnabled(LOGGER, "notify disconnect listener error,listener ={}", + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify disconnect listener error,listener ={}", name, connectionEventListener.getClass().getName()); } } @@ -148,12 +148,12 @@ public abstract class RpcClient implements Closeable { if (connectionEventListeners.isEmpty()) { return; } - LoggerUtils.printIfInfoEnabled(LOGGER, "Notify connected event to listeners."); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify connected event to listeners.", name); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { connectionEventListener.onConnected(); } catch (Throwable throwable) { - LoggerUtils.printIfErrorEnabled(LOGGER, "notify connect listener error,listener ={}", + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify connect listener error,listener ={}", name, connectionEventListener.getClass().getName()); } } @@ -198,7 +198,7 @@ public abstract class RpcClient implements Closeable { this.serverListFactory = serverListFactory; rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED); - LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init, ServerListFactory ={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init, ServerListFactory ={}", name, serverListFactory.getClass().getName()); } @@ -209,7 +209,7 @@ public abstract class RpcClient implements Closeable { */ public void initLabels(Map labels) { this.labels.putAll(labels); - LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init label, labels={}", this.labels); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init label, labels={}", name, this.labels); } /** @@ -262,20 +262,19 @@ public abstract class RpcClient implements Closeable { startUpRetryTimes--; ServerInfo serverInfo = nextRpcServer(); - LoggerUtils.printIfInfoEnabled(LOGGER, - String.format("[%s] try to connect to server on start up, server: %s", name, serverInfo)); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] try to connect to server on start up, server: {}", name, + serverInfo); connectToServer = connectToServer(serverInfo); } catch (Exception e) { - LoggerUtils.printIfWarnEnabled(LOGGER, String.format( - "Fail to connect to server on start up, error message=%s, start up retry times left: %s", - e.getMessage(), startUpRetryTimes)); + LoggerUtils.printIfWarnEnabled(LOGGER, + "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}", + name, e.getMessage(), startUpRetryTimes); } } if (connectToServer != null) { - LoggerUtils - .printIfInfoEnabled(LOGGER, String.format("[%s] success to connect to server on start up", name)); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect to server on start up", name); this.currentConnection = connectToServer; rpcClientStatus.set(RpcClientStatus.RUNNING); eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED)); @@ -290,7 +289,7 @@ public abstract class RpcClient implements Closeable { try { RpcClient.this.shutdown(); } catch (NacosException e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "RpcClient shutdown exception, errorMessage ={}", + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]RpcClient shutdown exception, errorMessage ={}", name, e.getMessage()); } @@ -325,7 +324,7 @@ public abstract class RpcClient implements Closeable { } } } catch (Exception e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Switch server error ", e); + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Switch server error ,{}", name, e); } return new ConnectResetResponse(); } @@ -372,9 +371,8 @@ public abstract class RpcClient implements Closeable { if (switchingFlag.get()) { return; } - LoggerUtils.printIfInfoEnabled(LOGGER, - String.format("[%s] Submit server switch task : %s,onRequestFail=%s", name, recommendServerInfo, - onRequestFail)); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Submit server switch task : {},onRequestFail={}", name, + recommendServerInfo, onRequestFail); executor.submit(new Runnable() { @Override public void run() { @@ -389,14 +387,13 @@ public abstract class RpcClient implements Closeable { } if (onRequestFail && serverCheck()) { - LoggerUtils.printIfInfoEnabled(LOGGER, - String.format("[%s] Server check success : %s", name, recommendServer)); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success : {}", name, recommendServer); rpcClientStatus.set(RpcClientStatus.RUNNING); return; } - LoggerUtils.printIfInfoEnabled(LOGGER, - String.format("[%s] Execute server switch task : %s", name, recommendServer)); + LoggerUtils + .printIfInfoEnabled(LOGGER, "[{}] Execute server switch task : {}", name, recommendServer); switchingFlag.compareAndSet(false, true); // loop until start client success. @@ -414,8 +411,8 @@ public abstract class RpcClient implements Closeable { //2.create a new channel to new server Connection connectionNew = connectToServer(serverInfo); if (connectionNew != null) { - LoggerUtils.printIfInfoEnabled(LOGGER, - String.format("[%s] success to connect server : %s", name, serverInfo)); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect server : {}", name, + serverInfo); //successfully create a new connect. if (currentConnection != null) { //set current connection to enable connection event. @@ -445,9 +442,9 @@ public abstract class RpcClient implements Closeable { if (reConnectTimes > 0 && reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) { - LoggerUtils.printIfInfoEnabled(LOGGER, String.format( - "[%s] fail to connect server,after trying %s times, last try server is %s", name, - reConnectTimes, serverInfo)); + LoggerUtils.printIfInfoEnabled(LOGGER, + "[{}] fail to connect server,after trying {} times, last try server is {}", name, + reConnectTimes, serverInfo); if (Integer.MAX_VALUE == retryTurns) { retryTurns = 50; } else { @@ -469,12 +466,12 @@ public abstract class RpcClient implements Closeable { } if (isShutdown()) { - LoggerUtils.printIfInfoEnabled(LOGGER, - String.format("[%s] client is shutdown ,stop reconnect to server", name)); + LoggerUtils + .printIfInfoEnabled(LOGGER, "[{}] client is shutdown ,stop reconnect to server", name); } } catch (Exception e) { - LoggerUtils.printIfWarnEnabled(LOGGER, String.format("[%s] fail to connect to server", name)); + LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] fail to connect to server", name); } finally { switchingFlag.set(false); switchingLock.unlock(); @@ -606,9 +603,9 @@ public abstract class RpcClient implements Closeable { && ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) { // Do nothing. } else { - LoggerUtils - .printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}", - request, retryTimes, e.getMessage()); + LoggerUtils.printIfErrorEnabled(LOGGER, + "[{}]send request fail, request={}, retryTimes={},errorMessage={}", name, request, + retryTimes, e.getMessage()); } } retryTimes++; @@ -649,9 +646,9 @@ public abstract class RpcClient implements Closeable { && ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) { // Do nothing. } else { - LoggerUtils - .printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}", - request, retryTimes, e.getMessage()); + LoggerUtils.printIfErrorEnabled(LOGGER, + "[{}]send request fail, request={}, retryTimes={},errorMessage={}", name, request, + retryTimes, e.getMessage()); } } } @@ -685,7 +682,7 @@ public abstract class RpcClient implements Closeable { */ protected Response handleServerRequest(final Request request, final RequestMeta meta) { - LoggerUtils.printIfInfoEnabled(LOGGER, "receive server push request,request={},requestId={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name, request.getClass().getSimpleName(), request.getRequestId()); for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { try { @@ -694,7 +691,7 @@ public abstract class RpcClient implements Closeable { return response; } } catch (Exception e) { - LoggerUtils.printIfInfoEnabled(LOGGER, "handleServerRequest:{}, errorMessage={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]handleServerRequest:{}, errorMessage={}", name, serverRequestHandler.getClass().getName(), e.getMessage()); } @@ -709,7 +706,7 @@ public abstract class RpcClient implements Closeable { */ public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) { - LoggerUtils.printIfInfoEnabled(LOGGER, "Registry connection listener to current client:{}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName()); this.connectionEventListeners.add(connectionEventListener); } @@ -720,7 +717,7 @@ public abstract class RpcClient implements Closeable { * @param serverRequestHandler serverRequestHandler */ public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) { - LoggerUtils.printIfInfoEnabled(LOGGER, "Register server push request handler:{}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Register server push request handler:{}", name, serverRequestHandler.getClass().getName()); this.serverRequestHandlers.add(serverRequestHandler); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index 06941c421..ae3f1d181 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -127,8 +127,8 @@ public abstract class GrpcClient extends RpcClient { @Override public void onNext(Payload payload) { - LoggerUtils.printIfDebugEnabled(LOGGER, "Stream server request receive, original info: {}", - payload.toString()); + LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", + GrpcClient.this.getName(), payload.toString()); try { GrpcUtils.PlainRequest parse = GrpcUtils.parse(payload); final Request request = (Request) parse.getBody(); @@ -140,13 +140,14 @@ public abstract class GrpcClient extends RpcClient { response.setRequestId(request.getRequestId()); sendResponse(response); } else { - LOGGER.warn("Fail to process server request, ackId->{}", request.getRequestId()); + LOGGER.warn("[{}]Fail to process server request, ackId->{}", GrpcClient.this.getName(), + request.getRequestId()); } } catch (Exception e) { - LoggerUtils - .printIfErrorEnabled(LOGGER, e.getMessage(), "Handle server request exception: {}", - payload.toString()); + LoggerUtils.printIfErrorEnabled(LOGGER, e.getMessage(), + "[{}]Handle server request exception: {}", GrpcClient.this.getName(), + payload.toString()); sendResponse(request.getRequestId(), false); } @@ -154,8 +155,8 @@ public abstract class GrpcClient extends RpcClient { } catch (Exception e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Error tp process server push response: {}", - payload.getBody().getValue().toStringUtf8()); + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error tp process server push response: {}", + GrpcClient.this.getName(), payload.getBody().getValue().toStringUtf8()); } } @@ -164,7 +165,8 @@ public abstract class GrpcClient extends RpcClient { boolean isRunning = isRunning(); boolean isAbandon = grpcConn.isAbandon(); if (isRunning && !isAbandon) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream error, switch server,error={}", throwable); + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", + GrpcClient.this.getName(), throwable); if (throwable instanceof StatusRuntimeException) { Status.Code code = ((StatusRuntimeException) throwable).getStatus().getCode(); if (Status.UNAVAILABLE.getCode().equals(code) || Status.CANCELLED.getCode().equals(code)) { @@ -174,8 +176,8 @@ public abstract class GrpcClient extends RpcClient { } } } else { - LoggerUtils.printIfWarnEnabled(LOGGER, "ignore error event,isRunning:{},isAbandon={}", isRunning, - isAbandon); + LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]ignore error event,isRunning:{},isAbandon={}", + GrpcClient.this.getName(), isRunning, isAbandon); } } @@ -185,13 +187,14 @@ public abstract class GrpcClient extends RpcClient { boolean isRunning = isRunning(); boolean isAbandon = grpcConn.isAbandon(); if (isRunning && !isAbandon) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream onCompleted, switch server"); + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", + GrpcClient.this.getName()); if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { switchServerAsync(); } } else { - LoggerUtils.printIfInfoEnabled(LOGGER, "ignore complete event,isRunning:{},isAbandon={}", isRunning, - isAbandon); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ignore complete event,isRunning:{},isAbandon={}", + GrpcClient.this.getName(), isRunning, isAbandon); } } @@ -203,7 +206,7 @@ public abstract class GrpcClient extends RpcClient { PushAckRequest request = PushAckRequest.build(ackId, success); this.currentConnection.request(request, buildMeta()); } catch (Exception e) { - LOGGER.error("Error to send ack response, ackId->{}", ackId); + LOGGER.error("[{}]Error to send ack response, ackId->{}", GrpcClient.this.getName(), ackId); } } @@ -211,7 +214,8 @@ public abstract class GrpcClient extends RpcClient { try { ((GrpcConnection) this.currentConnection).sendResponse(response); } catch (Exception e) { - LOGGER.error("Error to send ack response, ackId->{}", response.getRequestId()); + LOGGER.error("[{}]Error to send ack response, ackId->{}", GrpcClient.this.getName(), + response.getRequestId()); } } @@ -242,7 +246,7 @@ public abstract class GrpcClient extends RpcClient { } return null; } catch (Exception e) { - LOGGER.error("Fail to connect to server!", e); + LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); } return null; } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java b/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java index 78e8f8c15..6bfe46886 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.config.server.controller; +import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.model.SampleResult; import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext; @@ -27,6 +30,7 @@ import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.ui.ModelMap; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -35,6 +39,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,7 +104,7 @@ public class CommunicationController { group = StringUtils.isBlank(group) ? Constants.DEFAULT_GROUP : group; // long polling listners. SampleResult result = longPollingService.getCollectSubscribleInfo(dataId, group, tenant); - // rpc listerns. + // rpc listeners. String groupKey = GroupKey2.getKey(dataId, group, tenant); Set listenersClients = configChangeListenContext.getListeners(groupKey); SampleResult rpcSample = new SampleResult(); @@ -136,4 +141,43 @@ public class CommunicationController { return result; } + + /** + * Notify client to check config from server. + */ + @GetMapping("/watcherSyncConfig") + public ResponseEntity watcherSyncConfig(@RequestParam("dataId") String dataId, @RequestParam("group") String group, + @RequestParam(value = "tenant", required = false) String tenant, + @RequestParam(value = "clientIp", required = false) String clientIp, ModelMap modelMap) { + String groupKey = GroupKey2.getKey(dataId, group, tenant); + Set listenersClients = configChangeListenContext.getListeners(groupKey); + List listeners = new ArrayList<>(); + for (String connectionId : listenersClients) { + Connection connection = connectionManager.getConnection(connectionId); + if (connection != null) { + if (StringUtils.isNotBlank(clientIp) && !connection.getMetaInfo().getClientIp().equals(clientIp)) { + continue; + } + listeners.add(connection); + } + + } + if (!listeners.isEmpty()) { + ConfigChangeNotifyRequest notifyRequest = new ConfigChangeNotifyRequest(); + notifyRequest.setDataId(dataId); + notifyRequest.setGroup(group); + notifyRequest.setTenant(tenant); + for (Connection connectionByIp : listeners) { + try { + connectionByIp.request(notifyRequest, new RequestMeta()); + } catch (NacosException e) { + e.printStackTrace(); + } + + } + } + return ResponseEntity.ok().body(trueStr); + + } + } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java index 5f1230bbd..b9425ecc6 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java @@ -26,7 +26,6 @@ import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent; import com.alibaba.nacos.config.server.utils.ConfigExecutor; import com.alibaba.nacos.config.server.utils.GroupKey; -import com.alibaba.nacos.config.server.utils.PropertyUtil; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.RpcPushService; @@ -63,10 +62,11 @@ public class RpcConfigChangeNotifier extends Subscriber { /** * adaptor to config module ,when server side config change ,invoke this method. * - * @param groupKey groupKey - * @param notifyRequet notifyRequet + * @param groupKey groupKey + * @param notifyRequest notifyRequest */ - public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest notifyRequet) { + public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest notifyRequest, boolean isBeta, + List betaIps) { Set listeners = configChangeListenContext.getListeners(groupKey); if (CollectionUtils.isEmpty(listeners)) { @@ -80,14 +80,13 @@ public class RpcConfigChangeNotifier extends Subscriber { continue; } - if (notifyRequet.isBeta()) { - List betaIps = notifyRequet.getBetaIps(); + if (isBeta) { if (betaIps != null && !betaIps.contains(connection.getMetaInfo().getClientIp())) { continue; } } - RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 50, client, + RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, connection.getMetaInfo().getClientIp(), connection.getMetaInfo().getConnectionId()); push(rpcPushRetryTask); notifyCount++; @@ -106,16 +105,7 @@ public class RpcConfigChangeNotifier extends Subscriber { String group = strings[1]; String tenant = strings.length > 2 ? strings[2] : ""; ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant); - notifyRequest.setBeta(isBeta); - notifyRequest.setBetaIps(betaIps); - if (PropertyUtil.isPushContent()) { - notifyRequest.setContent(event.content); - notifyRequest.setType(event.type); - notifyRequest.setLastModifiedTs(event.lastModifiedTs); - notifyRequest.setContentPush(true); - } - - configDataChanged(groupKey, notifyRequest); + configDataChanged(groupKey, notifyRequest, isBeta, betaIps); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java b/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java index 46f7277ef..c6f866388 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java @@ -38,8 +38,6 @@ public class PropertyUtil implements ApplicationContextInitializerpushContent. - * - * @return property value of pushContent - */ - public static boolean isPushContent() { - return pushContent; - } - - /** - * Setter method for property pushContent. - * - * @param pushContent value to be assigned to property pushContent - */ - public static void setPushContent(boolean pushContent) { - PropertyUtil.pushContent = pushContent; - } - public static int getNotifyConnectTimeout() { return notifyConnectTimeout; } @@ -292,9 +272,6 @@ public class PropertyUtil implements ApplicationContextInitializer responseMap = new HashMap<>(3); - connectionManager.coordinateMaxClientsSmoth(count); + connectionManager.setMaxClientCount(count); return ResponseEntity.ok().body("success"); } @@ -122,6 +129,84 @@ public class ServerLoaderController { return ResponseEntity.ok().body("success"); } + /** + * Get server state of current server. + * + * @return state json. + */ + @Secured(resource = NacosAuthConfig.CONSOLE_RESOURCE_NAME_PREFIX + "loader", action = ActionTypes.WRITE) + @GetMapping("/smartReload") + public ResponseEntity smartReload(HttpServletRequest request, + @RequestParam(value = "loaderFactor", required = false) String loaderFactorStr) { + + LOGGER.info("Smart reload request receive,requestIp={}", RequestUtil.getRemoteIp(request)); + + Map serverLoadMetrics = getServerLoadMetrics(); + Object avgString = (Object) serverLoadMetrics.get("avg"); + List details = (List) serverLoadMetrics.get("detail"); + int avg = Integer.valueOf(avgString.toString()); + float loaderFactor = + StringUtils.isBlank(loaderFactorStr) ? RemoteUtils.LOADER_FACTOR : Float.valueOf(loaderFactorStr); + int overLimitCount = (int) (avg * (1 + loaderFactor)); + int lowLimitCount = (int) (avg * (1 - loaderFactor)); + + List overLimitServer = new ArrayList(); + List lowLimitServer = new ArrayList(); + + for (ServerLoaderMetrics metrics : details) { + int sdkCount = Integer.valueOf(metrics.getMetric().get("sdkConCount")); + if (sdkCount > overLimitCount) { + overLimitServer.add(metrics); + } + if (sdkCount < lowLimitCount) { + lowLimitServer.add(metrics); + } + } + + // desc by sdkConCount + overLimitServer.sort((o1, o2) -> { + Integer sdkCount1 = Integer.valueOf(o1.getMetric().get("sdkConCount")); + Integer sdkCount2 = Integer.valueOf(o2.getMetric().get("sdkConCount")); + return sdkCount1.compareTo(sdkCount2) * -1; + }); + + LOGGER.info("Over load limit server list ={}", overLimitServer); + + //asc by sdkConCount + lowLimitServer.sort((o1, o2) -> { + Integer sdkCount1 = Integer.valueOf(o1.getMetric().get("sdkConCount")); + Integer sdkCount2 = Integer.valueOf(o2.getMetric().get("sdkConCount")); + return sdkCount1.compareTo(sdkCount2); + }); + + LOGGER.info("Low load limit server list ={}", lowLimitServer); + + CompletionService completionService = new ExecutorCompletionService( + executorService); + for (int i = 0; i < overLimitServer.size() & i < lowLimitServer.size(); i++) { + ServerReloadRequest serverLoaderInfoRequest = new ServerReloadRequest(); + serverLoaderInfoRequest.setReloadCount(overLimitCount); + serverLoaderInfoRequest.setReloadServer(lowLimitServer.get(i).address); + Member member = serverMemberManager.find(overLimitServer.get(i).address); + + LOGGER.info("Reload task submit ,fromServer ={},toServer={}, ", overLimitServer.get(i).address, + lowLimitServer.get(i).address); + + if (serverMemberManager.getSelf().equals(member)) { + try { + serverReloaderRequestHandler.handle(serverLoaderInfoRequest, new RequestMeta()); + } catch (NacosException e) { + e.printStackTrace(); + } + } else { + completionService.submit(new ServerReLoaderRpcTask(serverLoaderInfoRequest, member)); + } + } + + return ResponseEntity.ok().body("ok"); + } + + /** * Get server state of current server. * @@ -203,7 +288,7 @@ public class ServerLoaderController { */ @Secured(resource = NacosAuthConfig.CONSOLE_RESOURCE_NAME_PREFIX + "loader", action = ActionTypes.READ) @GetMapping("/clustermetric") - public ResponseEntity clusterLoader() { + public ResponseEntity loaderMetrics() { Map serverLoadMetrics = getServerLoadMetrics(); @@ -212,7 +297,7 @@ public class ServerLoaderController { private Map getServerLoadMetrics() { - CompletionService completionService = new ExecutorCompletionService( + CompletionService completionService = new ExecutorCompletionService( executorService); int count = 0; @@ -224,12 +309,12 @@ public class ServerLoaderController { } } - List responseList = new LinkedList(); + List responseList = new LinkedList(); try { ServerLoaderInfoResponse handle = serverLoaderInfoRequestHandler .handle(new ServerLoaderInfoRequest(), new RequestMeta()); - ServerLoaderMetris metris = new ServerLoaderMetris(); + ServerLoaderMetrics metris = new ServerLoaderMetrics(); metris.setAddress(serverMemberManager.getSelf().getAddress()); metris.setMetric(handle.getLoaderMetrics()); responseList.add(metris); @@ -237,13 +322,15 @@ public class ServerLoaderController { e.printStackTrace(); } + int resultCount = 0; for (int i = 0; i < count; i++) { try { - Future f = completionService.poll(1000, TimeUnit.MILLISECONDS); + Future f = completionService.poll(1000, TimeUnit.MILLISECONDS); try { if (f != null) { - ServerLoaderMetris response = f.get(500, TimeUnit.MILLISECONDS); + ServerLoaderMetrics response = f.get(500, TimeUnit.MILLISECONDS); if (response != null) { + resultCount++; responseList.add(response); } } @@ -262,12 +349,15 @@ public class ServerLoaderController { Map responseMap = new HashMap<>(3); responseMap.put("detail", responseList); + responseMap.put("memberCount", count); + responseMap.put("metricsCount", resultCount); + int max = 0; int min = -1; int total = 0; - for (ServerLoaderMetris serverLoaderMetris : responseList) { - String sdkConCountStr = serverLoaderMetris.getMetric().get("sdkConCount"); + for (ServerLoaderMetrics serverLoaderMetrics : responseList) { + String sdkConCountStr = serverLoaderMetrics.getMetric().get("sdkConCount"); if (StringUtils.isNotBlank(sdkConCountStr)) { int sdkConCount = Integer.valueOf(sdkConCountStr); @@ -290,7 +380,7 @@ public class ServerLoaderController { } - class ServerLoaderInfoRpcTask implements Callable { + class ServerLoaderInfoRpcTask implements Callable { ServerLoaderInfoRequest request; @@ -302,18 +392,18 @@ public class ServerLoaderController { } @Override - public ServerLoaderMetris call() throws Exception { + public ServerLoaderMetrics call() throws Exception { ServerLoaderInfoResponse response = (ServerLoaderInfoResponse) clusterRpcClientProxy .sendRequest(this.member, this.request); - ServerLoaderMetris metris = new ServerLoaderMetris(); + ServerLoaderMetrics metris = new ServerLoaderMetrics(); metris.setAddress(member.getAddress()); metris.setMetric(response.getLoaderMetrics()); return metris; } } - class ServerLoaderMetris { + class ServerLoaderMetrics { String address; diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index a40c200fb..2745632c2 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -167,10 +167,7 @@ public class ConnectionManager { @Override public void run() { try { - MetricsMonitor.getLongConnectionMonitor().set(connections.size()); - - long currentStamp = System.currentTimeMillis(); Set> entries = connections.entrySet(); boolean isLoaderClient = loadClient >= 0; int currentMaxClient = isLoaderClient ? loadClient : maxClient; @@ -197,8 +194,9 @@ public class ConnectionManager { if (connection != null) { connection.asyncRequest(connectResetRequest, buildMeta(), null); Loggers.REMOTE - .info("expel connection ,send switch server response connection id = {},connectResetRequest={} ", - expelledClientId, connectResetRequest); + .info("send connection reset server , connection id = {},recommendServerIp={}, recommendServerPort={}", + expelledClientId, connectResetRequest.getServerIp(), + connectResetRequest.getServerPort()); } } catch (ConnectionAlreadyClosedException e) { @@ -229,7 +227,7 @@ public class ConnectionManager { return meta; } - public void coordinateMaxClientsSmoth(int maxClient) { + public void setMaxClientCount(int maxClient) { this.maxClient = maxClient; } @@ -259,7 +257,7 @@ public class ConnectionManager { } catch (ConnectionAlreadyClosedException e) { unregister(connectionId); } catch (Exception e) { - Loggers.REMOTE.error("error occurs when expel connetion :", connectionId, e); + Loggers.REMOTE.error("error occurs when expel connection :", connectionId, e); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java index 4032f0da0..e9b649b8d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java @@ -18,7 +18,7 @@ package com.alibaba.nacos.core.remote; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.AbstractRequestCallBack; -import com.alibaba.nacos.api.remote.request.ServerPushRequest; +import com.alibaba.nacos.api.remote.request.ServerRequest; import com.alibaba.nacos.api.remote.PushCallBack; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException; @@ -47,7 +47,7 @@ public class RpcPushService { * @param request request. * @param requestCallBack requestCallBack. */ - public void pushWithCallback(String connectionId, ServerPushRequest request, PushCallBack requestCallBack, + public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { @@ -93,7 +93,7 @@ public class RpcPushService { * @param connectionId connectionId. * @param request request. */ - public void pushWithoutAck(String connectionId, ServerPushRequest request) { + public void pushWithoutAck(String connectionId, ServerRequest request) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { try { diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerReloaderRequestHandler.java b/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerReloaderRequestHandler.java index 6fb0528d3..e58546fa8 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerReloaderRequestHandler.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerReloaderRequestHandler.java @@ -23,6 +23,7 @@ import com.alibaba.nacos.api.remote.request.ServerReloadRequest; import com.alibaba.nacos.api.remote.response.ServerReloadResponse; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.RemoteUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -45,6 +46,8 @@ public class ServerReloaderRequestHandler extends RequestHandler filter = new HashMap(2); filter.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); @@ -53,7 +56,7 @@ public class ServerReloaderRequestHandler extends RequestHandler