diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigCommonRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/AbstractConfigRequest.java similarity index 94% rename from api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigCommonRequest.java rename to api/src/main/java/com/alibaba/nacos/api/config/remote/request/AbstractConfigRequest.java index 8ef4ee1c2..a03097ac0 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigCommonRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/AbstractConfigRequest.java @@ -23,7 +23,7 @@ import com.alibaba.nacos.api.remote.request.Request; * @author liuzunfei * @version $Id: ConfigCommonRequest.java, v 0.1 2020年07月13日 9:05 PM liuzunfei Exp $ */ -public abstract class ConfigCommonRequest extends Request { +public abstract class AbstractConfigRequest extends Request { @Override public String getModule() { diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java index 757b91112..99b1ecaef 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java @@ -22,7 +22,7 @@ package com.alibaba.nacos.api.config.remote.request; * @author liuzunfei * @version $Id: ConfigBatchListenRequest.java, v 0.1 2020年07月27日 7:46 PM liuzunfei Exp $ */ -public class ConfigBatchListenRequest extends ConfigCommonRequest { +public class ConfigBatchListenRequest extends AbstractConfigRequest { private static final String Y = "Y"; diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java index 611ca1644..ef677b104 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java @@ -25,7 +25,7 @@ import java.util.Map; * @author liuzunfei * @version $Id: ConfigPublishRequest.java, v 0.1 2020年07月16日 4:30 PM liuzunfei Exp $ */ -public class ConfigPublishRequest extends ConfigCommonRequest { +public class ConfigPublishRequest extends AbstractConfigRequest { String dataId; @@ -59,7 +59,7 @@ public class ConfigPublishRequest extends ConfigCommonRequest { */ public void putAdditonalParam(String key, String value) { if (additonMap == null) { - additonMap = new HashMap(); + additonMap = new HashMap(2); } additonMap.put(key, value); } diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigQueryRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigQueryRequest.java index 672c728e3..cf131a97c 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigQueryRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigQueryRequest.java @@ -22,7 +22,7 @@ package com.alibaba.nacos.api.config.remote.request; * @author liuzunfei * @version $Id: ConfigQueryRequest.java, v 0.1 2020年07月13日 9:06 PM liuzunfei Exp $ */ -public class ConfigQueryRequest extends ConfigCommonRequest { +public class ConfigQueryRequest extends AbstractConfigRequest { private String dataId; diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRemoveRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRemoveRequest.java index d8d07c3ec..1e3ab186c 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRemoveRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigRemoveRequest.java @@ -22,7 +22,7 @@ package com.alibaba.nacos.api.config.remote.request; * @author liuzunfei * @version $Id: ConfigRemoveRequest.java, v 0.1 2020年07月16日 4:31 PM liuzunfei Exp $ */ -public class ConfigRemoveRequest extends ConfigCommonRequest { +public class ConfigRemoveRequest extends AbstractConfigRequest { String dataId; @@ -119,4 +119,4 @@ public class ConfigRemoveRequest extends ConfigCommonRequest { public void setTenant(String tenant) { this.tenant = tenant; } -} \ No newline at end of file +} diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java index a0f319003..7ee3b20d8 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java @@ -35,4 +35,8 @@ public class NamingRemoteConstants { public static final String NOTIFY_SUBSCRIBER = "notifySubscriber"; public static final String LIST_SERVICE = "listService"; + + public static final String FORWARD_INSTANCE = "forwardInstance"; + + public static final String FORWARD_HEART_BEAT = "forwardHeartBeat"; } diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NamingCommonRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/AbstractNamingRequest.java similarity index 89% rename from api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NamingCommonRequest.java rename to api/src/main/java/com/alibaba/nacos/api/naming/remote/request/AbstractNamingRequest.java index 5a2158e93..6961b5701 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/NamingCommonRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/AbstractNamingRequest.java @@ -23,7 +23,7 @@ import com.alibaba.nacos.api.remote.request.Request; * * @author liuzunfei */ -public abstract class NamingCommonRequest extends Request { +public abstract class AbstractNamingRequest extends Request { private String namespace; @@ -31,10 +31,10 @@ public abstract class NamingCommonRequest extends Request { private String groupName; - public NamingCommonRequest() { + public AbstractNamingRequest() { } - public NamingCommonRequest(String namespace, String serviceName, String groupName) { + public AbstractNamingRequest(String namespace, String serviceName, String groupName) { this.namespace = namespace; this.serviceName = serviceName; this.groupName = groupName; diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java index ee075e7d8..63cf96f4a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java @@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.pojo.Instance; * * @author xiweng.yy */ -public class InstanceRequest extends NamingCommonRequest { +public class InstanceRequest extends AbstractNamingRequest { private String type; diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceListRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceListRequest.java index 6b0645ed2..fac39e0f0 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceListRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceListRequest.java @@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; * * @author xiweng.yy */ -public class ServiceListRequest extends NamingCommonRequest { +public class ServiceListRequest extends AbstractNamingRequest { private int pageNo; diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceQueryRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceQueryRequest.java index 148e70d8d..d6371b674 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceQueryRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/ServiceQueryRequest.java @@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; * * @author xiweng.yy */ -public class ServiceQueryRequest extends NamingCommonRequest { +public class ServiceQueryRequest extends AbstractNamingRequest { private String cluster; diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java index 46b9738f1..a6be260e6 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java @@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; * * @author xiweng.yy */ -public class SubscribeServiceRequest extends NamingCommonRequest { +public class SubscribeServiceRequest extends AbstractNamingRequest { private boolean subscribe; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/connection/Connection.java b/api/src/main/java/com/alibaba/nacos/api/remote/connection/Connection.java index 39d41a8ce..6cda0195d 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/connection/Connection.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/connection/Connection.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; * @author liuzunfei * @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $ */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class Connection { public static final String HEALTHY = "healthy"; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/connection/ConnectionType.java b/api/src/main/java/com/alibaba/nacos/api/remote/connection/ConnectionType.java index 57f689646..31401cf2e 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/connection/ConnectionType.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/connection/ConnectionType.java @@ -23,6 +23,9 @@ package com.alibaba.nacos.api.remote.connection; */ public enum ConnectionType { + /** + * gRPC connection. + */ GRPC("GRPC", "Grpc Connection"); String type; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/InternalRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/InternalRequest.java index 9db0e7b5c..c62b8af24 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/InternalRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/InternalRequest.java @@ -22,6 +22,7 @@ package com.alibaba.nacos.api.remote.request; * @author liuzunfei * @version $Id: InternalRequest.java, v 0.1 2020年07月22日 8:33 PM liuzunfei Exp $ */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class InternalRequest extends Request { @Override diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java index f1e6c020b..eba057618 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java @@ -24,6 +24,7 @@ import java.util.Map; * * @author liuzunfei */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class Request { private final Map headers = new HashMap(); diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java index c37c2326c..358712300 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder; * @author liuzunfei * @version $Id: Response.java, v 0.1 2020年07月13日 6:03 PM liuzunfei Exp $ */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class Response { int resultCode = ResponseCode.SUCCESS.getCode(); diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseCode.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseCode.java index e221cd956..f5c33817d 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseCode.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/ResponseCode.java @@ -23,8 +23,14 @@ package com.alibaba.nacos.api.remote.response; */ public enum ResponseCode { + /** + * Request success. + */ SUCCESS(200, "response ok"), + /** + * Request failed. + */ FAIL(500, "response fail"); int code; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerPushResponse.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerPushResponse.java index 06dea689d..e98e41d4c 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerPushResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerPushResponse.java @@ -22,6 +22,7 @@ package com.alibaba.nacos.api.remote.response; * @author liuzunfei * @version $Id: ServerPushResponse.java, v 0.1 2020年07月20日 1:21 PM liuzunfei Exp $ */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class ServerPushResponse extends Response { /** diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java index 36760c2b3..9630c36c8 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java @@ -56,10 +56,13 @@ public class NamingGrpcClientProxy implements NamingClientProxy { private final RpcClient rpcClient; + private final NamingGrpcConnectionEventListener namingGrpcConnectionEventListener; + public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { this.namespaceId = namespaceId; this.rpcClient = RpcClientFactory.getClient("naming"); + this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this); start(serverListFactory, serviceInfoHolder); } @@ -67,6 +70,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy { rpcClient.init(serverListFactory); rpcClient.start(); rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(serviceInfoHolder)); + rpcClient.registerConnectionListener(namingGrpcConnectionEventListener); } @Override @@ -76,6 +80,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy { InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance); requestToServer(request, Response.class); + namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance); } @Override @@ -86,6 +91,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy { InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.DE_REGISTER_INSTANCE, instance); requestToServer(request, Response.class); + namingGrpcConnectionEventListener.removeInstanceForRedo(serviceName, groupName, instance); } @Override @@ -147,14 +153,17 @@ public class NamingGrpcClientProxy implements NamingClientProxy { SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters, true); SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class); + namingGrpcConnectionEventListener.cacheSubscriberForRedo(serviceNameWithGroup, clusters); return response.getServiceInfo(); } @Override public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException { - SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, - NamingUtils.getGroupedName(serviceName, groupName), clusters, false); + String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); + SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters, + false); requestToServer(request, SubscribeServiceResponse.class); + namingGrpcConnectionEventListener.removeSubscriberForRedo(serviceNameWithGroup, clusters); } @Override diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java new file mode 100644 index 000000000..9e2afd8ec --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java @@ -0,0 +1,132 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.client.remote.ConnectionEventListener; +import com.alibaba.nacos.client.utils.LogUtils; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Naming client gprc connection event listener. + * + *

+ * When connection reconnect to server, redo the register and subscribe. + *

+ * + * @author xiweng.yy + */ +public class NamingGrpcConnectionEventListener implements ConnectionEventListener { + + private final NamingGrpcClientProxy clientProxy; + + private final ConcurrentMap> registeredInstanceCached = new ConcurrentHashMap>(); + + private final Set subscribes = new ConcurrentHashSet(); + + public NamingGrpcConnectionEventListener(NamingGrpcClientProxy clientProxy) { + this.clientProxy = clientProxy; + } + + @Override + public void onConnected() { + } + + @Override + public void onReconnected() { + redoSubscribe(); + redoRegisterEachService(); + } + + private void redoSubscribe() { + for (String each : subscribes) { + ServiceInfo serviceInfo = ServiceInfo.fromKey(each); + try { + clientProxy.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters()); + } catch (NacosException e) { + LogUtils.NAMING_LOGGER.warn(String.format("re subscribe service %s failed", serviceInfo.getName()), e); + } + } + } + + private void redoRegisterEachService() { + for (Map.Entry> each : registeredInstanceCached.entrySet()) { + String serviceName = NamingUtils.getServiceName(each.getKey()); + String groupName = NamingUtils.getGroupName(each.getKey()); + redoRegisterEachInstance(serviceName, groupName, each.getValue()); + } + } + + private void redoRegisterEachInstance(String serviceName, String groupName, Set instances) { + for (Instance each : instances) { + try { + clientProxy.registerService(serviceName, groupName, each); + } catch (NacosException e) { + LogUtils.NAMING_LOGGER + .warn(String.format("redo register for service %s@@%s failed", groupName, serviceName), e); + } + } + } + + @Override + public void onDisConnect() { + } + + /** + * Cache registered instance for redo. + * + * @param serviceName service name + * @param groupName group name + * @param instance registered instance + */ + public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) { + String key = NamingUtils.getGroupedName(serviceName, groupName); + registeredInstanceCached.putIfAbsent(key, new ConcurrentHashSet()); + registeredInstanceCached.get(key).add(instance); + } + + /** + * Remove registered instance for redo. + * + * @param serviceName service name + * @param groupName group name + * @param instance registered instance + */ + public void removeInstanceForRedo(String serviceName, String groupName, Instance instance) { + String key = NamingUtils.getGroupedName(serviceName, groupName); + Set instances = registeredInstanceCached.get(key); + if (null != instances) { + instances.remove(instance); + } + } + + public void cacheSubscriberForRedo(String fullServiceName, String cluster) { + subscribes.add(ServiceInfo.getKey(fullServiceName, cluster)); + } + + public void removeSubscriberForRedo(String fullServiceName, String cluster) { + subscribes.remove(ServiceInfo.getKey(fullServiceName, cluster)); + } +} diff --git a/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java b/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java index 95ec5c82f..597e7dd2c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java +++ b/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java @@ -23,11 +23,11 @@ import com.alibaba.nacos.api.grpc.GrpcResponse; import com.alibaba.nacos.api.grpc.RequestGrpc; import com.alibaba.nacos.api.grpc.RequestStreamGrpc; import com.alibaba.nacos.api.remote.ResponseRegistry; -import com.alibaba.nacos.api.remote.response.ConnectResetResponse; import com.alibaba.nacos.api.remote.request.HeartBeatRequest; import com.alibaba.nacos.api.remote.request.PushAckRequest; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.ServerCheckRequest; +import com.alibaba.nacos.api.remote.response.ConnectResetResponse; import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse; import com.alibaba.nacos.api.remote.response.PlainBodyResponse; import com.alibaba.nacos.api.remote.response.Response; @@ -101,18 +101,18 @@ public class GrpcClient extends RpcClient { ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext() .build(); - + RequestGrpc.RequestFutureStub grpcServiceStubTemp = RequestGrpc.newFutureStub(managedChannelTemp); boolean checkSucess = serverCheck(grpcServiceStubTemp); - + if (checkSucess) { return grpcServiceStubTemp; } else { shuntDownChannel(managedChannelTemp); return null; } - + } /** @@ -129,7 +129,7 @@ public class GrpcClient extends RpcClient { private void connectToServer() { rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING); - + GrpcServerInfo serverInfo = nextServer(); RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp, serverInfo.serverPort); @@ -142,8 +142,8 @@ public class GrpcClient extends RpcClient { bindRequestStream(requestStreamStubTemp); //switch current channel and stub channel = (ManagedChannel) newChannelStubTemp.getChannel(); - grpcStreamServiceStub = requestStreamStubTemp; grpcFutureServiceStub = grpcFutureServiceStubTemp; + grpcStreamServiceStub = requestStreamStubTemp; rpcClientStatus.set(RpcClientStatus.RUNNING); eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED)); notifyConnected(); @@ -154,7 +154,7 @@ public class GrpcClient extends RpcClient { @Override public void start() throws NacosException { - + if (rpcClientStatus.get() == RpcClientStatus.WAIT_INIT) { LOGGER.error("RpcClient has not init yet, please check init ServerListFactory..."); throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "RpcClient not init yet"); @@ -162,22 +162,22 @@ public class GrpcClient extends RpcClient { if (rpcClientStatus.get() == RpcClientStatus.RUNNING || rpcClientStatus.get() == RpcClientStatus.STARTING) { return; } - + connectToServer(); - + executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { sendBeat(); } }, 0, 3000, TimeUnit.MILLISECONDS); - + super.registerServerPushResponseHandler(new ServerPushResponseHandler() { @Override public void responseReply(Response response) { if (response instanceof ConnectResetResponse) { try { - + if (!isRunning()) { return; } @@ -189,7 +189,7 @@ public class GrpcClient extends RpcClient { } } }); - + eventExecutor.submit(new Runnable() { @Override public void run() { @@ -213,7 +213,7 @@ public class GrpcClient extends RpcClient { * switch a new server. */ private void switchServer(final boolean onStarting) { - + //try to get operate lock. boolean lockResult = lock.tryLock(); if (!lockResult) { @@ -248,7 +248,7 @@ public class GrpcClient extends RpcClient { if (newChannelStubTemp != null) { RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc .newStub(newChannelStubTemp.getChannel()); - + bindRequestStream(requestStreamStubTemp); final ManagedChannel depratedChannel = channel; //switch current channel and stub @@ -278,10 +278,10 @@ public class GrpcClient extends RpcClient { * Send Heart Beat Request. */ public void sendBeat() { - + int maxRetryTimes = 3; while (maxRetryTimes > 0) { - + try { if (!isRunning()) { return; @@ -305,7 +305,7 @@ public class GrpcClient extends RpcClient { LOGGER.error("Send heart beat error, ", e); } } - + eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED)); LOGGER.warn("Max retry times for send heart beat fail reached,trying to switch server... "); switchServer(false); @@ -314,7 +314,7 @@ public class GrpcClient extends RpcClient { private GrpcMetadata buildMeta() { GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) .setVersion(ClientCommonUtils.VERSION).build(); - + return meta; } @@ -326,11 +326,11 @@ public class GrpcClient extends RpcClient { */ private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) { try { - + ServerCheckRequest serverCheckRequest = new ServerCheckRequest(); GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()) - .setType(serverCheckRequest.getType()) - .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest))) + .setType(serverCheckRequest.getType()).setBody( + Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest))) .build()).build(); ListenableFuture responseFuture = requestBlockingStub.request(streamRequest); GrpcResponse response = responseFuture.get(); @@ -351,7 +351,7 @@ public class GrpcClient extends RpcClient { streamStub.requestStream(streamRequest, new StreamObserver() { @Override public void onNext(GrpcResponse grpcResponse) { - + LOGGER.debug(" stream response receive ,original reponse :{}", grpcResponse); try { sendAckResponse(grpcResponse.getAck(), true); @@ -367,14 +367,14 @@ public class GrpcClient extends RpcClient { myresponse.setBodyString(bodyString); response = myresponse; } - + serverPushResponseListeners.forEach(new Consumer() { @Override public void accept(ServerPushResponseHandler serverPushResponseHandler) { serverPushResponseHandler.responseReply(response); } }); - + } catch (Exception e) { e.printStackTrace(System.out); LOGGER.error("error tp process server push response :{}", grpcResponse); @@ -406,11 +406,11 @@ public class GrpcClient extends RpcClient { @Override public Response request(Request request) throws NacosException { - + int maxRetryTimes = 3; while (maxRetryTimes > 0) { try { - + GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType()) .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))) .build(); @@ -425,10 +425,10 @@ public class GrpcClient extends RpcClient { LOGGER.error("grpc client request error, retry...", e.getMessage(), e); } } - + LOGGER.warn("Max retry times for request fail reached."); throw new NacosException(NacosException.SERVER_ERROR, "Fail to request."); - + } private Response convertResponse(GrpcResponse grpcResponse) { diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 3c3efcabc..babf69f0f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -85,6 +85,7 @@ public class DefaultPublisher extends Thread implements EventPublisher { } } + @Override public long currentEventSize() { return queue.size(); } diff --git a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java index 1ddfdf66a..ef6c9563c 100644 --- a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java +++ b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java @@ -52,5 +52,4 @@ public class ServerLoaderController { return ResponseEntity.ok().body("success"); } - } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index 9df511b27..1d015d08a 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -75,7 +75,7 @@ public class ConnectionManager { if (remove != null) { remove.closeGrapcefully(); Loggers.GRPC.info(" connection unregistered successfully,connectionid = {} ", connectionId); - clientConnectionEventListenerRegistry.notifyClientConnected(remove); + clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/HeartBeatRequestHandler.java b/core/src/main/java/com/alibaba/nacos/core/remote/HeartBeatRequestHandler.java index 4532ed105..12e40b344 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/HeartBeatRequestHandler.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/HeartBeatRequestHandler.java @@ -23,7 +23,9 @@ import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.request.RequestTypeConstants; import com.alibaba.nacos.api.remote.response.HeartBeatResponse; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.core.remote.event.RemotingHeartBeatEvent; import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -51,6 +53,7 @@ public class HeartBeatRequestHandler extends RequestHandler { public Response handle(Request request, RequestMeta meta) throws NacosException { String connectionId = meta.getConnectionId(); connectionManager.refreshActiveTime(connectionId); + NotifyCenter.publishEvent(new RemotingHeartBeatEvent(connectionId, meta.getClientIp(), meta.getClientVersion())); return new HeartBeatResponse(); } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/event/RemotingHeartBeatEvent.java b/core/src/main/java/com/alibaba/nacos/core/remote/event/RemotingHeartBeatEvent.java new file mode 100644 index 000000000..140478361 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/event/RemotingHeartBeatEvent.java @@ -0,0 +1,51 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote.event; + +import com.alibaba.nacos.common.notify.Event; + +/** + * Remoting connection heart beat event. + * + * @author xiweng.yy + */ +public class RemotingHeartBeatEvent extends Event { + + private final String connectionId; + + private final String clientIp; + + private final String clientVersion; + + public RemotingHeartBeatEvent(String connectionId, String clientIp, String clientVersion) { + this.connectionId = connectionId; + this.clientIp = clientIp; + this.clientVersion = clientVersion; + } + + public String getConnectionId() { + return connectionId; + } + + public String getClientIp() { + return clientIp; + } + + public String getClientVersion() { + return clientVersion; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterClient.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterClient.java new file mode 100644 index 000000000..bc3630130 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterClient.java @@ -0,0 +1,38 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.response.Response; + +/** + * Cluster client. + * + * @author xiweng.yy + */ +public interface ClusterClient { + + /** + * Send request to target server. + * + * @param request request + * @return response + * @throws NacosException nacos exception + */ + Response request(Request request) throws NacosException; +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterClientManager.java new file mode 100644 index 000000000..4c4846fc3 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterClientManager.java @@ -0,0 +1,78 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.core.cluster.Member; +import com.alibaba.nacos.core.cluster.ServerMemberManager; +import com.alibaba.nacos.naming.cluster.remote.grpc.GrpcClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Cluster client proxy. + * + * @author xiweng.yy + */ +@Component +public class ClusterClientManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterClientManager.class); + + private final ConcurrentMap clientMap = new ConcurrentHashMap<>(); + + private final ServerMemberManager memberManager; + + public ClusterClientManager(ServerMemberManager memberManager) { + this.memberManager = memberManager; + } + + /** + * Init cluster client manager. + */ + @PostConstruct + public void init() { + for (Member each : memberManager.allMembersWithoutSelf()) { + clientMap.put(each.getAddress(), new GrpcClient(each.getAddress())); + } + for (ClusterClient each : clientMap.values()) { + try { + ((GrpcClient) each).start(); + } catch (NacosException nacosException) { + LOGGER.error("Create cluster connection failed", nacosException); + } + } + } + + public boolean hasClientForMember(String memberAddress) { + return clientMap.containsKey(memberAddress); + } + + public ClusterClient getClusterClient(String memberAddress) { + return clientMap.getOrDefault(memberAddress, null); + } + + public Collection getAllClusterClient() { + return clientMap.values(); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java new file mode 100644 index 000000000..fdb4afc92 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java @@ -0,0 +1,60 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote; + +import com.alibaba.nacos.api.remote.connection.Connection; +import com.alibaba.nacos.api.remote.connection.ConnectionMetaInfo; +import com.alibaba.nacos.api.remote.response.PushCallBack; +import com.alibaba.nacos.api.remote.response.ServerPushResponse; + +import java.util.concurrent.Future; + +/** + * Cluster connection. + * + * @author xiweng.yy + */ +public class ClusterConnection extends Connection { + + public ClusterConnection(ConnectionMetaInfo metaInfo) { + super(metaInfo); + } + + @Override + public boolean sendPush(ServerPushResponse request, long timeoutMills) throws Exception { + return false; + } + + @Override + public boolean sendPushNoAck(ServerPushResponse request) throws Exception { + return false; + } + + @Override + public Future sendPushWithFuture(ServerPushResponse request) throws Exception { + return null; + } + + @Override + public void sendPushCallBackWithCallBack(ServerPushResponse request, PushCallBack callBack) throws Exception { + + } + + @Override + public void closeGrapcefully() { + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/RpcClient.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/RpcClient.java new file mode 100644 index 000000000..4eea64ee6 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/RpcClient.java @@ -0,0 +1,121 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.lifecycle.Closeable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; + +/** + * abstract remote client to connect to server. + * + * @author liuzunfei + * @version $Id: RpcClient.java, v 0.1 2020年07月13日 9:15 PM liuzunfei Exp $ + */ +public abstract class RpcClient implements Closeable, ClusterClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class); + + protected String connectionId; + + protected String target; + + protected AtomicReference rpcClientStatus = new AtomicReference( + RpcClientStatus.WAIT_INIT); + + protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("com.alibaba.nacos.client.config.grpc.worker"); + t.setDaemon(true); + return t; + } + }); + + public RpcClient() { + } + + public RpcClient(String target) { + init(target); + } + + /** + * init server list factory. + * + * @param target target address + */ + public void init(String target) { + if (!isWaitInited()) { + return; + } + this.connectionId = UUID.randomUUID().toString(); + rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITED); + this.target = target; + LOGGER.info("RpcClient init ,connectionId={}, target ={}", this.connectionId, target); + } + + /** + * check is this client is inited. + * + * @return true if is waiting init + */ + public boolean isWaitInited() { + return this.rpcClientStatus.get() == RpcClientStatus.WAIT_INIT; + } + + /** + * check is this client is running. + * + * @return true if is running + */ + public boolean isRunning() { + return this.rpcClientStatus.get() == RpcClientStatus.RUNNING; + } + + /** + * check is this client is in init status,have not start th client. + * + * @return true if is init + */ + public boolean isInitStatus() { + return this.rpcClientStatus.get() == RpcClientStatus.INITED; + } + + /** + * check is this client is in starting process. + * + * @return true if is starting + */ + public boolean isStarting() { + return this.rpcClientStatus.get() == RpcClientStatus.STARTING; + } + + /** + * Start this client. + * + * @throws NacosException nacos exception + */ + public abstract void start() throws NacosException; +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/RpcClientStatus.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/RpcClientStatus.java new file mode 100644 index 000000000..5dc4c7c52 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/RpcClientStatus.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote; + +/** + * RpcClientStatus. + * @author liuzunfei + * @version $Id: RpcClientStatus.java, v 0.1 2020年07月14日 3:49 PM liuzunfei Exp $ + */ +public enum RpcClientStatus { + + WAIT_INIT(0, "wait to init serverlist factory... "), + INITED(1, "server list factory is ready,wait to start"), + STARTING(2, "server list factory is ready,wait to start"), + RUNNING(3, "client is running..."), + SWITCHING_SERVER(5, "reconnecting..."); + + + int status; + + String desc; + + RpcClientStatus(int status, String desc) { + this.status = status; + this.desc = desc; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClient.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClient.java new file mode 100644 index 000000000..60322764e --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClient.java @@ -0,0 +1,373 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote.grpc; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.grpc.GrpcMetadata; +import com.alibaba.nacos.api.grpc.GrpcRequest; +import com.alibaba.nacos.api.grpc.GrpcResponse; +import com.alibaba.nacos.api.grpc.RequestGrpc; +import com.alibaba.nacos.api.grpc.RequestStreamGrpc; +import com.alibaba.nacos.api.remote.ResponseRegistry; +import com.alibaba.nacos.api.remote.request.HeartBeatRequest; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.ServerCheckRequest; +import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse; +import com.alibaba.nacos.api.remote.response.PlainBodyResponse; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.remote.response.ResponseTypeConstants; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.naming.cluster.remote.RpcClient; +import com.alibaba.nacos.naming.cluster.remote.RpcClientStatus; +import com.alibaba.nacos.naming.misc.NetUtils; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * gRPC Client. + * + * @author liuzunfei + * @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $ + */ +public class GrpcClient extends RpcClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class); + + /** + * change listeners handler registry. + */ + protected List serverPushResponseListeners = new ArrayList(); + + protected ManagedChannel channel; + + protected RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub; + + protected RequestGrpc.RequestBlockingStub grpcServiceStub; + + public GrpcClient(String target) { + super(target); + } + + /** + * create a new channel . + * + * @param serverIp serverIp. + * @param serverPort serverPort. + * @return if server check success,return stub. + */ + private RequestGrpc.RequestBlockingStub createNewChannelStub(String serverIp, int serverPort) { + + ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext() + .build(); + + RequestGrpc.RequestBlockingStub grpcServiceStubTemp = RequestGrpc.newBlockingStub(managedChannelTemp); + boolean checkSuccess = serverCheck(grpcServiceStubTemp); + LOGGER.info(String.format("create cluster channel to %s:%d result %s", serverIp, serverPort, checkSuccess)); + + if (checkSuccess) { + return grpcServiceStubTemp; + } else { + shuntDownChannel(managedChannelTemp); + return null; + } + + } + + /** + * shutdown a channel. + * + * @param managedChannel channel to be shutdown. + */ + private void shuntDownChannel(ManagedChannel managedChannel) { + if (managedChannel != null && !managedChannel.isShutdown()) { + managedChannel.shutdownNow(); + } + } + + private void connectToServer() { + rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING); + GrpcServerInfo serverInfo = resolveServerInfo(target); + RequestGrpc.RequestBlockingStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp, + serverInfo.serverPort); + if (newChannelStubTemp != null) { + RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc + .newStub(newChannelStubTemp.getChannel()); + bindRequestStream(requestStreamStubTemp); + //switch current channel and stub + channel = (ManagedChannel) newChannelStubTemp.getChannel(); + grpcStreamServiceStub = requestStreamStubTemp; + grpcServiceStub = newChannelStubTemp; + rpcClientStatus.set(RpcClientStatus.RUNNING); + } else { + switchServer(true); + } + } + + @Override + public void start() throws NacosException { + + if (rpcClientStatus.get() == RpcClientStatus.WAIT_INIT) { + LOGGER.error("RpcClient has not init yet, please check init ServerListFactory..."); + throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "RpcClient not init yet"); + } + if (rpcClientStatus.get() == RpcClientStatus.RUNNING || rpcClientStatus.get() == RpcClientStatus.STARTING) { + return; + } + + connectToServer(); + + executorService.scheduleWithFixedDelay(() -> sendBeat(), 0, 3000, TimeUnit.MILLISECONDS); + } + + /** + * switch a new server. + */ + private void switchServer(final boolean onStarting) { + + if (onStarting) { + // access on startup fail + rpcClientStatus.set(RpcClientStatus.SWITCHING_SERVER); + + } else { + // access from running status, sendbeat fail or receive reset message from server. + boolean changeStatusSuccess = rpcClientStatus + .compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.SWITCHING_SERVER); + if (!changeStatusSuccess) { + return; + } + } + + executorService.schedule(() -> { + // loop until start client success. + while (!isRunning()) { + + //1.get a new server + GrpcServerInfo serverInfo = resolveServerInfo(target); + + //2.get a new channel to new server + RequestGrpc.RequestBlockingStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp, + serverInfo.serverPort); + if (newChannelStubTemp != null) { + RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc + .newStub(newChannelStubTemp.getChannel()); + bindRequestStream(requestStreamStubTemp); + final ManagedChannel depratedChannel = channel; + //switch current channel and stub + channel = (ManagedChannel) newChannelStubTemp.getChannel(); + grpcStreamServiceStub = requestStreamStubTemp; + grpcServiceStub = newChannelStubTemp; + rpcClientStatus.getAndSet(RpcClientStatus.RUNNING); + shuntDownChannel(depratedChannel); + continue; + } + try { + //sleep 3 second to switch next server. + Thread.sleep(3000L); + } catch (InterruptedException e) { + // Do nothing. + } + } + }, 0L, TimeUnit.MILLISECONDS); + + } + + /** + * Send Heart Beat Request. + */ + public void sendBeat() { + try { + + if (!isRunning()) { + return; + } + HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()) + .setType(heartBeatRequest.getType()).setBody( + Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest))) + .build()).build(); + GrpcResponse response = grpcServiceStub.request(streamRequest); + if (ResponseTypeConstants.CONNECION_UNREGISTER.equals(response.getType())) { + LOGGER.warn("Send heart beat fail,connection is not registerd,trying to switch server "); + switchServer(false); + } + } catch (StatusRuntimeException e) { + if (Status.UNAVAILABLE.getCode().equals(e.getStatus().getCode())) { + LOGGER.warn("Send heart beat fail,server is not avaliable now,trying to switch server "); + switchServer(false); + return; + } + throw e; + } catch (Exception e) { + LOGGER.error("Send heart beat error, ", e); + } + } + + private GrpcMetadata buildMeta() { + GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localServer()) + .setVersion(UtilsAndCommons.SERVER_VERSION).build(); + + return meta; + } + + /** + * chenck server if ok. + * + * @param requestBlockingStub requestBlockingStub used to check server. + * @return + */ + private boolean serverCheck(RequestGrpc.RequestBlockingStub requestBlockingStub) { + try { + + ServerCheckRequest serverCheckRequest = new ServerCheckRequest(); + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()) + .setType(serverCheckRequest.getType()).setBody( + Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest))) + .build()).build(); + GrpcResponse response = requestBlockingStub.request(streamRequest); + return response != null; + } catch (Exception e) { + return false; + } + } + + /** + * bind request stream observer (send a connection). + * + * @param streamStub streamStub to bind. + */ + private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) { + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build(); + LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest); + streamStub.requestStream(streamRequest, new StreamObserver() { + @Override + public void onNext(GrpcResponse grpcResponse) { + + LOGGER.debug(" stream response receive ,original reponse :{}", grpcResponse); + try { + + String message = grpcResponse.getBody().getValue().toStringUtf8(); + String type = grpcResponse.getType(); + String bodyString = grpcResponse.getBody().getValue().toStringUtf8(); + Class classByType = ResponseRegistry.getClassByType(type); + final Response response; + if (classByType != null) { + response = (Response) JacksonUtils.toObj(bodyString, classByType); + } else { + PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class); + myresponse.setBodyString(bodyString); + response = myresponse; + } + serverPushResponseListeners + .forEach(serverPushResponseHandler -> serverPushResponseHandler.responseReply(response)); + } catch (Exception e) { + LOGGER.error("error tp process server push response :{}", grpcResponse); + } + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }); + } + + @Override + public Response request(Request request) throws NacosException { + + if (!this.isRunning()) { + throw new IllegalStateException("Client is not connected to any server now,please retry later"); + } + try { + + GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType()) + .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build(); + GrpcResponse response = grpcServiceStub.request(grpcrequest); + String type = response.getType(); + String bodyString = response.getBody().getValue().toStringUtf8(); + + // transfrom grpcResponse to response model + Class classByType = ResponseRegistry.getClassByType(type); + if (classByType != null) { + Object object = JacksonUtils.toObj(bodyString, classByType); + if (object instanceof ConnectionUnregisterResponse) { + switchServer(false); + throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "connection is not connected."); + } + return (Response) object; + } else { + PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class); + myresponse.setBodyString(bodyString); + return (PlainBodyResponse) myresponse; + } + } catch (StatusRuntimeException e) { + if (Status.UNAVAILABLE.equals(e.getStatus())) { + LOGGER.warn("request fail,server is not avaliable now,trying to switch server "); + switchServer(false); + } + throw e; + } catch (Exception e) { + LOGGER.error("grpc client request error, error message is ", e.getMessage(), e); + throw new NacosException(NacosException.SERVER_ERROR, e); + } + } + + @Override + public void shutdown() throws NacosException { + if (this.channel != null && !this.channel.isShutdown()) { + this.channel.shutdownNow(); + } + } + + private GrpcServerInfo resolveServerInfo(String serverAddress) { + GrpcServerInfo serverInfo = new GrpcServerInfo(); + serverInfo.serverPort = 1000; + if (serverAddress.contains("http")) { + serverInfo.serverIp = serverAddress.split(":")[1].replaceAll("//", ""); + serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[2].replaceAll("//", "")); + } else { + serverInfo.serverIp = serverAddress.split(":")[0]; + serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[1]); + } + return serverInfo; + } + + class GrpcServerInfo { + + String serverIp; + + int serverPort; + + } +} + + + diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/ServerPushResponseHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/ServerPushResponseHandler.java new file mode 100644 index 000000000..896d25dd7 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/ServerPushResponseHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote.grpc; + +import com.alibaba.nacos.api.remote.response.Response; + +/** + * ServerPushResponseHandler. + * + * @author liuzunfei + * @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $ + */ +public interface ServerPushResponseHandler { + + /** + * Handle logic when response received. + * @param response response + */ + void responseReply(Response response); + +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/AbstractClusterRequest.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/AbstractClusterRequest.java new file mode 100644 index 000000000..5aaf3a54a --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/AbstractClusterRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote.request; + +import com.alibaba.nacos.api.remote.request.Request; + +/** + * Cluster request. + * + * @author xiweng.yy + */ +public abstract class AbstractClusterRequest extends Request { + + @Override + public String getModule() { + return "cluster"; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/ForwardHeartBeatRequest.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/ForwardHeartBeatRequest.java new file mode 100644 index 000000000..a8a5be66f --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/ForwardHeartBeatRequest.java @@ -0,0 +1,49 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote.request; + +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; + +/** + * Forward heart beat request. + * + * @author xiweng.yy + */ +public class ForwardHeartBeatRequest extends AbstractClusterRequest { + + private String connectionId; + + public ForwardHeartBeatRequest() { + } + + public ForwardHeartBeatRequest(String connectionId) { + this.connectionId = connectionId; + } + + @Override + public String getType() { + return NamingRemoteConstants.FORWARD_HEART_BEAT; + } + + public String getConnectionId() { + return connectionId; + } + + public void setConnectionId(String connectionId) { + this.connectionId = connectionId; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/ForwardInstanceRequest.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/ForwardInstanceRequest.java new file mode 100644 index 000000000..c41153a22 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/request/ForwardInstanceRequest.java @@ -0,0 +1,62 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.cluster.remote.request; + +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; +import com.alibaba.nacos.api.naming.remote.request.InstanceRequest; +import com.alibaba.nacos.api.remote.request.RequestMeta; + +/** + * Forward instance request. + * + * @author xiweng.yy + */ +public class ForwardInstanceRequest extends AbstractClusterRequest { + + private InstanceRequest instanceRequest; + + private RequestMeta sourceRequestMeta; + + public ForwardInstanceRequest() { + } + + public ForwardInstanceRequest(InstanceRequest instanceRequest, RequestMeta sourceRequestMeta) { + this.instanceRequest = instanceRequest; + this.sourceRequestMeta = sourceRequestMeta; + } + + @Override + public String getType() { + return NamingRemoteConstants.FORWARD_INSTANCE; + } + + public InstanceRequest getInstanceRequest() { + return instanceRequest; + } + + public void setInstanceRequest(InstanceRequest instanceRequest) { + this.instanceRequest = instanceRequest; + } + + public RequestMeta getSourceRequestMeta() { + return sourceRequestMeta; + } + + public void setSourceRequestMeta(RequestMeta sourceRequestMeta) { + this.sourceRequestMeta = sourceRequestMeta; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java index e840c0a89..c43110b44 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java @@ -127,6 +127,10 @@ public class GlobalExecutor { .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class), new NameThreadFactory("com.alibaba.nacos.naming.nacos-server-performance")); + private static final ScheduledExecutorService REMOTE_CONNECTION_EXECUTOR = ExecutorFactory.Managed + .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class), + new NameThreadFactory("com.alibaba.nacos.naming.remote-connection-manager")); + public static void submitDataSync(Runnable runnable, long delay) { DATA_SYNC_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS); } @@ -241,4 +245,9 @@ public class GlobalExecutor { public static void schedulePerformanceLogger(Runnable runnable, long initialDelay, long delay, TimeUnit unit) { SERVER_PERFORMANCE_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit); } + + public static void scheduleRemoteConnectionManager(Runnable runnable, long initialDelay, long delay, + TimeUnit unit) { + REMOTE_CONNECTION_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnection.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnection.java index 6e31120a4..00b805bbc 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnection.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnection.java @@ -40,8 +40,11 @@ public class RemotingConnection { private final Connection connection; + private long lastHeartBeatTime; + public RemotingConnection(Connection connection) { this.connection = connection; + this.lastHeartBeatTime = System.currentTimeMillis(); } public String getConnectionId() { @@ -115,4 +118,12 @@ public class RemotingConnection { public ConcurrentMap> getInstanceIndex() { return instanceIndex; } + + public long getLastHeartBeatTime() { + return lastHeartBeatTime; + } + + public void setLastHeartBeatTime(long lastHeartBeatTime) { + this.lastHeartBeatTime = lastHeartBeatTime; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnectionHolder.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnectionHolder.java index 07e7580d2..121e61c82 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnectionHolder.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/RemotingConnectionHolder.java @@ -16,19 +16,29 @@ package com.alibaba.nacos.naming.remote; +import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.connection.Connection; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.core.remote.ClientConnectionEventListener; -import com.alibaba.nacos.naming.consistency.KeyBuilder; -import com.alibaba.nacos.naming.core.Instance; +import com.alibaba.nacos.core.remote.event.RemotingHeartBeatEvent; +import com.alibaba.nacos.naming.cluster.remote.ClusterClient; +import com.alibaba.nacos.naming.cluster.remote.ClusterClientManager; +import com.alibaba.nacos.naming.cluster.remote.request.ForwardHeartBeatRequest; import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.push.RemotePushService; +import com.alibaba.nacos.naming.remote.task.RenewInstanceBeatTask; +import com.alibaba.nacos.naming.remote.worker.RemotingWorkersManager; import org.springframework.stereotype.Component; -import java.util.Set; +import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; /** * Remoting connection holder. @@ -44,35 +54,112 @@ public class RemotingConnectionHolder extends ClientConnectionEventListener { private final ServiceManager serviceManager; - public RemotingConnectionHolder(RemotePushService remotePushService, ServiceManager serviceManager) { + public RemotingConnectionHolder(RemotePushService remotePushService, ServiceManager serviceManager, + ClusterClientManager clusterClientManager) { this.remotePushService = remotePushService; this.serviceManager = serviceManager; + NotifyCenter.registerSubscriber(new RemotingHeartBeatSubscriber(this, clusterClientManager)); + GlobalExecutor.scheduleRemoteConnectionManager(new RemotingConnectionCleaner(this), 0, + Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS); } @Override public void clientConnected(Connection connect) { - connectionCache.put(connect.getConnectionId(), new RemotingConnection(connect)); + Loggers.SRV_LOG.info("Client connection {} connect", connect.getConnectionId()); + if (!connectionCache.containsKey(connect.getConnectionId())) { + connectionCache.put(connect.getConnectionId(), new RemotingConnection(connect)); + } } @Override public void clientDisConnected(Connection connect) { - RemotingConnection remotingConnection = connectionCache.remove(connect.getConnectionId()); - try { - for (String each : remotingConnection.getInstanceIndex().keySet()) { - Set instances = remotingConnection.getInstanceIndex().get(each); - serviceManager.removeInstance(KeyBuilder.getNamespace(each), KeyBuilder.getServiceName(each), true, - instances.toArray(new Instance[instances.size()])); - } - for (String each : remotingConnection.getSubscriberIndex().keySet()) { - remotePushService.removeAllSubscribeForService(each); - } - } catch (NacosException e) { - Loggers.SRV_LOG - .error(String.format("Remove context of connection %s failed", connect.getConnectionId()), e); + clientDisConnected(connect.getConnectionId()); + } + + private void clientDisConnected(String connectionId) { + Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", connectionId); + RemotingConnection remotingConnection = connectionCache.remove(connectionId); + if (null == remotingConnection) { + return; + } + for (String each : remotingConnection.getSubscriberIndex().keySet()) { + remotePushService.removeAllSubscribeForService(each); } } public RemotingConnection getRemotingConnection(String connectionId) { return connectionCache.get(connectionId); } + + public Collection getAllConnectionId() { + return connectionCache.keySet(); + } + + /** + * Renew remoting connection. + * + * @param connectionId connection id + */ + public void renewRemotingConnection(String connectionId) { + if (!connectionCache.containsKey(connectionId)) { + return; + } + RemotingConnection remotingConnection = connectionCache.get(connectionId); + remotingConnection.setLastHeartBeatTime(System.currentTimeMillis()); + RemotingWorkersManager.dispatch(connectionId, new RenewInstanceBeatTask(remotingConnection, serviceManager)); + } + + private static class RemotingHeartBeatSubscriber extends Subscriber { + + private final RemotingConnectionHolder remotingConnectionHolder; + + private final ClusterClientManager clusterClientManager; + + public RemotingHeartBeatSubscriber(RemotingConnectionHolder remotingConnectionHolder, + ClusterClientManager clusterClientManager) { + this.remotingConnectionHolder = remotingConnectionHolder; + this.clusterClientManager = clusterClientManager; + } + + @Override + public void onEvent(RemotingHeartBeatEvent event) { + remotingConnectionHolder.renewRemotingConnection(event.getConnectionId()); + for (ClusterClient each : clusterClientManager.getAllClusterClient()) { + try { + each.request(new ForwardHeartBeatRequest(event.getConnectionId())); + } catch (NacosException nacosException) { + Loggers.DISTRO.warn("Forward heart beat failed.", nacosException); + } + } + } + + @Override + public Class subscribeType() { + return RemotingHeartBeatEvent.class; + } + } + + private static class RemotingConnectionCleaner implements Runnable { + + private final RemotingConnectionHolder remotingConnectionHolder; + + public RemotingConnectionCleaner(RemotingConnectionHolder remotingConnectionHolder) { + this.remotingConnectionHolder = remotingConnectionHolder; + } + + @Override + public void run() { + long currentTime = System.currentTimeMillis(); + for (String each : remotingConnectionHolder.getAllConnectionId()) { + RemotingConnection remotingConnection = remotingConnectionHolder.getRemotingConnection(each); + if (null != remotingConnection && isExpireConnection(currentTime, remotingConnection)) { + remotingConnectionHolder.clientDisConnected(each); + } + } + } + + private boolean isExpireConnection(long currentTime, RemotingConnection remotingConnection) { + return remotingConnection.getLastHeartBeatTime() - currentTime > Constants.DEFAULT_IP_DELETE_TIMEOUT * 2; + } + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardHeartBeatRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardHeartBeatRequestHandler.java new file mode 100644 index 000000000..d126c7470 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardHeartBeatRequestHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.remote.handler; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.response.HeartBeatResponse; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.naming.cluster.remote.request.ForwardHeartBeatRequest; +import com.alibaba.nacos.naming.remote.RemotingConnectionHolder; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Forward heart beat request handler. + * + * @author xiweng.yy + */ +@Component +public class ForwardHeartBeatRequestHandler extends RequestHandler { + + private final RemotingConnectionHolder remotingConnectionHolder; + + public ForwardHeartBeatRequestHandler(RemotingConnectionHolder remotingConnectionHolder) { + this.remotingConnectionHolder = remotingConnectionHolder; + } + + @Override + public ForwardHeartBeatRequest parseBodyString(String bodyString) { + return JacksonUtils.toObj(bodyString, ForwardHeartBeatRequest.class); + } + + @Override + public Response handle(Request request, RequestMeta meta) throws NacosException { + remotingConnectionHolder.renewRemotingConnection(((ForwardHeartBeatRequest) request).getConnectionId()); + return new HeartBeatResponse(); + } + + @Override + public List getRequestTypes() { + return Lists.newArrayList(NamingRemoteConstants.FORWARD_HEART_BEAT); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java new file mode 100644 index 000000000..f61ab8e92 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ForwardInstanceRequestHandler.java @@ -0,0 +1,91 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.remote.handler; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; +import com.alibaba.nacos.api.naming.remote.request.InstanceRequest; +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.api.remote.connection.ConnectionMetaInfo; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.naming.cluster.remote.ClusterConnection; +import com.alibaba.nacos.naming.cluster.remote.request.ForwardInstanceRequest; +import com.alibaba.nacos.naming.core.DistroMapper; +import com.alibaba.nacos.naming.remote.RemotingConnectionHolder; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Forward instance request handler. + * + * @author xiweng.yy + */ +@Component +public class ForwardInstanceRequestHandler extends RequestHandler { + + private final InstanceRequestHandler instanceRequestHandler; + + private final RemotingConnectionHolder remotingConnectionHolder; + + private final DistroMapper distroMapper; + + public ForwardInstanceRequestHandler(InstanceRequestHandler instanceRequestHandler, + RemotingConnectionHolder remotingConnectionHolder, DistroMapper distroMapper) { + this.instanceRequestHandler = instanceRequestHandler; + this.remotingConnectionHolder = remotingConnectionHolder; + this.distroMapper = distroMapper; + } + + @Override + public ForwardInstanceRequest parseBodyString(String bodyString) { + return JacksonUtils.toObj(bodyString, ForwardInstanceRequest.class); + } + + @Override + public Response handle(Request request, RequestMeta meta) throws NacosException { + ForwardInstanceRequest actualRequest = (ForwardInstanceRequest) request; + InstanceRequest instanceRequest = actualRequest.getInstanceRequest(); + String serviceName = NamingUtils + .getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName()); + if (distroMapper.responsible(serviceName)) { + RequestMeta sourceRequestMeta = actualRequest.getSourceRequestMeta(); + addRemotingConnectionIfAbsent(sourceRequestMeta); + return instanceRequestHandler.handle(instanceRequest, sourceRequestMeta); + } + throw new NacosException(NacosException.BAD_GATEWAY, + String.format("Forward instance request to error server, service: %s", serviceName)); + } + + private void addRemotingConnectionIfAbsent(RequestMeta sourceRequestMeta) { + if (null == remotingConnectionHolder.getRemotingConnection(sourceRequestMeta.getConnectionId())) { + ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(sourceRequestMeta.getConnectionId(), + sourceRequestMeta.getClientIp(), "cluster", sourceRequestMeta.getClientVersion()); + remotingConnectionHolder.clientConnected(new ClusterConnection(metaInfo)); + } + } + + @Override + public List getRequestTypes() { + return Lists.newArrayList(NamingRemoteConstants.FORWARD_INSTANCE); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java index 6bb4cdef8..b563966a4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java @@ -27,6 +27,9 @@ import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.naming.cluster.remote.ClusterClientManager; +import com.alibaba.nacos.naming.cluster.remote.request.ForwardInstanceRequest; +import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.ServiceManager; import com.alibaba.nacos.naming.misc.Loggers; @@ -49,9 +52,16 @@ public class InstanceRequestHandler extends RequestHandler { private final RemotingConnectionHolder remotingConnectionHolder; - public InstanceRequestHandler(ServiceManager serviceManager, RemotingConnectionHolder remotingConnectionHolder) { + private final ClusterClientManager clusterClientManager; + + private final DistroMapper distroMapper; + + public InstanceRequestHandler(ServiceManager serviceManager, RemotingConnectionHolder remotingConnectionHolder, + ClusterClientManager clusterClientManager, DistroMapper distroMapper) { this.serviceManager = serviceManager; this.remotingConnectionHolder = remotingConnectionHolder; + this.clusterClientManager = clusterClientManager; + this.distroMapper = distroMapper; } @Override @@ -62,20 +72,40 @@ public class InstanceRequestHandler extends RequestHandler { @Override public Response handle(Request request, RequestMeta meta) throws NacosException { InstanceRequest instanceRequest = (InstanceRequest) request; - String namespace = instanceRequest.getNamespace(); String serviceName = NamingUtils .getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName()); - switch (instanceRequest.getType()) { + if (distroMapper.responsible(serviceName)) { + return handleResponsibleRequest(serviceName, instanceRequest, meta); + } else { + return forwardRequestToResponsibleServer(serviceName, instanceRequest, meta); + } + } + + private Response handleResponsibleRequest(String serviceName, InstanceRequest request, RequestMeta meta) + throws NacosException { + String namespace = request.getNamespace(); + switch (request.getType()) { case NamingRemoteConstants.REGISTER_INSTANCE: - return registerInstance(namespace, serviceName, instanceRequest, meta); + return registerInstance(namespace, serviceName, request, meta); case NamingRemoteConstants.DE_REGISTER_INSTANCE: - return deregisterInstance(namespace, serviceName, instanceRequest, meta); + return deregisterInstance(namespace, serviceName, request, meta); default: throw new NacosException(NacosException.INVALID_PARAM, - String.format("Unsupported request type %s", instanceRequest.getType())); + String.format("Unsupported request type %s", request.getType())); } } + private Response forwardRequestToResponsibleServer(String serviceName, InstanceRequest request, RequestMeta meta) + throws NacosException { + String targetAddress = distroMapper.mapSrv(serviceName); + if (clusterClientManager.hasClientForMember(targetAddress)) { + return clusterClientManager.getClusterClient(targetAddress) + .request(new ForwardInstanceRequest(request, meta)); + } + throw new NacosException(NacosException.BAD_GATEWAY, + String.format("Can't find responsible server for service %s", serviceName)); + } + private Response registerInstance(String namespace, String serviceName, InstanceRequest instanceRequest, RequestMeta meta) throws NacosException { if (!serviceManager.containService(namespace, serviceName)) { @@ -86,8 +116,6 @@ public class InstanceRequestHandler extends RequestHandler { instance.setInstanceId(instance.generateInstanceId()); instance.setLastBeat(System.currentTimeMillis()); // Register instance by connection, do not need keep alive by beat. - instance.setMarked(true); - instance.validate(); serviceManager.addInstance(namespace, serviceName, instance.isEphemeral(), instance); remotingConnectionHolder.getRemotingConnection(meta.getConnectionId()) .addNewInstance(namespace, serviceName, instance); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/task/RenewInstanceBeatTask.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/task/RenewInstanceBeatTask.java new file mode 100644 index 000000000..db4fa42f5 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/task/RenewInstanceBeatTask.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.remote.task; + +import com.alibaba.nacos.naming.consistency.KeyBuilder; +import com.alibaba.nacos.naming.core.Instance; +import com.alibaba.nacos.naming.core.Service; +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.remote.RemotingConnection; + +import java.util.Set; + +/** + * Renew instance beat task. + * + * @author xiweng.yy + */ +public class RenewInstanceBeatTask implements Runnable { + + private final RemotingConnection remotingConnection; + + private final ServiceManager serviceManager; + + public RenewInstanceBeatTask(RemotingConnection remotingConnection, ServiceManager serviceManager) { + this.remotingConnection = remotingConnection; + this.serviceManager = serviceManager; + } + + @Override + public void run() { + for (String each : remotingConnection.getInstanceIndex().keySet()) { + Set instances = remotingConnection.getInstanceIndex().get(each); + Service service = serviceManager.getService(KeyBuilder.getNamespace(each), KeyBuilder.getServiceName(each)); + for (Instance actual : service.allIPs(true)) { + if (instances.contains(actual)) { + actual.setHealthy(true); + actual.setLastBeat(System.currentTimeMillis()); + } + } + } + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/worker/RemotingWorker.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/worker/RemotingWorker.java new file mode 100644 index 000000000..576faca2c --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/worker/RemotingWorker.java @@ -0,0 +1,115 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.remote.worker; + +import com.alibaba.nacos.common.lifecycle.Closeable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * Remoting worker. + * + * @author xiweng.yy + */ +public final class RemotingWorker implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(RemotingWorker.class); + + private static final String SEPARATOR = "_"; + + private static final int QUEUE_CAPACITY = 50000; + + private final BlockingQueue queue; + + private final String name; + + private final InnerWorker worker; + + public RemotingWorker(final int mod, final int total) { + name = getClass().getName() + "_" + mod + "%" + total; + queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + worker = new InnerWorker(name); + worker.start(); + } + + public String getName() { + return name; + } + + /** + * Execute task. + */ + public void execute(Runnable task) { + putTask(task); + } + + private void putTask(Runnable task) { + try { + queue.put(task); + } catch (InterruptedException ire) { + LOGGER.error(ire.toString(), ire); + } + } + + public int pendingTaskCount() { + return queue.size(); + } + + @Override + public void shutdown() { + worker.shutdown(); + queue.clear(); + } + + /** + * Real worker thread. + */ + private class InnerWorker extends Thread implements Closeable { + + private volatile boolean start = true; + + InnerWorker(String name) { + setDaemon(false); + setName(name); + } + + @Override + public void run() { + while (start) { + try { + Runnable task = queue.take(); + long begin = System.currentTimeMillis(); + task.run(); + long duration = System.currentTimeMillis() - begin; + if (duration > 1000L) { + LOGGER.warn("it takes {}ms to run task {}", duration, task); + } + } catch (Throwable e) { + LOGGER.error("[remoting-worker-error] " + e.toString(), e); + } + } + } + + @Override + public void shutdown() { + start = false; + } + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/worker/RemotingWorkersManager.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/worker/RemotingWorkersManager.java new file mode 100644 index 000000000..3e97a2576 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/worker/RemotingWorkersManager.java @@ -0,0 +1,92 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.remote.worker; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.lifecycle.Closeable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Remoting workers manager. + * + * @author xiweng.yy + */ +public final class RemotingWorkersManager implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(RemotingWorkersManager.class); + + private static final int TIMES_FOR_CORE = 2; + + /** + * power of 2. + */ + private static final RemotingWorker[] REMOTING_WORKERS; + + private RemotingWorkersManager() { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + shutdown(); + } catch (NacosException nacosException) { + LOGGER.warn("shutdown RemotingWorkersManager failed", nacosException); + } + })); + } + + static { + // Find a power of 2 >= cpuCores * 2. + final int coreCount = Runtime.getRuntime().availableProcessors(); + int workerCount = 1; + while (workerCount < coreCount * TIMES_FOR_CORE) { + workerCount <<= 1; + } + REMOTING_WORKERS = new RemotingWorker[workerCount]; + for (int mod = 0; mod < workerCount; ++mod) { + REMOTING_WORKERS[mod] = new RemotingWorker(mod, workerCount); + } + } + + /** + * Dispatch task by connectionId. + */ + public static void dispatch(String connectionId, Runnable task) { + RemotingWorker worker = getWorker(connectionId); + worker.execute(task); + } + + /** + * Get worker of connection id. + * + * @param connectionId connection Id + * @return remoting worker + */ + private static RemotingWorker getWorker(String connectionId) { + int idx = connectionId.hashCode() & (REMOTING_WORKERS.length - 1); + return REMOTING_WORKERS[idx]; + } + + public static int workersCount() { + return REMOTING_WORKERS.length; + } + + @Override + public void shutdown() throws NacosException { + for (RemotingWorker each : REMOTING_WORKERS) { + each.shutdown(); + } + } +} diff --git a/pom.xml b/pom.xml index 61479b12e..3e9820f12 100644 --- a/pom.xml +++ b/pom.xml @@ -277,6 +277,7 @@ **/istio/model/mcp/*.java **/istio/model/naming/*.java **/istio/model/*.java + **/api/grpc/*.java @@ -304,7 +305,7 @@ UTF-8 true true - **/istio/model/**,**/nacos/test/** + **/istio/model/**,**/nacos/test/**,**/api/grpc/**