From 33df55d40a26670d31b4097eae02467adf16739f Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Fri, 14 Aug 2020 17:40:00 +0800 Subject: [PATCH] remote support on servers and optimize connection and client model (#3609) * Add gprc support-> optimize rpc listen execute task notify * Add gprc support-> add listener optimize. * Add gprc support-> cluster rpc client support * cluster rpc client support * config change notify bettween server long connect support * server push future refactor. * server rpc sync compatibility support * connection labels support * code fail fix --- .../request/ConfigRequestTypeConstants.java | 3 + .../ConfigChangeClusterSyncRequest.java | 163 ++++++++++++ .../response/ConfigResponseTypeConstants.java | 2 + .../ConfigChangeClusterSyncResponse.java | 34 +++ .../nacos/api/remote/RemoteConstants.java | 41 +++ .../request/ConnectionSetupRequest.java | 31 +++ .../api/remote/request/PushAckRequest.java | 20 ++ .../remote/response/AbstractPushCallBack.java | 18 +- .../api/remote/response/PushCallBack.java | 2 + .../api/remote/response/ResponseRegistry.java | 4 + .../client/config/impl/ClientWorker.java | 58 +++-- .../config/remote/ConfigClientProxy.java | 20 +- .../nacos/client/config/utils/ParamUtils.java | 9 + .../remote/gprc/NamingGrpcClientProxy.java | 2 +- .../com/alibaba/nacos/client/ConfigTest.java | 75 ++++-- .../nacos/common/remote/ConnectionType.java | 11 + .../common/remote/client/Connection.java | 17 ++ .../nacos/common/remote/client/RpcClient.java | 96 +++++-- .../remote/client/RpcClientFactory.java | 69 ++++- .../common/remote/client/grpc/GrpcClient.java | 26 +- .../remote/client/grpc/GrpcConnection.java | 3 +- .../client/rsocket/RsocketConnection.java | 9 + .../client/rsocket/RsocketRpcClient.java | 117 +++++---- ...ConfigChangeClusterSyncRequestHandler.java | 71 ++++++ .../server/remote/ConfigChangeNotifier.java | 5 +- .../remote/ConfigClusterRpcClientProxy.java | 56 +++++ .../service/notify/AsyncNotifyService.java | 122 ++++++++- .../controller/ServerLoaderController.java | 22 +- .../src/main/resources/application.properties | 8 +- .../core/cluster/MemberMetaDataConstants.java | 6 +- .../nacos/core/cluster/MemberUtils.java | 17 ++ .../core/cluster/ServerMemberManager.java | 2 + .../cluster/remote/ClusterRpcClientProxy.java | 166 +++++++++++++ .../alibaba/nacos/core/remote/Connection.java | 24 +- .../nacos/core/remote/ConnectionManager.java | 9 +- .../nacos/core/remote/ConnectionMetaInfo.java | 19 +- .../nacos/core/remote/DefaultPushFuture.java | 146 +++++++++++ .../alibaba/nacos/core/remote/PushFuture.java | 52 ++++ .../remote/RemoteConnectionEventListener.java | 38 +++ .../remote/RpcAckCallbackSynchronizer.java | 154 ++++++++++++ .../nacos/core/remote/RpcPushService.java | 16 +- .../alibaba/nacos/core/remote/RpcServer.java | 67 ++++- .../core/remote/grpc/GrpcAckSynchronizer.java | 235 ------------------ .../core/remote/grpc/GrpcConnection.java | 95 ++----- .../grpc/GrpcRequestHandlerReactor.java | 6 +- .../nacos/core/remote/grpc/GrpcServer.java | 79 +++--- .../grpc/GrpcStreamRequestHanderImpl.java | 10 +- .../remote/rsocket/RsocketConnection.java | 48 ++-- .../core/remote/rsocket/RsocketRpcServer.java | 25 +- .../cluster/remote/ClusterConnection.java | 5 +- .../ForwardInstanceRequestHandler.java | 3 +- 51 files changed, 1788 insertions(+), 548 deletions(-) create mode 100644 api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java create mode 100644 api/src/main/java/com/alibaba/nacos/api/config/remote/response/cluster/ConfigChangeClusterSyncResponse.java create mode 100644 api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java rename core/src/main/java/com/alibaba/nacos/core/remote/NacosRemoteConstants.java => api/src/main/java/com/alibaba/nacos/api/remote/response/AbstractPushCallBack.java (61%) create mode 100644 config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeClusterSyncRequestHandler.java create mode 100644 config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigClusterRpcClientProxy.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/DefaultPushFuture.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/PushFuture.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/RemoteConnectionEventListener.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRequestTypeConstants.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRequestTypeConstants.java index 9f17d8b4d..5d816d455 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRequestTypeConstants.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRequestTypeConstants.java @@ -35,4 +35,7 @@ public class ConfigRequestTypeConstants extends RequestTypeConstants { public static final String CONFIG_CHANGE_NOTIFY = "CONFIG_CHANGE_NOTIFY"; + public static final String CONFIG_CHANGE_CLUSTER_SYNC = "CONFIG_CHANGE_CLUSTER_SYNC"; + + } diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java new file mode 100644 index 000000000..3dade2062 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java @@ -0,0 +1,163 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.config.remote.request.cluster; + +import com.alibaba.nacos.api.config.remote.request.AbstractConfigRequest; +import com.alibaba.nacos.api.config.remote.request.ConfigRequestTypeConstants; + +/** + * config change sync request on clusters. + * + * @author liuzunfei + * @version $Id: ConfigChangeClusterSyncRequest.java, v 0.1 2020年08月11日 4:30 PM liuzunfei Exp $ + */ +public class ConfigChangeClusterSyncRequest extends AbstractConfigRequest { + + String dataId; + + String group; + + String tenant; + + String tag; + + long lastModified; + + String isBeta; + + /** + * is beta. + * + * @return + */ + public boolean isBeta() { + return "Y".equalsIgnoreCase(isBeta); + } + + @Override + public String getType() { + return ConfigRequestTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC; + } + + /** + * Getter method for property dataId. + * + * @return property value of dataId + */ + public String getDataId() { + return dataId; + } + + /** + * Setter method for property dataId. + * + * @param dataId value to be assigned to property dataId + */ + public void setDataId(String dataId) { + this.dataId = dataId; + } + + /** + * Getter method for property group. + * + * @return property value of group + */ + public String getGroup() { + return group; + } + + /** + * Setter method for property group. + * + * @param group value to be assigned to property group + */ + public void setGroup(String group) { + this.group = group; + } + + /** + * Getter method for property tenant. + * + * @return property value of tenant + */ + public String getTenant() { + return tenant; + } + + /** + * Setter method for property tenant. + * + * @param tenant value to be assigned to property tenant + */ + public void setTenant(String tenant) { + this.tenant = tenant; + } + + /** + * Getter method for property tag. + * + * @return property value of tag + */ + public String getTag() { + return tag; + } + + /** + * Setter method for property tag. + * + * @param tag value to be assigned to property tag + */ + public void setTag(String tag) { + this.tag = tag; + } + + /** + * Getter method for property lastModified. + * + * @return property value of lastModified + */ + public long getLastModified() { + return lastModified; + } + + /** + * Setter method for property lastModified. + * + * @param lastModified value to be assigned to property lastModified + */ + public void setLastModified(long lastModified) { + this.lastModified = lastModified; + } + + /** + * Getter method for property isBeta. + * + * @return property value of isBeta + */ + public String getIsBeta() { + return isBeta; + } + + /** + * Setter method for property isBeta. + * + * @param isBeta value to be assigned to property isBeta + */ + public void setIsBeta(String isBeta) { + this.isBeta = isBeta; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigResponseTypeConstants.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigResponseTypeConstants.java index cc2bb34bb..92759ffa3 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigResponseTypeConstants.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigResponseTypeConstants.java @@ -35,4 +35,6 @@ public class ConfigResponseTypeConstants extends ResponseTypeConstants { public static final String CONFIG_NOTIFY = "CONFIG_NOTIFY"; + public static final String CONFIG_CHANGE_CLUSTER_SYNC = "CONFIG_CHANGE_CLUSTER_SYNC"; + } diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/cluster/ConfigChangeClusterSyncResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/cluster/ConfigChangeClusterSyncResponse.java new file mode 100644 index 000000000..e78b285e9 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/cluster/ConfigChangeClusterSyncResponse.java @@ -0,0 +1,34 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.config.remote.response.cluster; + +import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants; +import com.alibaba.nacos.api.remote.response.Response; + +/** + * config change sync response on clusters. + * + * @author liuzunfei + * @version $Id: ConfigChangeClusterSyncResponse.java, v 0.1 2020年08月11日 4:32 PM liuzunfei Exp $ + */ +public class ConfigChangeClusterSyncResponse extends Response { + + @Override + public String getType() { + return ConfigResponseTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java b/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java new file mode 100644 index 000000000..a6e21e0a6 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.remote; + +/** + * constants define of remote. + * + * @author liuzunfei + * @version $Id: ConnectionMetaConstants.java, v 0.1 2020年08月13日 1:05 PM liuzunfei Exp $ + */ +public class RemoteConstants { + + /** + * label key value define. + */ + public static final String LABEL_SOURCE = "source"; + + public static final String LABEL_SOURCE_SDK = "sdk"; + + public static final String LABEL_SOURCE_NODE = "node"; + + public static final String LABEL_MODULE = "module"; + + public static final String LABEL_MODULE_CONFIG = "config"; + + public static final String LABEL_MODULE_NAMING = "naming"; +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java index 8f2be43be..bb9c689be 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.api.remote.request; +import java.util.HashMap; +import java.util.Map; + /** * request to setup a connection. * @@ -30,6 +33,8 @@ public class ConnectionSetupRequest extends InternalRequest { private String clientVersion; + protected Map labels = new HashMap(); + public ConnectionSetupRequest() { } @@ -39,6 +44,14 @@ public class ConnectionSetupRequest extends InternalRequest { this.clientVersion = clientVersion; } + public ConnectionSetupRequest(String connectionId, String clientIp, String clientVersion, + Map labels) { + this.clientIp = clientIp; + this.connectionId = connectionId; + this.clientVersion = clientVersion; + this.labels = labels; + } + @Override public String getType() { return RequestTypeConstants.CONNECTION_SETUP; @@ -97,4 +110,22 @@ public class ConnectionSetupRequest extends InternalRequest { public void setClientVersion(String clientVersion) { this.clientVersion = clientVersion; } + + /** + * Getter method for property labels. + * + * @return property value of labels + */ + public Map getLabels() { + return labels; + } + + /** + * Setter method for property labels. + * + * @param labels value to be assigned to property labels + */ + public void setLabels(Map labels) { + this.labels = labels; + } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/PushAckRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/PushAckRequest.java index d0083f3ee..79829081e 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/PushAckRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/PushAckRequest.java @@ -28,6 +28,8 @@ public class PushAckRequest extends InternalRequest { private boolean success; + private Exception exception; + @Override public String getType() { return RequestTypeConstants.PUSH_ACK; @@ -83,4 +85,22 @@ public class PushAckRequest extends InternalRequest { public void setSuccess(boolean success) { this.success = success; } + + /** + * Setter method for property exception. + * + * @param exception value to be assigned to property exception + */ + public void setException(Exception exception) { + this.exception = exception; + } + + /** + * Getter method for property exception. + * + * @return property value of exception + */ + public Exception getException() { + return exception; + } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/NacosRemoteConstants.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/AbstractPushCallBack.java similarity index 61% rename from core/src/main/java/com/alibaba/nacos/core/remote/NacosRemoteConstants.java rename to api/src/main/java/com/alibaba/nacos/api/remote/response/AbstractPushCallBack.java index b45930ddc..66d709176 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/NacosRemoteConstants.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/AbstractPushCallBack.java @@ -14,18 +14,24 @@ * limitations under the License. */ -package com.alibaba.nacos.core.remote; +package com.alibaba.nacos.api.remote.response; /** - * NacosRemoteConstants. + * abstract callback of push service. * * @author liuzunfei - * @version $Id: NacosRemoteConstants.java, v 0.1 2020年07月14日 9:22 PM liuzunfei Exp $ + * @version $Id: PushCallBack.java, v 0.1 2020年07月20日 1:13 PM liuzunfei Exp $ */ -public class NacosRemoteConstants { +public abstract class AbstractPushCallBack implements PushCallBack { - public static final String LISTEN_CONTEXT_CONFIG = "CONFIG"; + private long timeout; - public static final String LISTEN_CONTEXT_NAMING = "NAMING"; + public AbstractPushCallBack(long timeout) { + this.timeout = timeout; + } + @Override + public long getTimeout() { + return timeout; + } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java index fd2c22822..89847e87a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java @@ -24,6 +24,8 @@ package com.alibaba.nacos.api.remote.response; */ public interface PushCallBack { + public long getTimeout(); + public void onSuccess(); public void onFail(Exception e); diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseRegistry.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseRegistry.java index 0ffa70450..c010f39e6 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseRegistry.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseRegistry.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse; import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse; import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse; import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants; +import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse; import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest; import com.alibaba.nacos.api.naming.remote.response.InstanceResponse; @@ -53,6 +54,9 @@ public class ResponseRegistry { REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_QUERY, ConfigQueryResponse.class); REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_PUBLISH, ConfigPubishResponse.class); REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_REMOVE, ConfigRemoveResponse.class); + //config on cluster. + REGISTRY_RESPONSES + .put(ConfigResponseTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC, ConfigChangeClusterSyncResponse.class); //naming response registry REGISTRY_RESPONSES.put(NamingRemoteConstants.REGISTER_INSTANCE, InstanceResponse.class); diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index eb0194ad1..acf97ead8 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -93,7 +93,9 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } - notifyRpcListenConfig(); + if (!cache.isListenSuccess()) { + notifyRpcListenConfig(); + } } /** @@ -104,13 +106,15 @@ public class ClientWorker implements Closeable { private void notifyRpcListenConfig() { try { if (!ParamUtils.useHttpSwitch()) { - lock.tryLock(); - try { - condition.signal(); - } finally { - lock.unlock(); + + boolean lockSuccess = lock.tryLock(); + if (lockSuccess) { + try { + condition.signal(); + } finally { + lock.unlock(); + } } - } } catch (Exception e) { LOGGER.warn("[notify rpc listen fail]", e); @@ -153,7 +157,9 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } - notifyRpcListenConfig(); + if (!cache.isListenSuccess()) { + notifyRpcListenConfig(); + } } /** @@ -174,9 +180,10 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } - - notifyRpcListenConfig(); - + // if current cache is already at listening status,do not notify. + if (!cache.isListenSuccess()) { + notifyRpcListenConfig(); + } } /** @@ -653,7 +660,7 @@ public class ClientWorker implements Closeable { try { while (true) { try { - lock.tryLock(); + lock.lock(); //System.out.println("wait execute listen.."); condition.await(); executeRpcListen(); @@ -696,11 +703,14 @@ public class ClientWorker implements Closeable { CacheData cacheData = cacheMap.get().get(groupKey); if (cacheData != null) { cacheData.setListenSuccess(false); - try { - lock.tryLock(); - condition.signal(); - } finally { - lock.unlock(); + + boolean lockSuccess = lock.tryLock(); + if (lockSuccess) { + try { + condition.signal(); + } finally { + lock.unlock(); + } } } } @@ -712,11 +722,13 @@ public class ClientWorker implements Closeable { @Override public void onConnected() { - lock.tryLock(); - try { - condition.signal(); - } finally { - lock.unlock(); + boolean lockSuccess = lock.tryLock(); + if (lockSuccess) { + try { + condition.signal(); + } finally { + lock.unlock(); + } } } @@ -816,7 +828,7 @@ public class ClientWorker implements Closeable { try { ConfigChangeBatchListenResponse configChangeBatchListenResponse = rpcClientProxy .listenConfigChange(listenConfigString); - if (configChangeBatchListenResponse.isSuccess()) { + if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) { if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedGroupKeys())) { for (String groupKey : configChangeBatchListenResponse.getChangedGroupKeys()) { diff --git a/client/src/main/java/com/alibaba/nacos/client/config/remote/ConfigClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/config/remote/ConfigClientProxy.java index 68bfb4b32..7d5fee39e 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/remote/ConfigClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/remote/ConfigClientProxy.java @@ -25,12 +25,18 @@ import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse; import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse; import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.client.config.utils.ParamUtils; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alibaba.nacos.common.remote.client.ServerListFactory; +import com.alibaba.nacos.common.utils.StringUtils; + +import java.util.HashMap; +import java.util.Map; /** * config grpc client proxy. @@ -44,7 +50,19 @@ public class ConfigClientProxy { private RpcClient rpcClient; public ConfigClientProxy() { - rpcClient = RpcClientFactory.getClient("config", ConnectionType.RSOCKET); + ConnectionType connectionType = ConnectionType.GRPC; + String connetionType = ParamUtils.configRemoteConnectionType(); + if (StringUtils.isNotBlank(connetionType)) { + ConnectionType connectionType1 = ConnectionType.valueOf(connetionType); + if (connectionType1 != null) { + connectionType = connectionType1; + } + } + Map labels = new HashMap(); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); + labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_CONFIG); + + rpcClient = RpcClientFactory.createClient("config", connectionType, labels); } public Response request(Request request) throws NacosException { diff --git a/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java b/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java index 483d23873..031ea325f 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java @@ -218,4 +218,13 @@ public class ParamUtils { return "Y".equalsIgnoreCase(useHttpSwitch); } + /** + * get connection type for remote. + * + * @return + */ + public static String configRemoteConnectionType() { + String remoteConnectionType = System.getProperty("nacos.remote.config.connectiontype"); + return remoteConnectionType; + } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java index 0e4c85474..686e30490 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java @@ -62,7 +62,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy { public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { this.namespaceId = namespaceId; - this.rpcClient = RpcClientFactory.getClient("naming", ConnectionType.GRPC); + this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC); this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this); start(serverListFactory, serviceInfoHolder); } diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index a1bf0ce67..f51aba578 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -21,6 +21,12 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.AbstractListener; import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alibaba.nacos.common.remote.client.RpcClientFactory; +import com.alibaba.nacos.common.remote.client.ServerListFactory; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -40,6 +46,7 @@ public class ConfigTest { @Before public void before() throws Exception { Properties properties = new Properties(); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848"); properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848"); @@ -49,6 +56,37 @@ public class ConfigTest { //Thread.sleep(2000L); } + @Test + public void test222() throws Exception { + RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET); + client.init(new ServerListFactory() { + @Override + public String genNextServer() { + return "127.0.0.1:8848"; + } + + @Override + public String getCurrentServer() { + return "127.0.0.1:8848"; + } + }); + client.start(); + ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); + syncRequest.setDataId("xiaochun.xxc1"); + syncRequest.setGroup("xiaochun.xxc"); + syncRequest.setIsBeta("N"); + syncRequest.setLastModified(System.currentTimeMillis()); + syncRequest.setTag(""); + syncRequest.setTenant(""); + System.out.println(client.isRunning()); + Response response = client.request(syncRequest); + client.request(syncRequest); + + client.request(syncRequest); + System.out.println(response); + + } + @After public void cleanup() throws Exception { configService.shutDown(); @@ -59,6 +97,7 @@ public class ConfigTest { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848"); //" + System.out.println("1"); List configServiceList = new ArrayList(); for (int i = 0; i < 200; i++) { @@ -67,10 +106,12 @@ public class ConfigTest { @Override public void receiveConfigInfo(String configInfo) { + System.out.println("listener2:" + configInfo); } }); configServiceList.add(configService); } + System.out.println("2"); Thread th = new Thread(new Runnable() { @Override @@ -80,6 +121,8 @@ public class ConfigTest { int times = 10000; while (times > 0) { try { + System.out.println("3"); + boolean result = configService .publishConfig("test", "test", "value" + System.currentTimeMillis()); @@ -111,22 +154,15 @@ public class ConfigTest { public void run() { long start = System.currentTimeMillis(); Random random = new Random(); - int times = 1000; + int times = 1; while (times > 0) { try { - //System.out.println("发布配置"); boolean success = configService.publishConfig(dataId + random.nextInt(20), group, "value" + System.currentTimeMillis()); - if (success) { - // System.out.println("发布配置成功"); - } else { - //System.out.println("发布配置失败"); - } times--; - Thread.sleep(500L); + Thread.sleep(1000L); } catch (Exception e) { e.printStackTrace(); - } } @@ -149,24 +185,13 @@ public class ConfigTest { configService.getConfigAndSignListener(dataId + i, group, 3000L, listener); } - //configService.getConfigAndSignListener(dataId, group, 5000, listener); - - //Assert.assertTrue(result); + Thread.sleep(10000L); - // configService.getConfigAndSignListener(dataId, group, 5000, listener); + for (int i = 0; i < 20; i++) { + //configService.removeListener(dataId + i, group, listener); + } + System.out.println("remove listens."); - //configService.removeListener(dataId, group, listener); - //configService.removeConfig(dataId, group); - - // configService.publishConfig("lessspring2", group, "lessspring2value"); - // - // configService.getConfigAndSignListener("lessspring2", group, 5000, new AbstractListener() { - // @Override - // public void receiveConfigInfo(String configInfo) { - // System.out.println("receiveConfigInfo2 :" + configInfo); - // } - // }); - // Scanner scanner = new Scanner(System.in); System.out.println("input content"); while (scanner.hasNextLine()) { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/ConnectionType.java b/common/src/main/java/com/alibaba/nacos/common/remote/ConnectionType.java index f94f99a45..98c6b9924 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/ConnectionType.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/ConnectionType.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.common.remote; /** * ConnectionType. + * * @author liuzunfei * @version $Id: ConnectionType.java, v 0.1 2020年07月13日 7:15 PM liuzunfei Exp $ */ @@ -42,6 +43,16 @@ public enum ConnectionType { String name; + public static ConnectionType getByType(String type) { + ConnectionType[] values = ConnectionType.values(); + for (ConnectionType connectionType : values) { + if (connectionType.getType().equals(type)) { + return connectionType; + } + } + return null; + } + private ConnectionType(String type, String name) { this.type = type; this.name = name; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java index da8ac477c..ffe2efb47 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java @@ -21,6 +21,9 @@ import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; import com.google.common.util.concurrent.FutureCallback; +import java.util.HashMap; +import java.util.Map; + /** * connection on client side. * @@ -33,11 +36,25 @@ public abstract class Connection { protected String connectionId; + protected Map labels = new HashMap(); + public Connection(String connetionId, RpcClient.ServerInfo serverInfo) { this.serverInfo = serverInfo; this.connectionId = connetionId; } + public String getLabel(String labelKey) { + return labels.get(labelKey); + } + + public void putLabel(String labelKey, String labelValue) { + labels.put(labelKey, labelValue); + } + + public void putLabels(Map labels) { + labels.putAll(labels); + } + /** * send request. * diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 4a198e2ef..de5fec136 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -31,7 +31,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -42,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; + /** * abstract remote client to connect to server. * @@ -52,18 +56,24 @@ public abstract class RpcClient implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class); + protected static final long ACTIVE_INTERNAL = 3000L; + private ServerListFactory serverListFactory; protected String connectionId; protected LinkedBlockingQueue eventLinkedBlockingQueue = new LinkedBlockingQueue(); - protected AtomicReference rpcClientStatus = new AtomicReference( + protected volatile AtomicReference rpcClientStatus = new AtomicReference( RpcClientStatus.WAIT_INIT); + private long activeTimeStamp = System.currentTimeMillis(); + protected ScheduledExecutorService executorService; - protected Connection currentConnetion; + protected volatile Connection currentConnetion; + + protected Map labels = new HashMap(); /** * listener called where connect status changed. @@ -115,6 +125,14 @@ public abstract class RpcClient implements Closeable { } } + protected boolean overActiveTime() { + return System.currentTimeMillis() - this.activeTimeStamp > ACTIVE_INTERNAL; + } + + protected void refereshActiveTimestamp() { + this.activeTimeStamp = System.currentTimeMillis(); + } + /** * check is this client is inited. * @@ -168,6 +186,16 @@ public abstract class RpcClient implements Closeable { serverListFactory.getClass().getName()); } + /** + * init server list factory. + * + * @param labels labels + */ + public void initLabels(Map labels) { + this.labels.putAll(labels); + LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init label ,labels={}", this.labels); + } + /** * Start this client. */ @@ -196,8 +224,8 @@ public abstract class RpcClient implements Closeable { } else if (take.isDisConnected()) { notifyDisConnected(); } - } catch (InterruptedException e) { - //Do nothing + } catch (Exception e) { + //Donothing } } } @@ -225,8 +253,9 @@ public abstract class RpcClient implements Closeable { public void requestReply(ServerPushRequest request) { if (request instanceof ConnectResetRequest) { try { - + if (isRunning()) { + clearContextOnResetRequest(); switchServerAsync(); } } catch (Exception e) { @@ -247,18 +276,14 @@ public abstract class RpcClient implements Closeable { */ protected void switchServerAsync() { - System.out.println("1"); - //return if is in switching of other thread. if (switchingFlag.get()) { - System.out.println("1-1"); - return; } executorService.submit(new Runnable() { @Override public void run() { - + try { //only one thread can execute switching meantime. boolean innerLock = switchingLock.tryLock(); @@ -269,27 +294,28 @@ public abstract class RpcClient implements Closeable { // loop until start client success. boolean switchSuccess = false; while (!switchSuccess) { - + //1.get a new server ServerInfo serverInfo = nextRpcServer(); + System.out.println("1:" + serverInfo); //2.create a new channel to new server try { Connection connectNew = connectToServer(serverInfo); if (connectNew != null) { - //successfully create a new connect. closeConnection(currentConnetion); currentConnetion = connectNew; rpcClientStatus.set(RpcClientStatus.RUNNING); switchSuccess = true; - eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED)); + boolean s = eventLinkedBlockingQueue + .add(new ConnectionEvent(ConnectionEvent.CONNECTED)); return; } } catch (Exception e) { e.printStackTrace(); // error to create connection } - + try { //sleep 1 second to switch next server. Thread.sleep(1000L); @@ -310,7 +336,7 @@ public abstract class RpcClient implements Closeable { private void closeConnection(Connection connection) { if (connection != null) { connection.close(); - eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED)); + eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED)); } } @@ -328,6 +354,10 @@ public abstract class RpcClient implements Closeable { */ public abstract int rpcPortOffset(); + protected void clearContextOnResetRequest() { + // Default do nothing. + } + /** * send request. * @@ -335,12 +365,32 @@ public abstract class RpcClient implements Closeable { * @return */ public Response request(Request request) throws NacosException { - Response response = this.currentConnetion.request(request); - if (response != null && response instanceof ConnectionUnregisterResponse) { - switchServerAsync(); - throw new IllegalStateException("Invalid client status."); + int retryTimes = 3; + + Exception exceptionToThrow = null; + while (retryTimes > 0) { + try { + Response response = this.currentConnetion.request(request); + if (response != null && response instanceof ConnectionUnregisterResponse) { + clearContextOnResetRequest(); + switchServerAsync(); + throw new IllegalStateException("Invalid client status."); + } + refereshActiveTimestamp(); + return response; + } catch (Exception e) { + LoggerUtils.printIfErrorEnabled(LOGGER, + "Fail to send request,connectionId={}, request={},errorMesssage={}", this.connectionId, request, + e.getMessage()); + exceptionToThrow = e; + } finally { + retryTimes--; + } } - return response; + if (exceptionToThrow != null) { + throw new NacosException(SERVER_ERROR, exceptionToThrow); + } + return null; } /** @@ -351,6 +401,7 @@ public abstract class RpcClient implements Closeable { */ public void asyncRequest(Request request, FutureCallback callback) throws NacosException { this.currentConnetion.asyncRequest(request, callback); + refereshActiveTimestamp(); } /** @@ -414,7 +465,10 @@ public abstract class RpcClient implements Closeable { } protected ServerInfo nextRpcServer() { - getServerListFactory().genNextServer(); + + String s = getServerListFactory().genNextServer(); + System.out.println("0...,switch..." + s); + String serverAddress = getServerListFactory().getCurrentServer(); return resolveServerInfo(serverAddress); } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java index f3ed0cfcf..2fc328fcc 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java @@ -16,12 +16,14 @@ package com.alibaba.nacos.common.remote.client; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.client.grpc.GrpcClient; import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient; import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * RpcClientFactory.to support muti client for diffrent modules of usage. @@ -33,13 +35,48 @@ public class RpcClientFactory { static Map clientMap = new HashMap(); - public static RpcClient getClient(String clientName, ConnectionType connectionType) { + /** + * get all client. + * + * @return client collection. + */ + public static Set> getAllClientEntrys() { + Set> entries = clientMap.entrySet(); + return entries; + } + + /** + * shut down client. + * + * @param clientName client name. + */ + public static void destroyClient(String clientName) throws NacosException { + RpcClient rpcClient = clientMap.get(clientName); + if (rpcClient != null) { + rpcClient.shutdown(); + } + clientMap.remove(clientName); + } + + public static RpcClient getClient(String clientName) { + + return clientMap.get(clientName); + } + + /** + * create a rpc client. + * + * @param clientName client name. + * @param connectionType client type. + * @return + */ + public static RpcClient createClient(String clientName, ConnectionType connectionType) { synchronized (clientMap) { if (clientMap.get(clientName) == null) { RpcClient moduleClient = null; if (ConnectionType.GRPC.equals(connectionType)) { moduleClient = new GrpcClient(); - + } else if (ConnectionType.RSOCKET.equals(connectionType)) { moduleClient = new RsocketRpcClient(); } @@ -53,4 +90,32 @@ public class RpcClientFactory { } } + /** + * create a rpc client. + * + * @param clientName client name. + * @param connectionType client type. + * @return + */ + public static RpcClient createClient(String clientName, ConnectionType connectionType, Map labels) { + synchronized (clientMap) { + if (clientMap.get(clientName) == null) { + RpcClient moduleClient = null; + if (ConnectionType.GRPC.equals(connectionType)) { + moduleClient = new GrpcClient(); + + } else if (ConnectionType.RSOCKET.equals(connectionType)) { + moduleClient = new RsocketRpcClient(); + } + if (moduleClient == null) { + throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType()); + } + moduleClient.initLabels(labels); + clientMap.put(clientName, moduleClient); + return moduleClient; + } + return clientMap.get(clientName); + } + } + } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index cbdbf0c11..421569034 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -35,12 +35,24 @@ import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.utils.VersionUtils; import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientStreamTracer; +import io.grpc.Context; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.net.URI; import java.util.concurrent.TimeUnit; /** @@ -73,10 +85,10 @@ public class GrpcClient extends RpcClient { * @return if server check success,return a non-null stub. */ private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) { - + ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext() .build(); - + RequestGrpc.RequestFutureStub grpcServiceStubTemp = RequestGrpc.newFutureStub(managedChannelTemp); boolean checkSucess = serverCheck(grpcServiceStubTemp); @@ -89,6 +101,7 @@ public class GrpcClient extends RpcClient { } } + /** * shutdown a channel. * @@ -114,7 +127,7 @@ public class GrpcClient extends RpcClient { while (maxRetryTimes > 0) { try { - if (!isRunning()) { + if (!isRunning() && !overActiveTime()) { return; } HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); @@ -142,7 +155,7 @@ public class GrpcClient extends RpcClient { private GrpcMetadata buildMeta() { GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) - .setVersion(VersionUtils.getFullClientVersion()).build(); + .setVersion(VersionUtils.getFullClientVersion()).putAllLabels(labels).build(); return meta; } @@ -173,6 +186,7 @@ public class GrpcClient extends RpcClient { * @param streamStub streamStub to bind. */ private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) { + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build(); LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest); streamStub.requestStream(streamRequest, new StreamObserver() { @@ -206,6 +220,7 @@ public class GrpcClient extends RpcClient { } }); } + private void sendAckResponse(String ackId, boolean success) { try { PushAckRequest request = PushAckRequest.build(ackId, success); @@ -223,7 +238,7 @@ public class GrpcClient extends RpcClient { public void run() { sendBeat(); } - }, 0, 3000, TimeUnit.MILLISECONDS); + }, 0, ACTIVE_INTERNAL, TimeUnit.MILLISECONDS); } @Override @@ -240,6 +255,7 @@ public class GrpcClient extends RpcClient { RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc .newStub(newChannelStubTemp.getChannel()); bindRequestStream(requestStreamStubTemp); + //switch current channel and stub RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc .newFutureStub(newChannelStubTemp.getChannel()); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java index 2286a5b82..901f346a9 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java @@ -50,7 +50,8 @@ public class GrpcConnection extends Connection { /** * executor to execute future request. */ - static ExecutorService aynsRequestExecutor = Executors.newScheduledThreadPool(10); + static ExecutorService aynsRequestExecutor = Executors + .newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); /** * grpc channel. diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java index b8f1ba0c9..f549b1a61 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java @@ -86,6 +86,15 @@ public class RsocketConnection extends Connection { } } + /** + * Getter method for property rSocketClient. + * + * @return property value of rSocketClient + */ + public RSocket getrSocketClient() { + return rSocketClient; + } + @Override public String toString() { return "RsocketConnection{" + "serverInfo=" + serverInfo + ", connectionId='" + connectionId + '\'' + '}'; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java index bcb4678d0..0d42d0070 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java @@ -40,9 +40,13 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.util.Random; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * rsocket implementation of rpc client. @@ -78,52 +82,53 @@ public class RsocketRpcClient extends RpcClient { try { ConnectionSetupRequest conconSetupRequest = new ConnectionSetupRequest(connectionId, NetUtils.localIP(), - VersionUtils.getFullClientVersion()); + VersionUtils.getFullClientVersion(), labels); Payload setUpPayload = RsocketUtils.convertRequestToPayload(conconSetupRequest, buildMeta()); - - RSocket rSocket = RSocketConnector.create().setupPayload(setUpPayload).acceptor(new SocketAcceptor() { - @Override - public Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket) { - - RSocket rsocket = new RSocketProxy(sendingSocket) { + RSocket rSocket = RSocketConnector.create().keepAlive(Duration.ofMillis(3000L), Duration.ofMillis(6000L)) + .setupPayload(setUpPayload).acceptor(new SocketAcceptor() { @Override - public Mono requestResponse(Payload payload) { - try { - final ServerPushRequest request = RsocketUtils.parseServerRequestFromPayload(payload); - try { - handleServerRequest(request); - ServerPushResponse response = new ServerPushResponse(); - response.setRequestId(request.getRequestId()); - return Mono.just(RsocketUtils.convertResponseToPayload(response)); - } catch (Exception e) { - ServerPushResponse response = new ServerPushResponse(); - response.setResultCode(ResponseCode.FAIL.getCode()); - response.setMessage(e.getMessage()); - response.setRequestId(request.getRequestId()); - return Mono.just(RsocketUtils.convertResponseToPayload(response)); + public Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket) { + + RSocket rsocket = new RSocketProxy(sendingSocket) { + @Override + public Mono requestResponse(Payload payload) { + try { + final ServerPushRequest request = RsocketUtils + .parseServerRequestFromPayload(payload); + try { + handleServerRequest(request); + ServerPushResponse response = new ServerPushResponse(); + response.setRequestId(request.getRequestId()); + return Mono.just(RsocketUtils.convertResponseToPayload(response)); + } catch (Exception e) { + ServerPushResponse response = new ServerPushResponse(); + response.setResultCode(ResponseCode.FAIL.getCode()); + response.setMessage(e.getMessage()); + response.setRequestId(request.getRequestId()); + return Mono.just(RsocketUtils.convertResponseToPayload(response)); + } + + } catch (Exception e) { + ServerPushResponse response = new ServerPushResponse(); + response.setResultCode(ResponseCode.FAIL.getCode()); + response.setMessage(e.getMessage()); + return Mono.just(DefaultPayload + .create(RsocketUtils.convertResponseToPayload(response))); + } } - - } catch (Exception e) { - ServerPushResponse response = new ServerPushResponse(); - response.setResultCode(ResponseCode.FAIL.getCode()); - response.setMessage(e.getMessage()); - return Mono.just(DefaultPayload - .create(RsocketUtils.convertResponseToPayload(response))); - } - } - @Override - public Mono fireAndForget(Payload payload) { - System.out.println("收到服务端fireAndForget:" + payload.getDataUtf8()); - final ServerPushRequest request = RsocketUtils.parseServerRequestFromPayload(payload); - handleServerRequest(request); - return Mono.just(null); - } - }; + @Override + public Mono fireAndForget(Payload payload) { + final ServerPushRequest request = RsocketUtils + .parseServerRequestFromPayload(payload); + handleServerRequest(request); + return Mono.just(null); + } + }; - return Mono.just((RSocket) rsocket); - } - }).connect(TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort())).block(); + return Mono.just((RSocket) rsocket); + } + }).connect(TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort())).block(); RsocketConnection connection = new RsocketConnection(connectionId, serverInfo, rSocket); fireOnCloseEvent(rSocket); return connection; @@ -147,8 +152,24 @@ public class RsocketRpcClient extends RpcClient { } } + void cancelfireOnCloseEvent(RSocket rSocket) { + System.out.println("cancelfireOnCloseEvent....111"); + + if (rSocket != null) { + System.out.println("cancelfireOnCloseEvent....222"); + rSocket.onClose().subscribe().dispose(); + } + } + + @Override + protected void clearContextOnResetRequest() { + RsocketConnection rsocket = (RsocketConnection) currentConnetion; + cancelfireOnCloseEvent(rsocket.getrSocketClient()); + } + void fireOnCloseEvent(RSocket rSocket) { - rSocket.onClose().subscribe(new Subscriber() { + + Subscriber subscriber = new Subscriber() { @Override public void onSubscribe(Subscription subscription) { @@ -156,12 +177,11 @@ public class RsocketRpcClient extends RpcClient { @Override public void onNext(Void aVoid) { - } @Override public void onError(Throwable throwable) { - System.out.println("On error ,switch server ..."); + System.out.println("On error ,switch server ..." + throwable); switchServerAsync(); } @@ -170,7 +190,14 @@ public class RsocketRpcClient extends RpcClient { System.out.println("On complete ,switch server ..."); switchServerAsync(); } - }); + }; + + } + + class RsocketHolder { + + RSocket rsocket; + } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeClusterSyncRequestHandler.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeClusterSyncRequestHandler.java new file mode 100644 index 000000000..a6973e7ab --- /dev/null +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeClusterSyncRequestHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.config.server.remote; + +import com.alibaba.nacos.api.config.remote.request.ConfigRequestTypeConstants; +import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest; +import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.config.server.service.dump.DumpService; +import com.alibaba.nacos.core.remote.RequestHandler; +import com.google.common.collect.Lists; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * handller to handler config change from other servers. + * + * @author liuzunfei + * @version $Id: ConfigChangeClusterSyncRequestHandler.java, v 0.1 2020年08月11日 4:35 PM liuzunfei Exp $ + */ +@Component +public class ConfigChangeClusterSyncRequestHandler extends RequestHandler { + + @Autowired + private DumpService dumpService; + + @Override + public Request parseBodyString(String bodyString) { + return JacksonUtils.toObj(bodyString, ConfigChangeClusterSyncRequest.class); + } + + @Override + public Response handle(Request request, RequestMeta meta) throws NacosException { + ConfigChangeClusterSyncRequest configChangeSyncRequest = (ConfigChangeClusterSyncRequest) request; + + if (configChangeSyncRequest.isBeta()) { + dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(), + configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp(), + true); + } else { + dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(), + configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp()); + } + return new ConfigChangeClusterSyncResponse(); + } + + @Override + public List getRequestTypes() { + return Lists.newArrayList(ConfigRequestTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC); + } +} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java index 28847e2b2..02c24169a 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java @@ -17,7 +17,7 @@ package com.alibaba.nacos.config.server.remote; import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest; -import com.alibaba.nacos.api.remote.response.PushCallBack; +import com.alibaba.nacos.api.remote.response.AbstractPushCallBack; import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.core.remote.RpcPushService; import com.alibaba.nacos.core.utils.Loggers; @@ -60,7 +60,8 @@ public class ConfigChangeNotifier { if (!CollectionUtils.isEmpty(clients)) { for (final String client : clients) { - rpcPushService.pushWithCallback(client, notifyRequet, new PushCallBack() { + rpcPushService.pushWithCallback(client, notifyRequet, new AbstractPushCallBack(500L) { + @Override public void onSuccess() { Loggers.CORE.info("push callback success.,groupKey={},clientId={}", groupKey, client); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigClusterRpcClientProxy.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigClusterRpcClientProxy.java new file mode 100644 index 000000000..d23388f30 --- /dev/null +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigClusterRpcClientProxy.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.config.server.remote; + +import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest; +import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.core.cluster.Member; +import com.alibaba.nacos.core.cluster.remote.ClusterRpcClientProxy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * ConfigClusterRpcClientProxy. + * + * @author liuzunfei + * @version $Id: ConfigClusterRpcClientProxy.java, v 0.1 2020年08月11日 4:28 PM liuzunfei Exp $ + */ +@Service +public class ConfigClusterRpcClientProxy { + + @Autowired + ClusterRpcClientProxy clusterRpcClientProxy; + + /** + * sync config change request. + * @param member + * @param request + * @return + * @throws NacosException exception. + */ + public ConfigChangeClusterSyncResponse syncConfigChange(Member member, ConfigChangeClusterSyncRequest request) + throws NacosException { + + Response response = clusterRpcClientProxy.sendRequest(member, request); + if (response != null && response instanceof ConfigChangeClusterSyncResponse) { + return (ConfigChangeClusterSyncResponse) response; + } + return null; + } +} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index c78247294..09c2d1df1 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -16,17 +16,23 @@ package com.alibaba.nacos.config.server.service.notify; +import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest; +import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse; +import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; +import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy; +import com.alibaba.nacos.config.server.service.dump.DumpService; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; import com.alibaba.nacos.config.server.utils.ConfigExecutor; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.config.server.utils.PropertyUtil; import com.alibaba.nacos.core.cluster.Member; +import com.alibaba.nacos.core.cluster.MemberUtils; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.InetUtils; @@ -60,6 +66,9 @@ import java.util.concurrent.TimeUnit; @Service public class AsyncNotifyService { + @Autowired + private DumpService dumpService; + @Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; @@ -84,12 +93,25 @@ public class AsyncNotifyService { Collection ipList = memberManager.allMembers(); // In fact, any type of queue here can be - Queue queue = new LinkedList(); + Queue httpQueue = new LinkedList(); + Queue rpcQueue = new LinkedList(); + for (Member member : ipList) { - queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), - evt.isBeta)); + if (MemberUtils.getSupportedConnectionType(member) == null) { + httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), + evt.isBeta)); + } else { + rpcQueue.add( + new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); + } } - ConfigExecutor.executeAsyncNotify(new AsyncTask(httpclient, queue)); + if (!httpQueue.isEmpty()) { + ConfigExecutor.executeAsyncNotify(new AsyncTask(httpclient, httpQueue)); + } + if (!rpcQueue.isEmpty()) { + ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); + } + } } @@ -109,6 +131,9 @@ public class AsyncNotifyService { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class); + @Autowired + private ConfigClusterRpcClientProxy configClusterRpcClientProxy; + private ServerMemberManager memberManager; class AsyncTask implements Runnable { @@ -157,6 +182,83 @@ public class AsyncNotifyService { } + class AsyncRpcTask implements Runnable { + + private Queue queue; + + public AsyncRpcTask(Queue queue) { + this.queue = queue; + } + + @Override + public void run() { + while (!queue.isEmpty()) { + NotifySingleRpcTask task = queue.poll(); + Member member = task.member; + + ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); + syncRequest.setDataId(task.getDataId()); + syncRequest.setGroup(task.getGroup()); + syncRequest.setIsBeta(task.isBeta ? "Y" : "N"); + syncRequest.setLastModified(task.getLastModified()); + syncRequest.setTag(task.tag); + syncRequest.setTenant(task.getTenant()); + + if (memberManager.hasMember(member.getAddress()) && !memberManager.getSelf().equals(member)) { + // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify + boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress()); + if (unHealthNeedDelay) { + // target ip is unhealthy, then put it in the notification list + ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, + task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, + 0, member.getAddress()); + // get delay time and set fail count to the task + asyncTaskExecute(task); + } else { + + try { + ConfigChangeClusterSyncResponse response = configClusterRpcClientProxy + .syncConfigChange(member, syncRequest); + if (response == null || !response.isSuccess()) { + asyncTaskExecute(task); + } + } catch (Exception e) { + asyncTaskExecute(task); + } + } + } + + if (memberManager.getSelf().equals(member)) { + if (syncRequest.isBeta()) { + dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), + syncRequest.getLastModified(), NetUtils.localIP(), true); + } else { + dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), + syncRequest.getLastModified(), NetUtils.localIP()); + } + } + + } + } + } + + static class NotifySingleRpcTask extends NotifyTask { + + private Member member; + + private boolean isBeta; + + private String tag; + + public NotifySingleRpcTask(String dataId, String group, String tenant, String tag, long lastModified, + boolean isBeta, Member member) { + super(dataId, group, tenant, lastModified); + this.member = member; + this.isBeta = isBeta; + this.tag = tag; + } + } + private void asyncTaskExecute(NotifySingleTask task) { int delay = getDelayTime(task); Queue queue = new LinkedList(); @@ -165,6 +267,14 @@ public class AsyncNotifyService { ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS); } + private void asyncTaskExecute(NotifySingleRpcTask task) { + int delay = getDelayTime(task); + Queue queue = new LinkedList(); + queue.add(task); + AsyncRpcTask asyncTask = new AsyncRpcTask(queue); + ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS); + } + class AsyncNotifyCallBack implements FutureCallback { public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) { @@ -310,7 +420,7 @@ public class AsyncNotifyService { * @param task notify task * @return delay */ - private static int getDelayTime(NotifySingleTask task) { + private static int getDelayTime(NotifyTask task) { int failCount = task.getFailCount(); int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS; if (failCount <= MAX_COUNT) { @@ -325,4 +435,4 @@ public class AsyncNotifyService { private static final int MAX_COUNT = 6; -} \ No newline at end of file +} diff --git a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java index d82414d87..97a719e9f 100644 --- a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java +++ b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java @@ -16,6 +16,8 @@ package com.alibaba.nacos.console.controller; +import com.alibaba.nacos.core.remote.Connection; +import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.grpc.GrpcServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; @@ -38,7 +40,7 @@ import java.util.Map; public class ServerLoaderController { @Autowired - private GrpcServer grpcServer; + private ConnectionManager connectionManager; /** * Get server state of current server. @@ -48,7 +50,7 @@ public class ServerLoaderController { @GetMapping("/max") public ResponseEntity updateMaxClients(@RequestParam Integer count) { Map responseMap = new HashMap<>(3); - grpcServer.setMaxClientCount(count); + connectionManager.coordinateMaxClientsSmoth(count); return ResponseEntity.ok().body("success"); } @@ -60,7 +62,7 @@ public class ServerLoaderController { @GetMapping("/reload") public ResponseEntity reloadClients(@RequestParam Integer count) { Map responseMap = new HashMap<>(3); - grpcServer.reloadClient(count); + connectionManager.loadClientsSmoth(count); return ResponseEntity.ok().body("success"); } @@ -72,9 +74,19 @@ public class ServerLoaderController { @GetMapping("/current") public ResponseEntity currentCount() { Map responseMap = new HashMap<>(3); - int count = grpcServer.currentClients(); + int count = connectionManager.currentClientsCount(); return ResponseEntity.ok().body(count); } - + /** + * Get current clients. + * + * @return state json. + */ + @GetMapping("/all") + public ResponseEntity currentClients() { + Map responseMap = new HashMap<>(3); + Map stringConnectionMap = connectionManager.currentClients(); + return ResponseEntity.ok().body(stringConnectionMap); + } } diff --git a/console/src/main/resources/application.properties b/console/src/main/resources/application.properties index 7ef886769..f30a44cc8 100644 --- a/console/src/main/resources/application.properties +++ b/console/src/main/resources/application.properties @@ -92,7 +92,6 @@ management.metrics.export.influx.enabled=false #management.metrics.export.influx.consistency=one #management.metrics.export.influx.compressed=true - #*************** Access Log Related Configurations ***************# ### If turn on the access log: server.tomcat.accesslog.enabled=true @@ -102,8 +101,11 @@ server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D %{User-Agent}i %{Reque ### The directory of access log: server.tomcat.basedir= - - +#spring.datasource.platform=mysql +#db.num=1 +#db.url.0=jdbc:mysql://10.101.167.27:3306/acm?characterEncoding=utf8&connectTimeout=1000&socketTimeout=10000&autoReconnect=true +#db.user=root +#db.password=root #*************** Access Control Related Configurations ***************# ### If enable spring security, this option is deprecated in 1.2.0: #spring.security.enabled=false diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java index 577b9c303..2af1507d5 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java @@ -38,9 +38,11 @@ public class MemberMetaDataConstants { public static final String VERSION = "version"; + public static final String SUPPORT_REMOTE_C_TYPE = "remoteConnectType"; + public static final String[] META_KEY_LIST = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, - LAST_REFRESH_TIME, VERSION}; + LAST_REFRESH_TIME, VERSION, SUPPORT_REMOTE_C_TYPE}; public static final String[] META_KEY_LIST_WITHOUT_LAST_REFRESH_TIME = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, - WEIGHT, VERSION}; + WEIGHT, VERSION, SUPPORT_REMOTE_C_TYPE}; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java index fa88a1603..fd098649b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.core.cluster; +import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.utils.ExceptionUtil; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.Loggers; @@ -93,6 +94,22 @@ public class MemberUtils { return target; } + /** + * get support member connection type. + * + * @param member + * @return + */ + public static ConnectionType getSupportedConnectionType(Member member) { + Map extendInfo = member.getExtendInfo(); + if (extendInfo == null || !extendInfo.containsKey(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE)) { + return null; + } else { + String type = (String) extendInfo.get(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE); + return ConnectionType.getByType(type); + } + } + public static int calculateRaftPort(Member member) { return member.getPort() - 1000; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index f77c0a8b3..90910d901 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -28,6 +28,7 @@ import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ExceptionUtil; import com.alibaba.nacos.common.utils.VersionUtils; @@ -134,6 +135,7 @@ public class ServerMemberManager implements ApplicationListener members = serverMemberManager.allMembersWithoutSelf(); + refresh(members); + Loggers.CLUSTER + .warn("[ClusterRpcClientProxy] succss to refresh cluster rpc client on start up,members ={} ", + members); + } catch (NacosException e) { + Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage()); + } + + } + + /** + * init cluster rpc clients. + * + * @param members cluster server list member list. + */ + private void refresh(List members) throws NacosException { + + //ensure to create client of new members + for (Member member : members) { + ConnectionType supportedConnectionType = MemberUtils.getSupportedConnectionType(member); + if (supportedConnectionType != null) { + createRpcClientAndStart(member, supportedConnectionType); + } + } + + //shutdown and remove old members. + Set> allClientEntrys = RpcClientFactory.getAllClientEntrys(); + Iterator> iterator = allClientEntrys.iterator(); + List newMemberKeys = members.stream().map(a -> memberClientKey(a)).collect(Collectors.toList()); + while (iterator.hasNext()) { + Map.Entry next1 = iterator.next(); + if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) { + next1.getValue().shutdown(); + iterator.remove(); + } + } + + } + + private String memberClientKey(Member member) { + return "Cluster-" + member.getAddress(); + } + + private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException { + RpcClient client = RpcClientFactory.createClient(memberClientKey(member), type); + if (!client.getConnectionType().equals(type)) { + RpcClientFactory.destroyClient(memberClientKey(member)); + Map labels = new HashMap(); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); + client = RpcClientFactory.createClient(memberClientKey(member), type, labels); + } + + if (client.isWaitInited()) { + Loggers.CLUSTER.info("create a new rpc client to member - > : {}", member); + + //one fixed server + client.init(new ServerListFactory() { + @Override + public String genNextServer() { + return member.getAddress(); + } + + @Override + public String getCurrentServer() { + return member.getAddress(); + } + }); + + client.start(); + } + } + + /** + * send request to member. + * + * @param member + * @param request + * @return + * @throws NacosException + */ + public Response sendRequest(Member member, Request request) throws NacosException { + RpcClient client = RpcClientFactory.getClient(memberClientKey(member)); + if (client != null) { + Response response = client.request(request); + return response; + } else { + throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member); + } + } + + @Override + public void onEvent(MembersChangeEvent event) { + try { + List members = serverMemberManager.allMembersWithoutSelf(); + refresh(members); + } catch (NacosException e) { + Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client ", event, e.getMessage()); + } + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java b/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java index 00aa3b0bb..8a9b34880 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java @@ -16,10 +16,12 @@ package com.alibaba.nacos.core.remote; +import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.request.ServerPushRequest; import com.alibaba.nacos.api.remote.response.PushCallBack; +import org.apache.commons.lang3.builder.ToStringBuilder; -import java.util.concurrent.Future; +import java.util.Map; /** * Connection. @@ -96,15 +98,14 @@ public abstract class Connection { * * @param request request. */ - public abstract Future sendRequestWithFuture(ServerPushRequest request) throws Exception; + public abstract PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception; /** * Send response to this client that associated to this connection. * * @param request request. */ - public abstract void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) - throws Exception; + public abstract void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) throws Exception; /** * Close this connection, if this connection is not active yet. @@ -131,5 +132,20 @@ public abstract class Connection { return metaInfo.connectionId; } + /** + * check if this connection is sdk source. + * + * @return if this connection is sdk source. + */ + public boolean isSdkSource() { + Map labels = metaInfo.labels; + String source = labels.get(RemoteConstants.LABEL_SOURCE); + return RemoteConstants.LABEL_SOURCE_SDK.equalsIgnoreCase(source); + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index 5129aec67..0929a0872 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -151,6 +151,9 @@ public class ConnectionManager { List expireCLients = new LinkedList(); for (Map.Entry entry : entries) { Connection client = entry.getValue(); + if (!client.isSdkSource()) { + continue; + } long lastActiveTimestamp = entry.getValue().getLastActiveTimestamp(); if (client.heartBeatExpire() && currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) { expireCLients.add(client.getConnectionId()); @@ -208,10 +211,14 @@ public class ConnectionManager { this.loadClient = loadClient; } - public int currentClients() { + public int currentClientsCount() { return connetions.size(); } + public Map currentClients() { + return connetions; + } + /** * expel all connections. */ diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionMetaInfo.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionMetaInfo.java index 66f4989b1..93e83fafa 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionMetaInfo.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionMetaInfo.java @@ -16,7 +16,11 @@ package com.alibaba.nacos.core.remote; +import org.apache.commons.lang3.builder.ToStringBuilder; + import java.util.Date; +import java.util.HashMap; +import java.util.Map; /** * ConnectionMetaInfo. @@ -56,13 +60,21 @@ public class ConnectionMetaInfo { */ long lastActiveTime; - public ConnectionMetaInfo(String connectionId, String clientIp, String connectType, String version) { + protected Map labels = new HashMap(); + + public String getLabel(String labelKey) { + return labels.get(labelKey); + } + + public ConnectionMetaInfo(String connectionId, String clientIp, String connectType, String version, + Map labels) { this.connectionId = connectionId; this.clientIp = clientIp; this.connectType = connectType; this.version = version; this.createTime = new Date(); this.lastActiveTime = System.currentTimeMillis(); + this.labels.putAll(labels); } /** @@ -172,4 +184,9 @@ public class ConnectionMetaInfo { public void setVersion(String version) { this.version = version; } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/DefaultPushFuture.java b/core/src/main/java/com/alibaba/nacos/core/remote/DefaultPushFuture.java new file mode 100644 index 000000000..f9db8eff3 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/DefaultPushFuture.java @@ -0,0 +1,146 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote; + +import com.alibaba.nacos.api.remote.response.PushCallBack; + +import java.util.concurrent.TimeoutException; + +/** + * default push future. + * + * @author liuzunfei + * @version $Id: DefaultPushFuture.java, v 0.1 2020年08月12日 7:10 PM liuzunfei Exp $ + */ +public class DefaultPushFuture implements PushFuture { + + private long timeStamp; + + private volatile boolean isDone = false; + + private boolean isSuccess; + + private PushCallBack pushCallBack; + + private Exception exception; + + private String requestId; + + /** + * Getter method for property pushCallBack. + * + * @return property value of pushCallBack + */ + public PushCallBack getPushCallBack() { + return pushCallBack; + } + + /** + * Getter method for property timeStamp. + * + * @return property value of timeStamp + */ + public long getTimeStamp() { + return timeStamp; + } + + public DefaultPushFuture() { + } + + public DefaultPushFuture(String requestId) { + this(requestId, null); + } + + public DefaultPushFuture(String requestId, PushCallBack pushCallBack) { + this.timeStamp = System.currentTimeMillis(); + this.pushCallBack = pushCallBack; + this.requestId = requestId; + } + + public void setSuccessResult() { + isDone = true; + isSuccess = true; + synchronized (this) { + notifyAll(); + } + + if (pushCallBack != null) { + if (isSuccess) { + pushCallBack.onSuccess(); + } + } + } + + public void setFailResult(Exception e) { + isDone = true; + isSuccess = false; + synchronized (this) { + notifyAll(); + } + + if (pushCallBack != null) { + if (isSuccess) { + pushCallBack.onFail(e); + } + } + } + + public String getRequestId() { + return this.requestId; + } + + @Override + public boolean isDone() { + return isDone; + } + + @Override + public boolean get() throws TimeoutException, InterruptedException { + synchronized (this) { + while (!isDone) { + wait(); + } + } + return isSuccess; + } + + @Override + public boolean get(long timeout) throws TimeoutException, InterruptedException { + if (timeout < 0) { + synchronized (this) { + while (!isDone) { + wait(); + } + } + } else if (timeout > 0) { + long end = System.currentTimeMillis() + timeout; + long waitTime = timeout; + synchronized (this) { + while (!isDone && waitTime > 0) { + wait(waitTime); + waitTime = end - System.currentTimeMillis(); + } + } + } + + if (isDone) { + return isSuccess; + } else { + throw new TimeoutException(); + } + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/PushFuture.java b/core/src/main/java/com/alibaba/nacos/core/remote/PushFuture.java new file mode 100644 index 000000000..f07ecd6b1 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/PushFuture.java @@ -0,0 +1,52 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote; + +import com.alipay.sofa.jraft.error.RemotingException; + +import java.util.concurrent.TimeoutException; + +/** + * push future. + * + * @author liuzunfei + * @version $Id: PushFuture.java, v 0.1 2020年08月12日 7:04 PM liuzunfei Exp $ + */ +public interface PushFuture { + + /** + * @return + */ + boolean isDone(); + + /** + * @return + * @throws TimeoutException + * @throws InterruptedException + */ + boolean get() throws TimeoutException, InterruptedException; + + /** + * @param timeout + * @return + * @throws TimeoutException + * @throws RemotingException + * @throws InterruptedException + */ + boolean get(long timeout) throws TimeoutException, InterruptedException; + +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RemoteConnectionEventListener.java b/core/src/main/java/com/alibaba/nacos/core/remote/RemoteConnectionEventListener.java new file mode 100644 index 000000000..481b29dfb --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RemoteConnectionEventListener.java @@ -0,0 +1,38 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote; + +import org.springframework.stereotype.Component; + +/** + * RemoteConnectionEventListener. + * @author liuzunfei + * @version $Id: RemoteConnectionEventListener.java, v 0.1 2020年08月10日 1:04 AM liuzunfei Exp $ + */ +@Component +public class RemoteConnectionEventListener extends ClientConnectionEventListener { + + @Override + public void clientConnected(Connection connect) { + + } + + @Override + public void clientDisConnected(Connection connect) { + RpcAckCallbackSynchronizer.clearContext(connect.getConnectionId()); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java new file mode 100644 index 000000000..098bf811e --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java @@ -0,0 +1,154 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote; + +import com.alibaba.nacos.core.utils.Loggers; +import com.alipay.hessian.clhm.ConcurrentLinkedHashMap; +import com.alipay.hessian.clhm.EvictionListener; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +/** + * serber push ack synchronier. + * + * @author liuzunfei + * @version $Id: RpcAckCallbackSynchronizer.java, v 0.1 2020年07月29日 7:56 PM liuzunfei Exp $ + */ +public class RpcAckCallbackSynchronizer { + + private static final long TIMEOUT = 60000L; + + static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + private static final Map> CALLBACK_CONTEXT2 = new ConcurrentLinkedHashMap.Builder>() + .maximumWeightedCapacity(30000).listener(new EvictionListener>() { + @Override + public void onEviction(String s, Map pushCallBack) { + + pushCallBack.entrySet().forEach(new Consumer>() { + @Override + public void accept(Map.Entry stringDefaultPushFutureEntry) { + stringDefaultPushFutureEntry.getValue().setFailResult(new TimeoutException()); + } + }); + } + }).build(); + + + private static final Map CALLBACK_CONTEXT = new ConcurrentLinkedHashMap.Builder() + .maximumWeightedCapacity(30000).listener(new EvictionListener() { + @Override + public void onEviction(String s, DefaultPushFuture pushCallBack) { + if (System.currentTimeMillis() - pushCallBack.getTimeStamp() > TIMEOUT) { + Loggers.CORE.warn("time out on eviction:" + pushCallBack.getRequestId()); + if (pushCallBack.getPushCallBack() != null) { + pushCallBack.getPushCallBack().onTimeout(); + } + } else { + pushCallBack.getPushCallBack().onFail(new RuntimeException("callback pool overlimit")); + } + } + }).build(); + + // static { + // executor.scheduleWithFixedDelay(new Runnable() { + // @Override + // public void run() { + // Set timeOutCalls = new HashSet<>(); + // long now = System.currentTimeMillis(); + // for (Map.Entry enrty : CALLBACK_CONTEXT.entrySet()) { + // if (now - enrty.getValue().getTimeStamp() > TIMEOUT) { + // timeOutCalls.add(enrty.getKey()); + // } + // } + // for (String ackId : timeOutCalls) { + // DefaultPushFuture remove = CALLBACK_CONTEXT.remove(ackId); + // if (remove != null) { + // Loggers.CORE.warn("time out on scheduler:" + ackId); + // if (remove.getPushCallBack() != null) { + // remove.getPushCallBack().onTimeout(); + // } + // } + // } + // } + // }, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS); + // } + + /** + * notify ackid. + */ + public static void ackNotify(String connectionId, String requestId, boolean success, Exception e) { + if (!CALLBACK_CONTEXT2.containsKey(connectionId)) { + return; + } + + Map stringDefaultPushFutureMap = CALLBACK_CONTEXT2.get(connectionId); + if (stringDefaultPushFutureMap.containsKey(requestId)) { + return; + } + + DefaultPushFuture currentCallback = stringDefaultPushFutureMap.get(requestId); + if (currentCallback == null) { + return; + } + + if (success) { + currentCallback.setSuccessResult(); + } else { + currentCallback.setFailResult(e); + } + } + + /** + * notify ackid. + */ + public static void syncCallback(String connectionId, String requestId, DefaultPushFuture defaultPushFuture) + throws Exception { + DefaultPushFuture pushCallBackPrev = CALLBACK_CONTEXT.putIfAbsent(requestId, defaultPushFuture); + if (pushCallBackPrev != null) { + throw new RuntimeException("callback conflict."); + } + } + + /** + * clear context of connectionId. + * + * @param connetionId connetionId + */ + public static void clearContext(String connetionId) { + + } + + /** + * clear context of connectionId. TODO + * + * @param connetionId connetionId + */ + public static void clearFuture(String connetionId, String requestId) { + + } + + +} + diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java index 38c0c1869..fa1fa9b50 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java @@ -23,8 +23,6 @@ import com.alibaba.nacos.core.utils.Loggers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.concurrent.Future; - /** * push response to clients. * @@ -53,7 +51,8 @@ public class RpcPushService { connectionManager.unregister(connectionId); return true; } catch (Exception e) { - Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId, + Loggers.RPC_DIGEST + .error("error to send push response to connectionId ={},push response={}", connectionId, request, e); return false; } @@ -68,7 +67,7 @@ public class RpcPushService { * @param connectionId connectionId. * @param request request. */ - public Future pushWithFuture(String connectionId, ServerPushRequest request) { + public PushFuture pushWithFuture(String connectionId, ServerPushRequest request) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { @@ -77,7 +76,8 @@ public class RpcPushService { } catch (ConnectionAlreadyClosedException e) { connectionManager.unregister(connectionId); } catch (Exception e) { - Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId, + Loggers.RPC_DIGEST + .error("error to send push response to connectionId ={},push response={}", connectionId, request, e); } } @@ -99,7 +99,8 @@ public class RpcPushService { } catch (ConnectionAlreadyClosedException e) { connectionManager.unregister(connectionId); } catch (Exception e) { - Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId, + Loggers.RPC_DIGEST + .error("error to send push response to connectionId ={},push response={}", connectionId, request, e); } } @@ -119,7 +120,8 @@ public class RpcPushService { } catch (ConnectionAlreadyClosedException e) { connectionManager.unregister(connectionId); } catch (Exception e) { - Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId, + Loggers.RPC_DIGEST + .error("error to send push response to connectionId ={},push response={}", connectionId, request, e); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java index 9372d1481..85006449b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java @@ -16,8 +16,13 @@ package com.alibaba.nacos.core.remote; +import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.core.utils.Loggers; import org.springframework.beans.factory.annotation.Autowired; +import javax.annotation.PostConstruct; + /** * abstrat rpc server . * @@ -32,7 +37,41 @@ public abstract class RpcServer { /** * Start sever. */ - public abstract void start() throws Exception; + @PostConstruct + public void start() throws Exception { + + Loggers.RPC.info("Nacos {} Rpc server starting at port {}", getConnectionType(), + (ApplicationUtils.getPort() + rpcPortOffset())); + + startServer(); + + Loggers.RPC.info("Nacos {} Rpc server started at port {}", getConnectionType(), + (ApplicationUtils.getPort() + rpcPortOffset())); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + Loggers.RPC.info("Nacos {} Rpc server stopping", getConnectionType()); + try { + RpcServer.this.stopServer(); + Loggers.RPC.info("Nacos {} Rpc server stopped successfully...", getConnectionType()); + } catch (Exception e) { + Loggers.RPC.error("Nacos {} Rpc server stopped fail...", getConnectionType(), e); + } + } + }); + } + + /** + * get connection type. + * + * @return + */ + public abstract ConnectionType getConnectionType(); + + /** + * Start sever. + */ + public abstract void startServer() throws Exception; /** * the increase offset of nacos server port for rpc server port. @@ -44,17 +83,23 @@ public abstract class RpcServer { /** * Stop Server. */ - public abstract void stop() throws Exception; - - public void setMaxClientCount(int maxClient) { - this.connectionManager.coordinateMaxClientsSmoth(maxClient); + public void stopServer() throws Exception { + Loggers.RPC.info("Nacos clear all rpc clients..."); + connectionManager.expelAll(); + try { + //wait clients to switch server. + Thread.sleep(2000L); + } catch (InterruptedException e) { + //Do nothing. + } + shundownServer(); } - public void reloadClient(int loadCount) { - this.connectionManager.loadClientsSmoth(loadCount); - } + /** + * the increase offset of nacos server port for rpc server port. + * + * @return + */ + public abstract void shundownServer(); - public int currentClients() { - return this.connectionManager.currentClients(); - } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java deleted file mode 100644 index 8fe28a4af..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright 1999-2020 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.remote.grpc; - -import com.alibaba.nacos.api.remote.response.PushCallBack; -import com.alibaba.nacos.core.utils.Loggers; -import com.alipay.hessian.clhm.ConcurrentLinkedHashMap; -import com.alipay.hessian.clhm.EvictionListener; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * serber push ack synchronier. - * - * @author liuzunfei - * @version $Id: GrpcAckSynchronizer.java, v 0.1 2020年07月29日 7:56 PM liuzunfei Exp $ - */ -public class GrpcAckSynchronizer { - - private static final Map ACK_WAITORS = new HashMap(); - - private static final long TIMEOUT = 60000L; - - static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - - private static final Map CALLBACK_CONTEXT = new ConcurrentLinkedHashMap.Builder() - .maximumWeightedCapacity(30000).listener(new EvictionListener() { - @Override - public void onEviction(String s, PushCallBackWraper pushCallBack) { - if (System.currentTimeMillis() - pushCallBack.getTimeStamp() > TIMEOUT && pushCallBack - .tryDeActive()) { - Loggers.CORE.warn("time out on eviction:" + pushCallBack.ackId); - pushCallBack.getPushCallBack().onTimeout(); - } else { - pushCallBack.getPushCallBack().onFail(new RuntimeException("callback pool overlimit")); - } - } - }).build(); - - static { - executor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - Set timeOutCalls = new HashSet<>(); - long now = System.currentTimeMillis(); - for (Map.Entry enrty : CALLBACK_CONTEXT.entrySet()) { - if (now - enrty.getValue().getTimeStamp() > TIMEOUT) { - timeOutCalls.add(enrty.getKey()); - } - } - for (String ackId : timeOutCalls) { - PushCallBackWraper remove = CALLBACK_CONTEXT.remove(ackId); - if (remove != null && remove.tryDeActive()) { - Loggers.CORE.warn("time out on scheduler:" + ackId); - remove.pushCallBack.onTimeout(); - } - } - } - }, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS); - } - - /** - * notify ackid. - * - * @param ackId ackId. - */ - public static void ackNotify(String ackId, boolean success) { - - PushCallBackWraper currentCallback = CALLBACK_CONTEXT.remove(ackId); - if (currentCallback != null && currentCallback.tryDeActive()) { - if (success) { - currentCallback.pushCallBack.onSuccess(); - } else { - currentCallback.pushCallBack.onFail(new RuntimeException("client return fail")); - } - } - - AckWaitor waiter = ACK_WAITORS.remove(ackId); - if (waiter != null) { - synchronized (waiter) { - waiter.setSuccess(success); - waiter.notify(); - } - } - - } - - /** - * notify ackid. - * - * @param ackId ackId. - */ - public static void release(String ackId) { - ACK_WAITORS.remove(ackId); - } - - /** - * notify ackid. - * - * @param ackId ackId. - */ - public static boolean waitAck(String ackId, long timeout) throws Exception { - AckWaitor waiter = ACK_WAITORS.get(ackId); - if (waiter != null) { - throw new RuntimeException("ackid conflict"); - } else { - AckWaitor lock = new AckWaitor(); - AckWaitor prev = ACK_WAITORS.putIfAbsent(ackId, lock); - if (prev == null) { - synchronized (lock) { - lock.wait(timeout); - return lock.success; - } - } else { - throw new RuntimeException("ackid conflict."); - } - } - } - - /** - * notify ackid. - * - * @param ackId ackId. - */ - public static void syncCallbackOnAck(String ackId, PushCallBack pushCallBack) throws Exception { - PushCallBackWraper pushCallBackPrev = CALLBACK_CONTEXT - .putIfAbsent(ackId, new PushCallBackWraper(pushCallBack, ackId)); - if (pushCallBackPrev != null) { - throw new RuntimeException("callback conflict."); - } - } - - static class AckWaitor { - - boolean success; - - /** - * Getter method for property success. - * - * @return property value of success - */ - public boolean isSuccess() { - return success; - } - - /** - * Setter method for property success. - * - * @param success value to be assigned to property success - */ - public void setSuccess(boolean success) { - this.success = success; - } - } - - static class PushCallBackWraper { - - long timeStamp; - - PushCallBack pushCallBack; - - String ackId; - - private AtomicBoolean active = new AtomicBoolean(true); - - public PushCallBackWraper(PushCallBack pushCallBack, String ackId) { - this.pushCallBack = pushCallBack; - this.ackId = ackId; - this.timeStamp = System.currentTimeMillis(); - } - - public boolean tryDeActive() { - return active.compareAndSet(true, false); - } - - /** - * Getter method for property timeStamp. - * - * @return property value of timeStamp - */ - public long getTimeStamp() { - return timeStamp; - } - - /** - * Getter method for property pushCallBack. - * - * @return property value of pushCallBack - */ - public PushCallBack getPushCallBack() { - return pushCallBack; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PushCallBackWraper that = (PushCallBackWraper) o; - return Objects.equals(ackId, that.ackId); - } - - @Override - public int hashCode() { - return Objects.hash(ackId); - } - } - -} - diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java index 38c1c0836..3ac3b9edd 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java @@ -21,16 +21,13 @@ import com.alibaba.nacos.api.remote.response.PushCallBack; import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; +import com.alibaba.nacos.core.remote.DefaultPushFuture; +import com.alibaba.nacos.core.remote.PushFuture; +import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer; import com.alibaba.nacos.core.utils.Loggers; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * grpc connection. * @@ -39,11 +36,6 @@ import java.util.concurrent.TimeUnit; */ public class GrpcConnection extends Connection { - static ThreadPoolExecutor pushWorkers = new ThreadPoolExecutor(10, 50, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(50000)); - - private static final long MAX_TIMEOUTS = 5000L; - private StreamObserver streamObserver; public GrpcConnection(ConnectionMetaInfo metaInfo, StreamObserver streamObserver) { @@ -57,54 +49,18 @@ public class GrpcConnection extends Connection { } @Override - public boolean sendRequest(ServerPushRequest request, long timeout) throws Exception { + public boolean sendRequest(ServerPushRequest request, long timeoutMills) throws Exception { + DefaultPushFuture pushFuture = (DefaultPushFuture) sendRequestWithFuture(request); try { - - Loggers.RPC_DIGEST.info("Grpc sendRequest :" + request); - - String requestId = String.valueOf(PushAckIdGenerator.getNextId()); - request.setRequestId(requestId); - streamObserver.onNext(GrpcUtils.convert(request, requestId)); - try { - return GrpcAckSynchronizer.waitAck(requestId, timeout); - } catch (Exception e) { - //Do nothing,return fail. - return false; - } finally { - GrpcAckSynchronizer.release(requestId); - } - } catch (Exception e) { - if (e instanceof StatusRuntimeException) { - //return true where client is not active yet. - return true; - } - throw e; - } - } - - private void sendRequestWithCallback(ServerPushRequest request, PushCallBack callBack) { - try { - Loggers.RPC_DIGEST.info("Grpc sendRequestWithCallback :" + request); - - String requestId = String.valueOf(PushAckIdGenerator.getNextId()); - request.setRequestId(requestId); - streamObserver.onNext(GrpcUtils.convert(request, requestId)); - GrpcAckSynchronizer.syncCallbackOnAck(requestId, callBack); - } catch (Exception e) { - if (e instanceof StatusRuntimeException) { - //return true where client is not active yet. - callBack.onSuccess(); - return; - } - callBack.onFail(e); + return pushFuture.get(timeoutMills); + } finally { + RpcAckCallbackSynchronizer.clearFuture(getConnectionId(), pushFuture.getRequestId()); } } @Override public void sendRequestNoAck(ServerPushRequest request) throws Exception { try { - - Loggers.RPC_DIGEST.info("Grpc sendRequestNoAck :" + request); streamObserver.onNext(GrpcUtils.convert(request, "")); } catch (Exception e) { if (e instanceof StatusRuntimeException) { @@ -115,36 +71,27 @@ public class GrpcConnection extends Connection { } @Override - public Future sendRequestWithFuture(ServerPushRequest request) throws Exception { - Loggers.RPC_DIGEST.info("Grpc sendRequestWithFuture :" + request); - return pushWorkers.submit(new PushCallable(request, MAX_TIMEOUTS)); + public PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception { + return sendRequestInner(request, null); } @Override public void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) throws Exception { - Loggers.RPC_DIGEST.info("Grpc sendRequestWithCallBack :" + request); - sendRequestWithCallback(request, callBack); + sendRequestInner(request, callBack); + } + + private DefaultPushFuture sendRequestInner(ServerPushRequest request, PushCallBack callBack) throws Exception { + Loggers.RPC_DIGEST.info("Grpc sendRequest :" + request); + String requestId = String.valueOf(PushAckIdGenerator.getNextId()); + request.setRequestId(requestId); + sendRequestNoAck(request); + DefaultPushFuture defaultPushFuture = new DefaultPushFuture(requestId, callBack); + RpcAckCallbackSynchronizer.syncCallback(getConnectionId(), requestId, defaultPushFuture); + return defaultPushFuture; } @Override public void closeGrapcefully() { } - class PushCallable implements Callable { - - private ServerPushRequest request; - - private long timeoutMills; - - public PushCallable(ServerPushRequest request, long timeoutMills) { - this.request = request; - this.timeoutMills = timeoutMills; - } - - @Override - public Boolean call() throws Exception { - return sendRequest(request, timeoutMills); - } - } - } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java index a91afbe5f..df6ff0a3f 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java @@ -29,6 +29,7 @@ import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.ServerCheckResponse; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.remote.ConnectionManager; +import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer; import com.alibaba.nacos.core.remote.RequestHandler; import com.alibaba.nacos.core.remote.RequestHandlerRegistry; import com.alibaba.nacos.core.utils.Loggers; @@ -62,9 +63,11 @@ public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase { return; } else if (RequestTypeConstants.PUSH_ACK.equals(type)) { // server push ack response. + String connectionId = grpcRequest.getMetadata().getConnectionId(); PushAckRequest request = JacksonUtils .toObj(grpcRequest.getBody().getValue().toStringUtf8(), PushAckRequest.class); - GrpcAckSynchronizer.ackNotify(request.getRequestId(), request.isSuccess()); + RpcAckCallbackSynchronizer + .ackNotify(connectionId, request.getRequestId(), request.isSuccess(), request.getException()); responseObserver.onNext(GrpcUtils.convert(new ServerCheckResponse())); responseObserver.onCompleted(); return; @@ -83,6 +86,7 @@ public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase { responseObserver.onCompleted(); return; } + connectionManager.refreshActiveTime(requestMeta.getConnectionId()); Response response = requestHandler.handle(request, requestMeta); responseObserver.onNext(GrpcUtils.convert(response)); responseObserver.onCompleted(); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java index 6d072bb0f..b47ca40bd 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java @@ -16,17 +16,28 @@ package com.alibaba.nacos.core.remote.grpc; -import com.alibaba.nacos.core.remote.ConnectionManager; +import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.core.remote.RequestHandlerRegistry; import com.alibaba.nacos.core.remote.RpcServer; import com.alibaba.nacos.core.utils.ApplicationUtils; -import com.alibaba.nacos.core.utils.Loggers; +import io.grpc.Attributes; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Grpc; +import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerTransportFilter; +import io.grpc.internal.ServerStream; +import io.grpc.internal.ServerStreamHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; +import java.net.SocketAddress; +import java.util.UUID; /** * Grpc implementation as a rpc server. @@ -50,32 +61,35 @@ public class GrpcServer extends RpcServer { @Autowired private RequestHandlerRegistry requestHandlerRegistry; - @Autowired - private ConnectionManager connectionManager; - int grpcServerPort = ApplicationUtils.getPort() + rpcPortOffset(); private void init() { } - @PostConstruct @Override - public void start() throws Exception { - + public ConnectionType getConnectionType() { + return ConnectionType.GRPC; + } + + @Override + public void startServer() throws Exception { init(); server = ServerBuilder.forPort(grpcServerPort).addService(streamRequestHander).addService(requestHander) - .build(); + .addTransportFilter(new ServerTransportFilter() { + @Override + public Attributes transportReady(Attributes transportAttrs) { + System.out.println("transportReady:" + transportAttrs); + Attributes test = transportAttrs.toBuilder().set(key, UUID.randomUUID().toString()).build(); + return test; + } + + @Override + public void transportTerminated(Attributes transportAttrs) { + System.out.println("transportTerminated:" + transportAttrs); + super.transportTerminated(transportAttrs); + } + }).intercept(new ConnetionIntereptor()).build(); server.start(); - Loggers.RPC.info("Nacos gRPC server start successfully at port :" + grpcServerPort); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - Loggers.RPC.info("Nacos gRPC server stopping..."); - GrpcServer.this.stop(); - Loggers.RPC.info("Nacos gRPC server stopped successfully..."); - } - }); - } @Override @@ -84,17 +98,24 @@ public class GrpcServer extends RpcServer { } @Override - public void stop() { + public void shundownServer() { if (server != null) { - Loggers.RPC.info("Nacos clear all rpc clients..."); - connectionManager.expelAll(); - try { - //wait clients to switch server. - Thread.sleep(2000L); - } catch (InterruptedException e) { - //Do nothing. - } server.shutdown(); } } + + static final Attributes.Key key = Attributes.Key.create("conn_id"); + + static class ConnetionIntereptor implements ServerInterceptor { + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + Context ctx = Context.current(); + // System.out.println(build); + System.out.println(call.getAttributes().get(key).toString()); + return Contexts.interceptCall(Context.current(), call, headers, next); + + } + } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java index eb9a86fd1..78cef7306 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java @@ -25,6 +25,7 @@ import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; +import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -43,12 +44,15 @@ public class GrpcStreamRequestHanderImpl extends RequestStreamGrpc.RequestStream @Override public void requestStream(GrpcRequest request, StreamObserver responseObserver) { + + Context current = Context.current(); + GrpcMetadata metadata = request.getMetadata(); String clientIp = metadata.getClientIp(); String connectionId = metadata.getConnectionId(); String version = metadata.getVersion(); ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId, clientIp, ConnectionType.GRPC.getType(), - version); + version, metadata.getLabelsMap()); Connection connection = new GrpcConnection(metaInfo, responseObserver); if (connectionManager.isOverLimit()) { //Not register to the connection manager if current server is over limit. @@ -59,7 +63,9 @@ public class GrpcStreamRequestHanderImpl extends RequestStreamGrpc.RequestStream } } else { connectionManager.register(connectionId, connection); - + } } + + } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java index a1490f382..91ab7e3f6 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java @@ -24,15 +24,14 @@ import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.rsocket.RsocketUtils; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; +import com.alibaba.nacos.core.remote.PushFuture; import com.alibaba.nacos.core.utils.Loggers; import io.rsocket.Payload; import io.rsocket.RSocket; import reactor.core.publisher.Mono; import java.time.Duration; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.Date; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -72,23 +71,12 @@ public class RsocketConnection extends Connection { } @Override - public Future sendRequestWithFuture(ServerPushRequest request) throws Exception { + public PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception { Loggers.RPC_DIGEST.info("Rsocket sendRequestWithFuture :" + request); final Mono payloadMono = clientSocket .requestResponse(RsocketUtils.convertRequestToPayload(request, new RequestMeta())); - Future future = new Future() { - - private volatile boolean cancel = false; - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return cancel = true; - } - - @Override - public boolean isCancelled() { - return cancel; - } + + PushFuture defaultPushFuture = new PushFuture() { @Override public boolean isDone() { @@ -96,43 +84,53 @@ public class RsocketConnection extends Connection { } @Override - public Boolean get() throws InterruptedException, ExecutionException { + public boolean get() throws TimeoutException, InterruptedException { return payloadMono.block() == null; } @Override - public Boolean get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - - return payloadMono.block(Duration.ofMillis(unit.toMillis(timeout))) == null; + public boolean get(long timeout) throws TimeoutException, InterruptedException { + return payloadMono.block(Duration.ofMillis(timeout)) == null; } - }; - return future; + return defaultPushFuture; } @Override public void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) throws Exception { Loggers.RPC_DIGEST.info("Rsocket sendRequestWithCallBack :" + request); + System.out.println(new Date() + "1"); Mono payloadMono = clientSocket .requestResponse(RsocketUtils.convertRequestToPayload(request, new RequestMeta())); payloadMono.subscribe(new Consumer() { + @Override public void accept(Payload payload) { Response response = RsocketUtils.parseResponseFromPayload(payload); + System.out.println(new Date().toString() + response); if (response.isSuccess()) { callBack.onSuccess(); } else { - callBack.onFail(new NacosException(response.getErrorCode(), "request fail")); + callBack.onFail(new NacosException(response.getErrorCode(), response.getMessage())); } } + }, new Consumer() { @Override public void accept(Throwable throwable) { callBack.onFail(new Exception(throwable)); } }); + try { + System.out.println(new Date() + "2"); + payloadMono.timeout(Duration.ofMillis(callBack.getTimeout())); + System.out.println(new Date() + "3"); + + } catch (Exception e) { + System.out.println("Timeout:" + e.getMessage()); + callBack.onTimeout(); + } } @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java index 245eec9d4..3a4c1ea77 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java @@ -37,6 +37,7 @@ import com.alibaba.nacos.core.utils.Loggers; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.core.RSocketServer; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -59,6 +60,8 @@ public class RsocketRpcServer extends RpcServer { private RSocketServer rSocketServer; + CloseableChannel closeChannel; + @Autowired private RequestHandlerRegistry requestHandlerRegistry; @@ -70,11 +73,15 @@ public class RsocketRpcServer extends RpcServer { return PORT_OFFSET; } - @PostConstruct @Override - public void start() throws Exception { + public void shundownServer() { + + } + + @Override + public void startServer() throws Exception { RSocketServer rSocketServerInner = RSocketServer.create(); - rSocketServerInner.acceptor(((setup, sendingSocket) -> { + closeChannel = rSocketServerInner.acceptor(((setup, sendingSocket) -> { Loggers.RPC.info("Receive connection rsocket:" + setup.getDataUtf8()); RsocketUtils.PlainRequest palinrequest = null; try { @@ -92,7 +99,7 @@ public class RsocketRpcServer extends RpcServer { .toObj(palinrequest.getBody(), ConnectionSetupRequest.class); ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionSetupRequest.getConnectionId(), connectionSetupRequest.getClientIp(), ConnectionType.RSOCKET.getType(), - connectionSetupRequest.getClientVersion()); + connectionSetupRequest.getClientVersion(), connectionSetupRequest.getLabels()); Connection connection = new RsocketConnection(metaInfo, sendingSocket); connectionManager.register(connection.getConnectionId(), connection); @@ -148,12 +155,18 @@ public class RsocketRpcServer extends RpcServer { })).bind(TcpServerTransport.create("0.0.0.0", (ApplicationUtils.getPort() + PORT_OFFSET))).block(); rSocketServer = rSocketServerInner; - Loggers.RPC.info("Nacos Rsocket server start on port :" + (ApplicationUtils.getPort() + PORT_OFFSET)); } @Override - public void stop() throws Exception { + public ConnectionType getConnectionType() { + return ConnectionType.RSOCKET; + } + @Override + public void stopServer() throws Exception { + if (this.closeChannel != null && !closeChannel.isDisposed()) { + this.closeChannel.dispose(); + } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java index 35b97a991..496b67555 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java @@ -20,8 +20,7 @@ import com.alibaba.nacos.api.remote.request.ServerPushRequest; import com.alibaba.nacos.api.remote.response.PushCallBack; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; - -import java.util.concurrent.Future; +import com.alibaba.nacos.core.remote.PushFuture; /** * Cluster connection. @@ -49,7 +48,7 @@ public class ClusterConnection extends Connection { } @Override - public Future sendRequestWithFuture(ServerPushRequest request) throws Exception { + public PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception { return null; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java index 36be820ce..74b666c68 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java @@ -33,6 +33,7 @@ import com.alibaba.nacos.naming.remote.RemotingConnectionHolder; import com.google.common.collect.Lists; import org.springframework.stereotype.Component; +import java.util.HashMap; import java.util.List; /** @@ -79,7 +80,7 @@ public class ForwardInstanceRequestHandler extends RequestHandler()); remotingConnectionHolder.clientConnected(new ClusterConnection(metaInfo)); } }