diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java
index 6481ad5bd..b315e83e5 100644
--- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java
+++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigChangeNotifyRequest.java
@@ -112,12 +112,6 @@ public class ConfigChangeNotifyRequest extends ServerPushRequest {
this.tenant = tenant;
}
- @Override
- public String toString() {
- return "ConfigChangeNotifyResponse{" + "dataId='" + dataId + '\'' + ", group='" + group + '\'' + ", tenant='"
- + tenant + '\'' + '}';
- }
-
/**
* Getter method for property beta.
*
diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java
index f4812d12d..105a6d08c 100644
--- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java
+++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java
@@ -145,6 +145,12 @@ public class ConfigChangeBatchListenResponse extends Response {
public void setTenant(String tenant) {
this.tenant = tenant;
}
+
+ @Override
+ public String toString() {
+ return "ConfigContext{" + "group='" + group + '\'' + ", dataId='" + dataId + '\'' + ", tenant='" + tenant
+ + '\'' + '}';
+ }
}
}
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java
new file mode 100644
index 000000000..8f21f0638
--- /dev/null
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/AbstractRequestCallBack.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.api.remote;
+
+/**
+ * abstract request call back.
+ *
+ * @author liuzunfei
+ * @version $Id: AbstractRequestCallBack.java, v 0.1 2020年09月07日 3:30 PM liuzunfei Exp $
+ */
+public abstract class AbstractRequestCallBack implements RequestCallBack {
+
+ long timeoutMills;
+
+ public AbstractRequestCallBack(long timeoutMill) {
+ this.timeoutMills = timeoutMill;
+ }
+
+ public AbstractRequestCallBack() {
+ this(3000L);
+ }
+
+ @Override
+ public long getTimeout() {
+ return timeoutMills;
+ }
+}
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java b/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java
index d1c2c087e..0d20838e7 100644
--- a/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java
@@ -18,6 +18,8 @@ package com.alibaba.nacos.api.remote;
import com.alibaba.nacos.api.remote.response.Response;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
@@ -40,8 +42,12 @@ public class DefaultRequestFuture implements RequestFuture {
private String requestId;
+ private String connectionId;
+
private Response response;
+ private ScheduledFuture timeoutFuture;
+
/**
* Getter method for property requestCallBack.
*
@@ -63,32 +69,39 @@ public class DefaultRequestFuture implements RequestFuture {
public DefaultRequestFuture() {
}
- public DefaultRequestFuture(String requestId) {
- this(requestId, null);
+ public DefaultRequestFuture(String connectionId, String requestId) {
+ this(connectionId, requestId, null);
}
- public DefaultRequestFuture(String requestId, RequestCallBack requestCallBack) {
+ public DefaultRequestFuture(String connectionId, String requestId, RequestCallBack requestCallBack) {
this.timeStamp = System.currentTimeMillis();
this.requestCallBack = requestCallBack;
this.requestId = requestId;
+ this.connectionId = connectionId;
+ this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER
+ .schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS);
}
- public void setResponse(Response response) {
+ public void setResponse(final Response response) {
isDone = true;
this.response = response;
this.isSuccess = response.isSuccess();
+ if (this.timeoutFuture != null) {
+ timeoutFuture.cancel(true);
+ }
synchronized (this) {
notifyAll();
}
if (requestCallBack != null) {
- requestCallBack.onResponse(response);
+ requestCallBack.getExcutor().execute(new CallBackHandler());
}
}
public void setFailResult(Exception e) {
isDone = true;
isSuccess = false;
+ this.exception = e;
synchronized (this) {
notifyAll();
}
@@ -142,4 +155,45 @@ public class DefaultRequestFuture implements RequestFuture {
throw new TimeoutException();
}
}
+
+ class CallBackHandler implements Runnable {
+
+ @Override
+ public void run() {
+ if (exception != null) {
+ requestCallBack.onException(exception);
+ } else {
+ requestCallBack.onResponse(response);
+ }
+ }
+ }
+
+ class TimeoutHandler implements Runnable {
+
+ public TimeoutHandler() {
+ }
+
+ @Override
+ public void run() {
+ setFailResult(new TimeoutException("Timeout After " + requestCallBack.getTimeout() + " millseconds."));
+ }
+ }
+
+ /**
+ * Getter method for property connectionId.
+ *
+ * @return property value of connectionId
+ */
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ /**
+ * Setter method for property timeoutFuture.
+ *
+ * @param timeoutFuture value to be assigned to property timeoutFuture
+ */
+ public void setTimeoutFuture(ScheduledFuture timeoutFuture) {
+ this.timeoutFuture = timeoutFuture;
+ }
}
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java
index ef21d8597..08705250a 100644
--- a/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java
@@ -18,6 +18,8 @@ package com.alibaba.nacos.api.remote;
import com.alibaba.nacos.api.remote.response.Response;
+import java.util.concurrent.Executor;
+
/**
* call bakck for request.
*
@@ -26,6 +28,13 @@ import com.alibaba.nacos.api.remote.response.Response;
*/
public interface RequestCallBack {
+ /**
+ * get executor on callback.
+ *
+ * @return
+ */
+ public Executor getExcutor();
+
/**
* get timeout mills.
*
@@ -45,6 +54,6 @@ public interface RequestCallBack {
*
* @param e exception throwed.
*/
- public void onException(Exception e);
+ public void onException(Throwable e);
}
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java b/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java
index cb9143bfb..c995d9afc 100644
--- a/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java
@@ -21,7 +21,7 @@ import com.alibaba.nacos.api.remote.response.Response;
import java.util.concurrent.TimeoutException;
/**
- * future for request
+ * future for request.
* @author liuzunfei
* @version $Id: RequestFuture.java, v 0.1 2020年09月01日 6:31 PM liuzunfei Exp $
*/
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java b/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java
new file mode 100644
index 000000000..6cb2f7448
--- /dev/null
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.api.remote;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * rpc scheduler executor .
+ *
+ * @author liuzunfei
+ * @version $Id: RpcScheduledExecutor.java, v 0.1 2020年09月07日 4:12 PM liuzunfei Exp $
+ */
+public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor {
+
+ public static final RpcScheduledExecutor TIMEOUT_SHEDULER = new RpcScheduledExecutor(
+ "com.alibaba.nacos.remote.TimerScheduler");
+
+ public RpcScheduledExecutor(final String threadName) {
+ super(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, threadName);
+ }
+ });
+ }
+
+}
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java
index 89847e87a..b323d4e93 100644
--- a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java
@@ -28,8 +28,6 @@ public interface PushCallBack {
public void onSuccess();
- public void onFail(Exception e);
-
- public void onTimeout();
+ public void onFail(Throwable e);
}
diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java
index 686e30490..cdef3dfe5 100644
--- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java
+++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java
@@ -30,6 +30,7 @@ import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse;
import com.alibaba.nacos.api.naming.remote.response.ServiceListResponse;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
+import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.selector.AbstractSelector;
@@ -42,6 +43,8 @@ import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.JacksonUtils;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
@@ -62,7 +65,10 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
- this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC);
+ Map labels = new HashMap();
+ labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
+ labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
+ this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC, labels);
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
start(serverListFactory, serviceInfoHolder);
}
diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
index 95c9f254d..411967476 100644
--- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
+++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
@@ -22,6 +22,8 @@ import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
+import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
+import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
@@ -34,10 +36,13 @@ import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;
+import java.util.concurrent.Executor;
import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR;
import static com.alibaba.nacos.api.common.Constants.WORD_SEPARATOR;
@@ -51,8 +56,8 @@ public class ConfigTest {
public void before() throws Exception {
Properties properties = new Properties();
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848");
- //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
+ //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
//"11.239.114.187:8848");
configService = NacosFactory.createConfigService(properties);
@@ -61,7 +66,10 @@ public class ConfigTest {
@Test
public void test222() throws Exception {
- RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET);
+ Map labels = new HashMap();
+ labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
+
+ RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels);
client.init(new ServerListFactory() {
@Override
public String genNextServer() {
@@ -120,6 +128,62 @@ public class ConfigTest {
}
+ @Test
+ public void test333() throws Exception {
+ Map labels = new HashMap();
+ labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
+
+ RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.GRPC, labels);
+ client.init(new ServerListFactory() {
+ @Override
+ public String genNextServer() {
+ return "127.0.0.1:8848";
+ }
+
+ @Override
+ public String getCurrentServer() {
+ return "127.0.0.1:8848";
+ }
+
+ @Override
+ public List getServerList() {
+ return Lists.newArrayList("127.0.0.1:8848");
+ }
+
+ });
+ client.start();
+
+ ConfigBatchListenRequest syncRequest = new ConfigBatchListenRequest();
+ syncRequest.setListen(true);
+ final String dataId = "xiaochun.xxc";
+ final String group = "xiaochun.xxc";
+ syncRequest.addConfigListenContext(group, dataId, null, null);
+ long start = System.currentTimeMillis();
+ System.out.println("send :" + System.currentTimeMillis());
+ client.asyncRequest(syncRequest, new AbstractRequestCallBack(2001L) {
+
+ @Override
+ public Executor getExcutor() {
+ return null;
+ }
+
+ @Override
+ public void onResponse(Response response) {
+ System.out.println("onSuccess:" + response);
+ System.out.println("receive :" + System.currentTimeMillis());
+ }
+
+ @Override
+ public void onException(Throwable throwable) {
+ System.out.println("onFailure:" + throwable);
+ }
+
+ });
+
+ Thread.sleep(10000L);
+
+ }
+
@After
public void cleanup() throws Exception {
configService.shutDown();
@@ -181,8 +245,6 @@ public class ConfigTest {
final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc";
final String content = "lessspring-" + System.currentTimeMillis();
- System.out.println(System.getProperty("nacos.logging.path"));
- System.out.println(System.getProperty("limitTime"));
Thread th = new Thread(new Runnable() {
@Override
@@ -192,11 +254,10 @@ public class ConfigTest {
int times = 1000;
while (times > 0) {
try {
- configService.publishConfig(dataId + random.nextInt(10), group,
- "value" + System.currentTimeMillis());
+ configService.publishConfig(dataId, group, "value" + System.currentTimeMillis());
times--;
- Thread.sleep(2000L);
+ Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
@@ -217,9 +278,11 @@ public class ConfigTest {
}
};
+ configService.addListener(dataId, group, listener);
+
for (int i = 0; i < 20; i++) {
final int ls = i;
- configService.addListener(dataId + i, group, listener);
+ //configService.addListener(dataId + i, group, listener);
}
diff --git a/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java
index 792212418..46a0122f7 100644
--- a/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java
+++ b/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java
@@ -18,26 +18,16 @@ package com.alibaba.nacos.client.config.listener.impl;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
-import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
-import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
import com.alibaba.nacos.client.config.impl.ClientWorker;
import com.alibaba.nacos.client.utils.ParamUtil;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import static org.mockito.Mockito.mock;
-
public class ClientWorkerTest {
@Mock
@@ -53,37 +43,6 @@ public class ClientWorkerTest {
private final String currentLongingTaskCount = "currentLongingTaskCount";
- @Before
- public void init() {
- MockitoAnnotations.initMocks(this);
- clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class),
- mock(Properties.class));
- try {
- Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService");
- executorServiceField.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL);
- executorServiceField.set(clientWorker, scheduledExecutorService);
- Listener listener = new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String configInfo) {
-
- }
- };
- listeners = Arrays.asList(listener);
- } catch (NoSuchFieldException e) {
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- e.printStackTrace();
- }
- }
-
@Test
public void testAddLongPollNumberThreads() {
try {
diff --git a/common/pom.xml b/common/pom.xml
index 7e6136a56..8584e99f2 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -87,8 +87,8 @@
org.apache.maven.plugins
maven-compiler-plugin
-
- 6
+
+ 8
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java
index a7adca298..5b2bff7e3 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java
@@ -17,10 +17,10 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
-import com.google.common.util.concurrent.FutureCallback;
import java.util.HashMap;
import java.util.Map;
@@ -75,18 +75,27 @@ public abstract class Connection {
/**
* send request.
- *
+ * default time out 3 seconds.
* @param request request.
* @return
*/
public abstract Response request(Request request, RequestMeta requestMeta) throws NacosException;
+ /**
+ * send request.
+ *
+ * @param request request.
+ * @param timeoutMills mills of timeouts.
+ * @return
+ */
+ public abstract Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException;
+
/**
* send aync request.
*
* @param request request.
*/
- public abstract void asyncRequest(Request request, RequestMeta requestMeta, FutureCallback callback)
+ public abstract void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
throws NacosException;
/**
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
index a4ba7c434..e68062f1e 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
@@ -18,6 +18,7 @@ package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.PayloadRegistry;
+import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
@@ -30,7 +31,6 @@ import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
-import com.google.common.util.concurrent.FutureCallback;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
@@ -462,7 +462,7 @@ public abstract class RpcClient implements Closeable {
* @param request request.
* @return
*/
- public void asyncRequest(Request request, FutureCallback callback) throws NacosException {
+ public void asyncRequest(Request request, RequestCallBack callback) throws NacosException {
int retryTimes = 3;
Exception exceptionToThrow = null;
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
index 2632a4031..6dc5661c7 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
@@ -18,7 +18,8 @@ package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.remote.ConnectionType;
-import com.alibaba.nacos.common.remote.client.grpc.GrpcClient;
+import com.alibaba.nacos.common.remote.client.grpc.GrpcClusterClient;
+import com.alibaba.nacos.common.remote.client.grpc.GrpcSdkClient;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient;
import java.util.HashMap;
@@ -70,23 +71,25 @@ public class RpcClientFactory {
* @param connectionType client type.
* @return
*/
- public static RpcClient createClient(String clientName, ConnectionType connectionType) {
+ public static RpcClient createClient(String clientName, ConnectionType connectionType, Map labels) {
+ String clientNameInner = clientName;
synchronized (clientMap) {
- if (clientMap.get(clientName) == null) {
+ if (clientMap.get(clientNameInner) == null) {
RpcClient moduleClient = null;
if (ConnectionType.GRPC.equals(connectionType)) {
- moduleClient = new GrpcClient(clientName);
-
+ moduleClient = new GrpcSdkClient(clientNameInner);
+
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
- moduleClient = new RsocketRpcClient(clientName);
+ moduleClient = new RsocketRpcClient(clientNameInner);
}
if (moduleClient == null) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
- clientMap.put(clientName, moduleClient);
+ moduleClient.initLabels(labels);
+ clientMap.put(clientNameInner, moduleClient);
return moduleClient;
}
- return clientMap.get(clientName);
+ return clientMap.get(clientNameInner);
}
}
@@ -97,14 +100,14 @@ public class RpcClientFactory {
* @param connectionType client type.
* @return
*/
- public static RpcClient createClient(String clientName, ConnectionType connectionType, Map labels) {
- //TODO to be deleted.
+ public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
+ Map labels) {
String clientNameInner = clientName;
synchronized (clientMap) {
if (clientMap.get(clientNameInner) == null) {
RpcClient moduleClient = null;
if (ConnectionType.GRPC.equals(connectionType)) {
- moduleClient = new GrpcClient(clientNameInner);
+ moduleClient = new GrpcClusterClient(clientNameInner);
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient(clientNameInner);
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
index b446b4a02..ec08459c7 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
* @author liuzunfei
* @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $
*/
-public class GrpcClient extends RpcClient {
+public abstract class GrpcClient extends RpcClient {
static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
@@ -101,10 +101,6 @@ public class GrpcClient extends RpcClient {
}
}
- @Override
- public int rpcPortOffset() {
- return 1000;
- }
/**
* Send Heart Beat Request.
@@ -174,6 +170,7 @@ public class GrpcClient extends RpcClient {
@Override
public void onNext(Payload payload) {
+
LOGGER.debug(" stream server reuqust receive ,original info :{}", payload.toString());
try {
final Request request = (Request) GrpcUtils.parse(payload).getBody();
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java
new file mode 100644
index 000000000..346fb913f
--- /dev/null
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.common.remote.client.grpc;
+
+/**
+ * sdk client for grpc.
+ *
+ * @author liuzunfei
+ * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
+ */
+public class GrpcClusterClient extends GrpcClient {
+
+ /**
+ * Empty constructor.
+ *
+ * @param name name of client.
+ */
+ public GrpcClusterClient(String name) {
+ super(name);
+ }
+
+ @Override
+ public int rpcPortOffset() {
+ return 1001;
+ }
+
+}
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java
index df6373ca6..1e8b90c8b 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java
@@ -20,6 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.grpc.auto.RequestStreamGrpc;
+import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
@@ -34,8 +35,11 @@ import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* grpc connection.
@@ -48,7 +52,7 @@ public class GrpcConnection extends Connection {
/**
* executor to execute future request.
*/
- static ExecutorService aynsRequestExecutor = Executors
+ static ScheduledExecutorService aynsRequestExecutor = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
/**
@@ -75,21 +79,27 @@ public class GrpcConnection extends Connection {
@Override
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
+ return request(request, requestMeta, 3000L);
+ }
+
+ @Override
+ public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request, requestMeta);
ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse = null;
try {
- grpcResponse = requestFuture.get();
+ grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
return null;
}
-
+
Response response = (Response) GrpcUtils.parse(grpcResponse).getBody();
return response;
}
+
public void sendResponse(Response response) {
Payload convert = GrpcUtils.convert(response);
payloadStreamObserver.onNext(convert);
@@ -101,18 +111,20 @@ public class GrpcConnection extends Connection {
}
@Override
- public void asyncRequest(Request request, RequestMeta requestMeta, final FutureCallback callback)
+ public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack)
throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request, requestMeta);
ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcRequest);
+
+ //set callback .
Futures.addCallback(requestFuture, new FutureCallback() {
@Override
public void onSuccess(@NullableDecl Payload grpcResponse) {
Response response = (Response) GrpcUtils.parse(grpcResponse).getBody();
if (response != null && response.isSuccess()) {
- callback.onSuccess(response);
+ requestCallBack.onResponse(response);
} else {
- callback.onFailure(new NacosException(
+ requestCallBack.onException(new NacosException(
(response == null) ? ResponseCode.FAIL.getCode() : response.getErrorCode(),
(response == null) ? "null" : response.getMessage()));
}
@@ -120,9 +132,18 @@ public class GrpcConnection extends Connection {
@Override
public void onFailure(Throwable throwable) {
- callback.onFailure(throwable);
+ if (throwable instanceof CancellationException) {
+ requestCallBack.onException(
+ new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " millseconds."));
+ } else {
+ requestCallBack.onException(throwable);
+ }
}
}, aynsRequestExecutor);
+ // set timeout future.
+ ListenableFuture payloadListenableFuture = Futures
+ .withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS, aynsRequestExecutor);
+
}
@Override
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java
new file mode 100644
index 000000000..1e9aa66cc
--- /dev/null
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.common.remote.client.grpc;
+
+/**
+ * sdk client for grpc.
+ *
+ * @author liuzunfei
+ * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
+ */
+public class GrpcOpsClient extends GrpcClient {
+
+ /**
+ * Empty constructor.
+ *
+ * @param name name of client.
+ */
+ public GrpcOpsClient(String name) {
+ super(name);
+ }
+
+ @Override
+ public int rpcPortOffset() {
+ return 1002;
+ }
+
+}
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java
new file mode 100644
index 000000000..089ec3d45
--- /dev/null
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.common.remote.client.grpc;
+
+/**
+ * sdk client for grpc.
+ *
+ * @author liuzunfei
+ * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
+ */
+public class GrpcSdkClient extends GrpcClient {
+
+ /**
+ * Empty constructor.
+ *
+ * @param name name of client.
+ */
+ public GrpcSdkClient(String name) {
+ super(name);
+ }
+
+ @Override
+ public int rpcPortOffset() {
+ return 1000;
+ }
+
+}
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java
index c8eaaaece..44c95afaf 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java
@@ -17,19 +17,26 @@
package com.alibaba.nacos.common.remote.client.rsocket;
import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.remote.RequestCallBack;
+import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.RsocketUtils;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
-import com.google.common.util.concurrent.FutureCallback;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import reactor.core.publisher.Mono;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
/**
* rsocket connection.
*
@@ -47,29 +54,51 @@ public class RsocketConnection extends Connection {
@Override
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
+ return request(request, requestMeta, 3000L);
+ }
+
+ @Override
+ public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
Payload response = rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta))
- .block();
+ .block(Duration.ofMillis(timeouts));
return RsocketUtils.parseResponseFromPayload(response);
}
@Override
- public void asyncRequest(Request request, RequestMeta requestMeta, final FutureCallback callback)
+ public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack)
throws NacosException {
try {
Mono response = rSocketClient
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
-
- response.subscribe(new Consumer() {
- @Override
- public void accept(Payload payload) {
- callback.onSuccess(RsocketUtils.parseResponseFromPayload(payload));
- }
+
+ response.toFuture().acceptEither(RsocketConnection.failAfter(requestCallBack.getTimeout()),
+ new Consumer() {
+ @Override
+ public void accept(Payload payload) {
+ requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload));
+ }
+ }).exceptionally(throwable -> {
+ requestCallBack.onException(throwable);
+ return null;
});
+
} catch (Exception e) {
- callback.onFailure(e);
+ requestCallBack.onException(e);
}
}
+ private static CompletableFuture failAfter(final long timeouts) {
+ final CompletableFuture promise = new CompletableFuture();
+ RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable