From 723dedebfe51a94152e7c2da6011d450b7c6c0f2 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Mon, 7 Sep 2020 18:49:17 +0800 Subject: [PATCH] request callback timeout support both at client and server; split sdk (#3776) and cluster grpc server port. --- .../request/ConfigChangeNotifyRequest.java | 6 -- .../ConfigChangeBatchListenResponse.java | 6 ++ .../api/remote/AbstractRequestCallBack.java | 41 ++++++++++ .../api/remote/DefaultRequestFuture.java | 64 +++++++++++++-- .../nacos/api/remote/RequestCallBack.java | 11 ++- .../nacos/api/remote/RequestFuture.java | 2 +- .../api/remote/RpcScheduledExecutor.java | 42 ++++++++++ .../api/remote/response/PushCallBack.java | 4 +- .../remote/gprc/NamingGrpcClientProxy.java | 8 +- .../com/alibaba/nacos/client/ConfigTest.java | 81 ++++++++++++++++--- .../listener/impl/ClientWorkerTest.java | 41 ---------- common/pom.xml | 4 +- .../common/remote/client/Connection.java | 15 +++- .../nacos/common/remote/client/RpcClient.java | 4 +- .../remote/client/RpcClientFactory.java | 25 +++--- .../common/remote/client/grpc/GrpcClient.java | 7 +- .../remote/client/grpc/GrpcClusterClient.java | 41 ++++++++++ .../remote/client/grpc/GrpcConnection.java | 37 +++++++-- .../remote/client/grpc/GrpcOpsClient.java | 41 ++++++++++ .../remote/client/grpc/GrpcSdkClient.java | 41 ++++++++++ .../client/rsocket/RsocketConnection.java | 49 ++++++++--- .../remote/RpcConfigChangeNotifier.java | 22 ++--- .../cluster/remote/ClusterRpcClientProxy.java | 10 ++- .../remote/RpcAckCallbackSynchronizer.java | 20 ++++- .../nacos/core/remote/RpcPushService.java | 16 ++-- .../alibaba/nacos/core/remote/RpcServer.java | 12 +-- .../core/remote/grpc/GrpcClusterServer.java | 36 +++++++++ .../core/remote/grpc/GrpcConnection.java | 3 +- .../nacos/core/remote/grpc/GrpcSdkServer.java | 36 +++++++++ .../nacos/core/remote/grpc/GrpcServer.java | 11 +-- .../remote/rsocket/RsocketConnection.java | 55 ++++++++----- .../remote/grpc/GrpcClusterClient.java | 3 +- 32 files changed, 623 insertions(+), 171 deletions(-) create mode 100644 api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java create mode 100644 api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcSdkServer.java diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java index 6481ad5bd..b315e83e5 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java @@ -112,12 +112,6 @@ public class ConfigChangeNotifyRequest extends ServerPushRequest { this.tenant = tenant; } - @Override - public String toString() { - return "ConfigChangeNotifyResponse{" + "dataId='" + dataId + '\'' + ", group='" + group + '\'' + ", tenant='" - + tenant + '\'' + '}'; - } - /** * Getter method for property beta. * diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java index f4812d12d..105a6d08c 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java @@ -145,6 +145,12 @@ public class ConfigChangeBatchListenResponse extends Response { public void setTenant(String tenant) { this.tenant = tenant; } + + @Override + public String toString() { + return "ConfigContext{" + "group='" + group + '\'' + ", dataId='" + dataId + '\'' + ", tenant='" + tenant + + '\'' + '}'; + } } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java new file mode 100644 index 000000000..8f21f0638 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.remote; + +/** + * abstract request call back. + * + * @author liuzunfei + * @version $Id: AbstractRequestCallBack.java, v 0.1 2020年09月07日 3:30 PM liuzunfei Exp $ + */ +public abstract class AbstractRequestCallBack implements RequestCallBack { + + long timeoutMills; + + public AbstractRequestCallBack(long timeoutMill) { + this.timeoutMills = timeoutMill; + } + + public AbstractRequestCallBack() { + this(3000L); + } + + @Override + public long getTimeout() { + return timeoutMills; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java b/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java index d1c2c087e..0d20838e7 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java @@ -18,6 +18,8 @@ package com.alibaba.nacos.api.remote; import com.alibaba.nacos.api.remote.response.Response; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -40,8 +42,12 @@ public class DefaultRequestFuture implements RequestFuture { private String requestId; + private String connectionId; + private Response response; + private ScheduledFuture timeoutFuture; + /** * Getter method for property requestCallBack. * @@ -63,32 +69,39 @@ public class DefaultRequestFuture implements RequestFuture { public DefaultRequestFuture() { } - public DefaultRequestFuture(String requestId) { - this(requestId, null); + public DefaultRequestFuture(String connectionId, String requestId) { + this(connectionId, requestId, null); } - public DefaultRequestFuture(String requestId, RequestCallBack requestCallBack) { + public DefaultRequestFuture(String connectionId, String requestId, RequestCallBack requestCallBack) { this.timeStamp = System.currentTimeMillis(); this.requestCallBack = requestCallBack; this.requestId = requestId; + this.connectionId = connectionId; + this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER + .schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS); } - public void setResponse(Response response) { + public void setResponse(final Response response) { isDone = true; this.response = response; this.isSuccess = response.isSuccess(); + if (this.timeoutFuture != null) { + timeoutFuture.cancel(true); + } synchronized (this) { notifyAll(); } if (requestCallBack != null) { - requestCallBack.onResponse(response); + requestCallBack.getExcutor().execute(new CallBackHandler()); } } public void setFailResult(Exception e) { isDone = true; isSuccess = false; + this.exception = e; synchronized (this) { notifyAll(); } @@ -142,4 +155,45 @@ public class DefaultRequestFuture implements RequestFuture { throw new TimeoutException(); } } + + class CallBackHandler implements Runnable { + + @Override + public void run() { + if (exception != null) { + requestCallBack.onException(exception); + } else { + requestCallBack.onResponse(response); + } + } + } + + class TimeoutHandler implements Runnable { + + public TimeoutHandler() { + } + + @Override + public void run() { + setFailResult(new TimeoutException("Timeout After " + requestCallBack.getTimeout() + " millseconds.")); + } + } + + /** + * Getter method for property connectionId. + * + * @return property value of connectionId + */ + public String getConnectionId() { + return connectionId; + } + + /** + * Setter method for property timeoutFuture. + * + * @param timeoutFuture value to be assigned to property timeoutFuture + */ + public void setTimeoutFuture(ScheduledFuture timeoutFuture) { + this.timeoutFuture = timeoutFuture; + } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java index ef21d8597..08705250a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java @@ -18,6 +18,8 @@ package com.alibaba.nacos.api.remote; import com.alibaba.nacos.api.remote.response.Response; +import java.util.concurrent.Executor; + /** * call bakck for request. * @@ -26,6 +28,13 @@ import com.alibaba.nacos.api.remote.response.Response; */ public interface RequestCallBack { + /** + * get executor on callback. + * + * @return + */ + public Executor getExcutor(); + /** * get timeout mills. * @@ -45,6 +54,6 @@ public interface RequestCallBack { * * @param e exception throwed. */ - public void onException(Exception e); + public void onException(Throwable e); } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java b/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java index cb9143bfb..c995d9afc 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java @@ -21,7 +21,7 @@ import com.alibaba.nacos.api.remote.response.Response; import java.util.concurrent.TimeoutException; /** - * future for request + * future for request. * @author liuzunfei * @version $Id: RequestFuture.java, v 0.1 2020年09月01日 6:31 PM liuzunfei Exp $ */ diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java b/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java new file mode 100644 index 000000000..6cb2f7448 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.remote; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +/** + * rpc scheduler executor . + * + * @author liuzunfei + * @version $Id: RpcScheduledExecutor.java, v 0.1 2020年09月07日 4:12 PM liuzunfei Exp $ + */ +public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor { + + public static final RpcScheduledExecutor TIMEOUT_SHEDULER = new RpcScheduledExecutor( + "com.alibaba.nacos.remote.TimerScheduler"); + + public RpcScheduledExecutor(final String threadName) { + super(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, threadName); + } + }); + } + +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java index 89847e87a..b323d4e93 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java @@ -28,8 +28,6 @@ public interface PushCallBack { public void onSuccess(); - public void onFail(Exception e); - - public void onTimeout(); + public void onFail(Throwable e); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java index 686e30490..cdef3dfe5 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java @@ -30,6 +30,7 @@ import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse; import com.alibaba.nacos.api.naming.remote.response.ServiceListResponse; import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse; import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.selector.AbstractSelector; @@ -42,6 +43,8 @@ import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alibaba.nacos.common.remote.client.ServerListFactory; import com.alibaba.nacos.common.utils.JacksonUtils; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER; @@ -62,7 +65,10 @@ public class NamingGrpcClientProxy implements NamingClientProxy { public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { this.namespaceId = namespaceId; - this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC); + Map labels = new HashMap(); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); + labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING); + this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC, labels); this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this); start(serverListFactory, serviceInfoHolder); } diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index 95c9f254d..411967476 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -22,6 +22,8 @@ import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.AbstractListener; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest; +import com.alibaba.nacos.api.remote.AbstractRequestCallBack; +import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.client.RpcClient; @@ -34,10 +36,13 @@ import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Scanner; +import java.util.concurrent.Executor; import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR; import static com.alibaba.nacos.api.common.Constants.WORD_SEPARATOR; @@ -51,8 +56,8 @@ public class ConfigTest { public void before() throws Exception { Properties properties = new Properties(); //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848"); - //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848"); //"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848"); //"11.239.114.187:8848"); configService = NacosFactory.createConfigService(properties); @@ -61,7 +66,10 @@ public class ConfigTest { @Test public void test222() throws Exception { - RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET); + Map labels = new HashMap(); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); + + RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels); client.init(new ServerListFactory() { @Override public String genNextServer() { @@ -120,6 +128,62 @@ public class ConfigTest { } + @Test + public void test333() throws Exception { + Map labels = new HashMap(); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); + + RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.GRPC, labels); + client.init(new ServerListFactory() { + @Override + public String genNextServer() { + return "127.0.0.1:8848"; + } + + @Override + public String getCurrentServer() { + return "127.0.0.1:8848"; + } + + @Override + public List getServerList() { + return Lists.newArrayList("127.0.0.1:8848"); + } + + }); + client.start(); + + ConfigBatchListenRequest syncRequest = new ConfigBatchListenRequest(); + syncRequest.setListen(true); + final String dataId = "xiaochun.xxc"; + final String group = "xiaochun.xxc"; + syncRequest.addConfigListenContext(group, dataId, null, null); + long start = System.currentTimeMillis(); + System.out.println("send :" + System.currentTimeMillis()); + client.asyncRequest(syncRequest, new AbstractRequestCallBack(2001L) { + + @Override + public Executor getExcutor() { + return null; + } + + @Override + public void onResponse(Response response) { + System.out.println("onSuccess:" + response); + System.out.println("receive :" + System.currentTimeMillis()); + } + + @Override + public void onException(Throwable throwable) { + System.out.println("onFailure:" + throwable); + } + + }); + + Thread.sleep(10000L); + + } + @After public void cleanup() throws Exception { configService.shutDown(); @@ -181,8 +245,6 @@ public class ConfigTest { final String dataId = "xiaochun.xxc"; final String group = "xiaochun.xxc"; final String content = "lessspring-" + System.currentTimeMillis(); - System.out.println(System.getProperty("nacos.logging.path")); - System.out.println(System.getProperty("limitTime")); Thread th = new Thread(new Runnable() { @Override @@ -192,11 +254,10 @@ public class ConfigTest { int times = 1000; while (times > 0) { try { - configService.publishConfig(dataId + random.nextInt(10), group, - "value" + System.currentTimeMillis()); + configService.publishConfig(dataId, group, "value" + System.currentTimeMillis()); times--; - Thread.sleep(2000L); + Thread.sleep(1000L); } catch (Exception e) { e.printStackTrace(); } @@ -217,9 +278,11 @@ public class ConfigTest { } }; + configService.addListener(dataId, group, listener); + for (int i = 0; i < 20; i++) { final int ls = i; - configService.addListener(dataId + i, group, listener); + //configService.addListener(dataId + i, group, listener); } diff --git a/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java index 792212418..46a0122f7 100644 --- a/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java @@ -18,26 +18,16 @@ package com.alibaba.nacos.client.config.listener.impl; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager; -import com.alibaba.nacos.client.config.http.MetricsHttpAgent; import com.alibaba.nacos.client.config.impl.ClientWorker; import com.alibaba.nacos.client.utils.ParamUtil; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import static org.mockito.Mockito.mock; - public class ClientWorkerTest { @Mock @@ -53,37 +43,6 @@ public class ClientWorkerTest { private final String currentLongingTaskCount = "currentLongingTaskCount"; - @Before - public void init() { - MockitoAnnotations.initMocks(this); - clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class), - mock(Properties.class)); - try { - Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService"); - executorServiceField.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL); - executorServiceField.set(clientWorker, scheduledExecutorService); - Listener listener = new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configInfo) { - - } - }; - listeners = Arrays.asList(listener); - } catch (NoSuchFieldException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } - } - @Test public void testAddLongPollNumberThreads() { try { diff --git a/common/pom.xml b/common/pom.xml index 7e6136a56..8584e99f2 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -87,8 +87,8 @@ org.apache.maven.plugins maven-compiler-plugin - 6 - 6 + 8 + 8 diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java index a7adca298..5b2bff7e3 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java @@ -17,10 +17,10 @@ package com.alibaba.nacos.common.remote.client; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; -import com.google.common.util.concurrent.FutureCallback; import java.util.HashMap; import java.util.Map; @@ -75,18 +75,27 @@ public abstract class Connection { /** * send request. - * + * default time out 3 seconds. * @param request request. * @return */ public abstract Response request(Request request, RequestMeta requestMeta) throws NacosException; + /** + * send request. + * + * @param request request. + * @param timeoutMills mills of timeouts. + * @return + */ + public abstract Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException; + /** * send aync request. * * @param request request. */ - public abstract void asyncRequest(Request request, RequestMeta requestMeta, FutureCallback callback) + public abstract void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack) throws NacosException; /** diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index a4ba7c434..e68062f1e 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.common.remote.client; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.PayloadRegistry; +import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; @@ -30,7 +31,6 @@ import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.VersionUtils; -import com.google.common.util.concurrent.FutureCallback; import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; @@ -462,7 +462,7 @@ public abstract class RpcClient implements Closeable { * @param request request. * @return */ - public void asyncRequest(Request request, FutureCallback callback) throws NacosException { + public void asyncRequest(Request request, RequestCallBack callback) throws NacosException { int retryTimes = 3; Exception exceptionToThrow = null; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java index 2632a4031..6dc5661c7 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java @@ -18,7 +18,8 @@ package com.alibaba.nacos.common.remote.client; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.remote.ConnectionType; -import com.alibaba.nacos.common.remote.client.grpc.GrpcClient; +import com.alibaba.nacos.common.remote.client.grpc.GrpcClusterClient; +import com.alibaba.nacos.common.remote.client.grpc.GrpcSdkClient; import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient; import java.util.HashMap; @@ -70,23 +71,25 @@ public class RpcClientFactory { * @param connectionType client type. * @return */ - public static RpcClient createClient(String clientName, ConnectionType connectionType) { + public static RpcClient createClient(String clientName, ConnectionType connectionType, Map labels) { + String clientNameInner = clientName; synchronized (clientMap) { - if (clientMap.get(clientName) == null) { + if (clientMap.get(clientNameInner) == null) { RpcClient moduleClient = null; if (ConnectionType.GRPC.equals(connectionType)) { - moduleClient = new GrpcClient(clientName); - + moduleClient = new GrpcSdkClient(clientNameInner); + } else if (ConnectionType.RSOCKET.equals(connectionType)) { - moduleClient = new RsocketRpcClient(clientName); + moduleClient = new RsocketRpcClient(clientNameInner); } if (moduleClient == null) { throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType()); } - clientMap.put(clientName, moduleClient); + moduleClient.initLabels(labels); + clientMap.put(clientNameInner, moduleClient); return moduleClient; } - return clientMap.get(clientName); + return clientMap.get(clientNameInner); } } @@ -97,14 +100,14 @@ public class RpcClientFactory { * @param connectionType client type. * @return */ - public static RpcClient createClient(String clientName, ConnectionType connectionType, Map labels) { - //TODO to be deleted. + public static RpcClient createClusterClient(String clientName, ConnectionType connectionType, + Map labels) { String clientNameInner = clientName; synchronized (clientMap) { if (clientMap.get(clientNameInner) == null) { RpcClient moduleClient = null; if (ConnectionType.GRPC.equals(connectionType)) { - moduleClient = new GrpcClient(clientNameInner); + moduleClient = new GrpcClusterClient(clientNameInner); } else if (ConnectionType.RSOCKET.equals(connectionType)) { moduleClient = new RsocketRpcClient(clientNameInner); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index b446b4a02..ec08459c7 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit; * @author liuzunfei * @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $ */ -public class GrpcClient extends RpcClient { +public abstract class GrpcClient extends RpcClient { static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client"); @@ -101,10 +101,6 @@ public class GrpcClient extends RpcClient { } } - @Override - public int rpcPortOffset() { - return 1000; - } /** * Send Heart Beat Request. @@ -174,6 +170,7 @@ public class GrpcClient extends RpcClient { @Override public void onNext(Payload payload) { + LOGGER.debug(" stream server reuqust receive ,original info :{}", payload.toString()); try { final Request request = (Request) GrpcUtils.parse(payload).getBody(); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java new file mode 100644 index 000000000..346fb913f --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.remote.client.grpc; + +/** + * sdk client for grpc. + * + * @author liuzunfei + * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ + */ +public class GrpcClusterClient extends GrpcClient { + + /** + * Empty constructor. + * + * @param name name of client. + */ + public GrpcClusterClient(String name) { + super(name); + } + + @Override + public int rpcPortOffset() { + return 1001; + } + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java index df6373ca6..1e8b90c8b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.RequestGrpc; import com.alibaba.nacos.api.grpc.auto.RequestStreamGrpc; +import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; @@ -34,8 +35,11 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import org.checkerframework.checker.nullness.compatqual.NullableDecl; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.CancellationException; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * grpc connection. @@ -48,7 +52,7 @@ public class GrpcConnection extends Connection { /** * executor to execute future request. */ - static ExecutorService aynsRequestExecutor = Executors + static ScheduledExecutorService aynsRequestExecutor = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); /** @@ -75,21 +79,27 @@ public class GrpcConnection extends Connection { @Override public Response request(Request request, RequestMeta requestMeta) throws NacosException { + return request(request, requestMeta, 3000L); + } + + @Override + public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException { Payload grpcRequest = GrpcUtils.convert(request, requestMeta); ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcRequest); Payload grpcResponse = null; try { - grpcResponse = requestFuture.get(); + grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); return null; } - + Response response = (Response) GrpcUtils.parse(grpcResponse).getBody(); return response; } + public void sendResponse(Response response) { Payload convert = GrpcUtils.convert(response); payloadStreamObserver.onNext(convert); @@ -101,18 +111,20 @@ public class GrpcConnection extends Connection { } @Override - public void asyncRequest(Request request, RequestMeta requestMeta, final FutureCallback callback) + public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack) throws NacosException { Payload grpcRequest = GrpcUtils.convert(request, requestMeta); ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcRequest); + + //set callback . Futures.addCallback(requestFuture, new FutureCallback() { @Override public void onSuccess(@NullableDecl Payload grpcResponse) { Response response = (Response) GrpcUtils.parse(grpcResponse).getBody(); if (response != null && response.isSuccess()) { - callback.onSuccess(response); + requestCallBack.onResponse(response); } else { - callback.onFailure(new NacosException( + requestCallBack.onException(new NacosException( (response == null) ? ResponseCode.FAIL.getCode() : response.getErrorCode(), (response == null) ? "null" : response.getMessage())); } @@ -120,9 +132,18 @@ public class GrpcConnection extends Connection { @Override public void onFailure(Throwable throwable) { - callback.onFailure(throwable); + if (throwable instanceof CancellationException) { + requestCallBack.onException( + new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " millseconds.")); + } else { + requestCallBack.onException(throwable); + } } }, aynsRequestExecutor); + // set timeout future. + ListenableFuture payloadListenableFuture = Futures + .withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS, aynsRequestExecutor); + } @Override diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java new file mode 100644 index 000000000..1e9aa66cc --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.remote.client.grpc; + +/** + * sdk client for grpc. + * + * @author liuzunfei + * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ + */ +public class GrpcOpsClient extends GrpcClient { + + /** + * Empty constructor. + * + * @param name name of client. + */ + public GrpcOpsClient(String name) { + super(name); + } + + @Override + public int rpcPortOffset() { + return 1002; + } + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java new file mode 100644 index 000000000..089ec3d45 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.remote.client.grpc; + +/** + * sdk client for grpc. + * + * @author liuzunfei + * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ + */ +public class GrpcSdkClient extends GrpcClient { + + /** + * Empty constructor. + * + * @param name name of client. + */ + public GrpcSdkClient(String name) { + super(name); + } + + @Override + public int rpcPortOffset() { + return 1000; + } + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java index c8eaaaece..44c95afaf 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java @@ -17,19 +17,26 @@ package com.alibaba.nacos.common.remote.client.rsocket; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.RequestCallBack; +import com.alibaba.nacos.api.remote.RpcScheduledExecutor; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.remote.RsocketUtils; import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClient; -import com.google.common.util.concurrent.FutureCallback; import io.rsocket.Payload; import io.rsocket.RSocket; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** * rsocket connection. * @@ -47,29 +54,51 @@ public class RsocketConnection extends Connection { @Override public Response request(Request request, RequestMeta requestMeta) throws NacosException { + return request(request, requestMeta, 3000L); + } + + @Override + public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException { Payload response = rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta)) - .block(); + .block(Duration.ofMillis(timeouts)); return RsocketUtils.parseResponseFromPayload(response); } @Override - public void asyncRequest(Request request, RequestMeta requestMeta, final FutureCallback callback) + public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack) throws NacosException { try { Mono response = rSocketClient .requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta)); - - response.subscribe(new Consumer() { - @Override - public void accept(Payload payload) { - callback.onSuccess(RsocketUtils.parseResponseFromPayload(payload)); - } + + response.toFuture().acceptEither(RsocketConnection.failAfter(requestCallBack.getTimeout()), + new Consumer() { + @Override + public void accept(Payload payload) { + requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload)); + } + }).exceptionally(throwable -> { + requestCallBack.onException(throwable); + return null; }); + } catch (Exception e) { - callback.onFailure(e); + requestCallBack.onException(e); } } + private static CompletableFuture failAfter(final long timeouts) { + final CompletableFuture promise = new CompletableFuture(); + RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable() { + @Override + public Object call() throws Exception { + final TimeoutException ex = new TimeoutException("Timeout after " + timeouts); + return promise.completeExceptionally(ex); + } + }, timeouts, MILLISECONDS); + return promise; + } + @Override public void close() { if (this.rSocketClient != null && !rSocketClient.isDisposed()) { diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java index 57d4dda09..deb3ff38e 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/RpcConfigChangeNotifier.java @@ -73,7 +73,7 @@ public class RpcConfigChangeNotifier extends Subscriber { private ConnectionManager connectionManager; /** - * adaptor to config module ,when server side congif change ,invoke this method. + * adaptor to config module ,when server side config change ,invoke this method. * * @param groupKey groupKey * @param notifyRequet notifyRequet @@ -84,7 +84,6 @@ public class RpcConfigChangeNotifier extends Subscriber { if (listeners == null || listeners.isEmpty()) { return; } - Set clients = new HashSet<>(listeners); int notifyCount = 0; if (!CollectionUtils.isEmpty(clients)) { @@ -100,8 +99,8 @@ public class RpcConfigChangeNotifier extends Subscriber { continue; } } - - RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 5, client); + + RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 50, client); push(rpcPushRetryTask); notifyCount++; } @@ -168,24 +167,17 @@ public class RpcConfigChangeNotifier extends Subscriber { } @Override - public void onFail(Exception e) { + public void onFail(Throwable e) { + Loggers.CORE.error("On failt ", e); Loggers.CORE.warn("push fail.dataId={},group={},tenant={},clientId={},tryTimes={}", notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId, retryTimes); push(RpcPushTask.this); } - - @Override - public void onTimeout() { - Loggers.CORE.warn("push timeout.dataId={},group={},tenant={},clientId={},tryTimes={}", - notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId, - retryTimes); - push(RpcPushTask.this); - } - - }); + }, ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR); + tryTimes++; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java index 451f8d6c2..7cf461180 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java @@ -109,12 +109,13 @@ public class ClusterRpcClientProxy extends MemberChangeListener { } private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException { - RpcClient client = RpcClientFactory.createClient(memberClientKey(member), type); + Map labels = new HashMap(); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); + + RpcClient client = RpcClientFactory.createClusterClient(memberClientKey(member), type, labels); if (!client.getConnectionType().equals(type)) { RpcClientFactory.destroyClient(memberClientKey(member)); - Map labels = new HashMap(); - labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); - client = RpcClientFactory.createClient(memberClientKey(member), type, labels); + client = RpcClientFactory.createClusterClient(memberClientKey(member), type, labels); } if (client.isWaitInited()) { @@ -144,6 +145,7 @@ public class ClusterRpcClientProxy extends MemberChangeListener { /** * send request to member. + * * @param member * @param request * @return diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java index 4837be137..f25efafea 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java @@ -60,10 +60,11 @@ public class RpcAckCallbackSynchronizer { return; } - DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.get(response.getRequestId()); + DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.remove(response.getRequestId()); if (currentCallback == null) { return; } + if (response.isSuccess()) { currentCallback.setResponse(response); } else { @@ -71,6 +72,23 @@ public class RpcAckCallbackSynchronizer { } } + /** + * notify ackid. + */ + public static void exceptionNotify(String connectionId, String requestId, Exception e) { + + Map stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connectionId); + if (stringDefaultPushFutureMap == null) { + return; + } + + DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.remove(requestId); + if (currentCallback == null) { + return; + } + currentCallback.setFailResult(e); + } + /** * notify ackid. */ diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java index b973d68a3..87918a6d1 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java @@ -17,7 +17,7 @@ package com.alibaba.nacos.core.remote; import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.remote.RequestCallBack; +import com.alibaba.nacos.api.remote.AbstractRequestCallBack; import com.alibaba.nacos.api.remote.request.ServerPushRequest; import com.alibaba.nacos.api.remote.response.PushCallBack; import com.alibaba.nacos.api.remote.response.Response; @@ -26,6 +26,8 @@ import com.alibaba.nacos.core.utils.Loggers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.concurrent.Executor; + /** * push response to clients. * @@ -45,14 +47,16 @@ public class RpcPushService { * @param request request. * @param requestCallBack requestCallBack. */ - public void pushWithCallback(String connectionId, ServerPushRequest request, PushCallBack requestCallBack) { + public void pushWithCallback(String connectionId, ServerPushRequest request, PushCallBack requestCallBack, + Executor executor) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { try { - connection.sendRequestWithCallBack(request, new RequestCallBack() { + connection.sendRequestWithCallBack(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) { + @Override - public long getTimeout() { - return requestCallBack.getTimeout(); + public Executor getExcutor() { + return executor; } @Override @@ -65,7 +69,7 @@ public class RpcPushService { } @Override - public void onException(Exception e) { + public void onException(Throwable e) { requestCallBack.onFail(e); } }); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java index 7125b73d8..de7b85993 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java @@ -45,22 +45,24 @@ public abstract class RpcServer { @PostConstruct public void start() throws Exception { - Loggers.RPC.info("Nacos {} Rpc server starting at port {}", getConnectionType(), + Loggers.RPC.info("Nacos {} Rpc server starting at port {}", getClass().getSimpleName(), (ApplicationUtils.getPort() + rpcPortOffset())); startServer(); - Loggers.RPC.info("Nacos {} Rpc server started at port {}", getConnectionType(), + Loggers.RPC.info("Nacos {} Rpc server started at port {}", getClass().getSimpleName(), (ApplicationUtils.getPort() + rpcPortOffset())); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - Loggers.RPC.info("Nacos {} Rpc server stopping", getConnectionType()); + Loggers.RPC.info("Nacos {} Rpc server stopping", getClass().getSimpleName()); try { RpcServer.this.stopServer(); - Loggers.RPC.info("Nacos {} Rpc server stopped successfully...", getConnectionType()); + Loggers.RPC.info("Nacos {} Rpc server stopped successfully...", + RpcServer.this.getClass().getSimpleName()); } catch (Exception e) { - Loggers.RPC.error("Nacos {} Rpc server stopped fail...", getConnectionType(), e); + Loggers.RPC + .error("Nacos {} Rpc server stopped fail...", RpcServer.this.getClass().getSimpleName(), e); } } }); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java new file mode 100644 index 000000000..b0342c014 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java @@ -0,0 +1,36 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote.grpc; + +import org.springframework.stereotype.Service; + +/** + * Grpc implementation as a rpc server. + * + * @author liuzunfei + * @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $ + */ +@Service +public class GrpcClusterServer extends GrpcServer { + + private static final int PORT_OFFSET = 1001; + + @Override + public int rpcPortOffset() { + return PORT_OFFSET; + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java index d7f24746a..95c56a583 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java @@ -37,6 +37,7 @@ import io.grpc.stub.StreamObserver; /** * grpc connection. + * * @author liuzunfei * @version $Id: GrpcConnection.java, v 0.1 2020年07月13日 7:26 PM liuzunfei Exp $ */ @@ -97,7 +98,7 @@ public class GrpcConnection extends Connection { String requestId = String.valueOf(PushAckIdGenerator.getNextId()); request.setRequestId(requestId); sendRequestNoAck(request); - DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(requestId, callBack); + DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(this.getConnectionId(), requestId, callBack); RpcAckCallbackSynchronizer.syncCallback(getConnectionId(), requestId, defaultPushFuture); return defaultPushFuture; } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcSdkServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcSdkServer.java new file mode 100644 index 000000000..f1bdf7bf8 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcSdkServer.java @@ -0,0 +1,36 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.remote.grpc; + +import org.springframework.stereotype.Service; + +/** + * Grpc implementation as a rpc server. + * + * @author liuzunfei + * @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $ + */ +@Service +public class GrpcSdkServer extends GrpcServer { + + private static final int PORT_OFFSET = 1000; + + @Override + public int rpcPortOffset() { + return PORT_OFFSET; + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java index 503ce1842..7c22cb763 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java @@ -44,7 +44,6 @@ import io.grpc.protobuf.ProtoUtils; import io.grpc.stub.ServerCalls; import io.grpc.util.MutableHandlerRegistry; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; /** * Grpc implementation as a rpc server. @@ -52,10 +51,7 @@ import org.springframework.stereotype.Service; * @author liuzunfei * @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $ */ -@Service -public class GrpcServer extends RpcServer { - - private static final int PORT_OFFSET = 1000; +public abstract class GrpcServer extends RpcServer { private Server server; @@ -162,11 +158,6 @@ public class GrpcServer extends RpcServer { } - @Override - public int rpcPortOffset() { - return PORT_OFFSET; - } - @Override public void shundownServer() { if (server != null) { diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java index 98efcc5d2..e2361b227 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java @@ -19,6 +19,7 @@ package com.alibaba.nacos.core.remote.rsocket; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.RequestFuture; +import com.alibaba.nacos.api.remote.RpcScheduledExecutor; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; @@ -33,9 +34,13 @@ import io.rsocket.RSocket; import reactor.core.publisher.Mono; import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** * connection of rsocket. * @@ -107,28 +112,40 @@ public class RsocketConnection extends Connection { } @Override - public void sendRequestWithCallBack(Request request, RequestCallBack callBack) throws NacosException { - + public void sendRequestWithCallBack(Request request, RequestCallBack requestCallBack) throws NacosException { + long id = System.currentTimeMillis(); Loggers.RPC_DIGEST.info("Rsocket sendRequestWithCallBack :" + request); - - Mono payloadMono = clientSocket - .requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta())); - payloadMono.subscribe(new Consumer() { - - @Override - public void accept(Payload payload) { - Response response = RsocketUtils.parseResponseFromPayload(payload); - callBack.onResponse(response); - } - - }, new Consumer() { - @Override - public void accept(Throwable throwable) { - callBack.onException(new Exception(throwable)); - } - }); + try { + Mono response = clientSocket + .requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta())); + + response.toFuture().acceptEither(failAfter(requestCallBack.getTimeout()), new Consumer() { + @Override + public void accept(Payload payload) { + requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload)); + } + }).exceptionally(throwable -> { + requestCallBack.onException(throwable); + return null; + }); + + } catch (Exception e) { + requestCallBack.onException(e); + } + } + + private static CompletableFuture failAfter(final long timeouts) { + final CompletableFuture promise = new CompletableFuture(); + RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable() { + @Override + public Object call() throws Exception { + final TimeoutException ex = new TimeoutException("Timeout after " + timeouts); + return promise.completeExceptionally(ex); + } + }, timeouts, MILLISECONDS); + return promise; } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java index bd9d38af4..36b2d6510 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java @@ -19,7 +19,6 @@ package com.alibaba.nacos.naming.cluster.remote.grpc; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; -import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alibaba.nacos.common.remote.client.grpc.GrpcClient; import com.alibaba.nacos.naming.cluster.remote.ClusterClient; @@ -33,7 +32,7 @@ public class GrpcClusterClient implements ClusterClient { private final GrpcClient grpcClient; public GrpcClusterClient(String targetAddress) { - this.grpcClient = new GrpcClient(targetAddress); + this.grpcClient = new com.alibaba.nacos.common.remote.client.grpc.GrpcClusterClient(targetAddress); } @Override