request callback timeout support both at client and server; split sdk (#3776)

and cluster grpc server port.
This commit is contained in:
nov.lzf 2020-09-07 18:49:17 +08:00 committed by GitHub
parent d82222d3f6
commit 723dedebfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 623 additions and 171 deletions

View File

@ -112,12 +112,6 @@ public class ConfigChangeNotifyRequest extends ServerPushRequest {
this.tenant = tenant;
}
@Override
public String toString() {
return "ConfigChangeNotifyResponse{" + "dataId='" + dataId + '\'' + ", group='" + group + '\'' + ", tenant='"
+ tenant + '\'' + '}';
}
/**
* Getter method for property <tt>beta</tt>.
*

View File

@ -145,6 +145,12 @@ public class ConfigChangeBatchListenResponse extends Response {
public void setTenant(String tenant) {
this.tenant = tenant;
}
@Override
public String toString() {
return "ConfigContext{" + "group='" + group + '\'' + ", dataId='" + dataId + '\'' + ", tenant='" + tenant
+ '\'' + '}';
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote;
/**
* abstract request call back.
*
* @author liuzunfei
* @version $Id: AbstractRequestCallBack.java, v 0.1 2020年09月07日 3:30 PM liuzunfei Exp $
*/
public abstract class AbstractRequestCallBack implements RequestCallBack {
long timeoutMills;
public AbstractRequestCallBack(long timeoutMill) {
this.timeoutMills = timeoutMill;
}
public AbstractRequestCallBack() {
this(3000L);
}
@Override
public long getTimeout() {
return timeoutMills;
}
}

View File

@ -18,6 +18,8 @@ package com.alibaba.nacos.api.remote;
import com.alibaba.nacos.api.remote.response.Response;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
@ -40,8 +42,12 @@ public class DefaultRequestFuture implements RequestFuture {
private String requestId;
private String connectionId;
private Response response;
private ScheduledFuture timeoutFuture;
/**
* Getter method for property <tt>requestCallBack</tt>.
*
@ -63,32 +69,39 @@ public class DefaultRequestFuture implements RequestFuture {
public DefaultRequestFuture() {
}
public DefaultRequestFuture(String requestId) {
this(requestId, null);
public DefaultRequestFuture(String connectionId, String requestId) {
this(connectionId, requestId, null);
}
public DefaultRequestFuture(String requestId, RequestCallBack requestCallBack) {
public DefaultRequestFuture(String connectionId, String requestId, RequestCallBack requestCallBack) {
this.timeStamp = System.currentTimeMillis();
this.requestCallBack = requestCallBack;
this.requestId = requestId;
this.connectionId = connectionId;
this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER
.schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS);
}
public void setResponse(Response response) {
public void setResponse(final Response response) {
isDone = true;
this.response = response;
this.isSuccess = response.isSuccess();
if (this.timeoutFuture != null) {
timeoutFuture.cancel(true);
}
synchronized (this) {
notifyAll();
}
if (requestCallBack != null) {
requestCallBack.onResponse(response);
requestCallBack.getExcutor().execute(new CallBackHandler());
}
}
public void setFailResult(Exception e) {
isDone = true;
isSuccess = false;
this.exception = e;
synchronized (this) {
notifyAll();
}
@ -142,4 +155,45 @@ public class DefaultRequestFuture implements RequestFuture {
throw new TimeoutException();
}
}
class CallBackHandler implements Runnable {
@Override
public void run() {
if (exception != null) {
requestCallBack.onException(exception);
} else {
requestCallBack.onResponse(response);
}
}
}
class TimeoutHandler implements Runnable {
public TimeoutHandler() {
}
@Override
public void run() {
setFailResult(new TimeoutException("Timeout After " + requestCallBack.getTimeout() + " millseconds."));
}
}
/**
* Getter method for property <tt>connectionId</tt>.
*
* @return property value of connectionId
*/
public String getConnectionId() {
return connectionId;
}
/**
* Setter method for property <tt>timeoutFuture</tt>.
*
* @param timeoutFuture value to be assigned to property timeoutFuture
*/
public void setTimeoutFuture(ScheduledFuture timeoutFuture) {
this.timeoutFuture = timeoutFuture;
}
}

View File

@ -18,6 +18,8 @@ package com.alibaba.nacos.api.remote;
import com.alibaba.nacos.api.remote.response.Response;
import java.util.concurrent.Executor;
/**
* call bakck for request.
*
@ -26,6 +28,13 @@ import com.alibaba.nacos.api.remote.response.Response;
*/
public interface RequestCallBack {
/**
* get executor on callback.
*
* @return
*/
public Executor getExcutor();
/**
* get timeout mills.
*
@ -45,6 +54,6 @@ public interface RequestCallBack {
*
* @param e exception throwed.
*/
public void onException(Exception e);
public void onException(Throwable e);
}

View File

@ -21,7 +21,7 @@ import com.alibaba.nacos.api.remote.response.Response;
import java.util.concurrent.TimeoutException;
/**
* future for request
* future for request.
* @author liuzunfei
* @version $Id: RequestFuture.java, v 0.1 2020年09月01日 6:31 PM liuzunfei Exp $
*/

View File

@ -0,0 +1,42 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
/**
* rpc scheduler executor .
*
* @author liuzunfei
* @version $Id: RpcScheduledExecutor.java, v 0.1 2020年09月07日 4:12 PM liuzunfei Exp $
*/
public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor {
public static final RpcScheduledExecutor TIMEOUT_SHEDULER = new RpcScheduledExecutor(
"com.alibaba.nacos.remote.TimerScheduler");
public RpcScheduledExecutor(final String threadName) {
super(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, threadName);
}
});
}
}

View File

@ -28,8 +28,6 @@ public interface PushCallBack {
public void onSuccess();
public void onFail(Exception e);
public void onTimeout();
public void onFail(Throwable e);
}

View File

@ -30,6 +30,7 @@ import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse;
import com.alibaba.nacos.api.naming.remote.response.ServiceListResponse;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.selector.AbstractSelector;
@ -42,6 +43,8 @@ import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.JacksonUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
@ -62,7 +65,10 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC);
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC, labels);
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
start(serverListFactory, serviceInfoHolder);
}

View File

@ -22,6 +22,8 @@ import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
@ -34,10 +36,13 @@ import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.Executor;
import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR;
import static com.alibaba.nacos.api.common.Constants.WORD_SEPARATOR;
@ -51,8 +56,8 @@ public class ConfigTest {
public void before() throws Exception {
Properties properties = new Properties();
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
//"11.239.114.187:8848");
configService = NacosFactory.createConfigService(properties);
@ -61,7 +66,10 @@ public class ConfigTest {
@Test
public void test222() throws Exception {
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET);
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels);
client.init(new ServerListFactory() {
@Override
public String genNextServer() {
@ -120,6 +128,62 @@ public class ConfigTest {
}
@Test
public void test333() throws Exception {
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.GRPC, labels);
client.init(new ServerListFactory() {
@Override
public String genNextServer() {
return "127.0.0.1:8848";
}
@Override
public String getCurrentServer() {
return "127.0.0.1:8848";
}
@Override
public List<String> getServerList() {
return Lists.newArrayList("127.0.0.1:8848");
}
});
client.start();
ConfigBatchListenRequest syncRequest = new ConfigBatchListenRequest();
syncRequest.setListen(true);
final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc";
syncRequest.addConfigListenContext(group, dataId, null, null);
long start = System.currentTimeMillis();
System.out.println("send :" + System.currentTimeMillis());
client.asyncRequest(syncRequest, new AbstractRequestCallBack(2001L) {
@Override
public Executor getExcutor() {
return null;
}
@Override
public void onResponse(Response response) {
System.out.println("onSuccess:" + response);
System.out.println("receive :" + System.currentTimeMillis());
}
@Override
public void onException(Throwable throwable) {
System.out.println("onFailure:" + throwable);
}
});
Thread.sleep(10000L);
}
@After
public void cleanup() throws Exception {
configService.shutDown();
@ -181,8 +245,6 @@ public class ConfigTest {
final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc";
final String content = "lessspring-" + System.currentTimeMillis();
System.out.println(System.getProperty("nacos.logging.path"));
System.out.println(System.getProperty("limitTime"));
Thread th = new Thread(new Runnable() {
@Override
@ -192,11 +254,10 @@ public class ConfigTest {
int times = 1000;
while (times > 0) {
try {
configService.publishConfig(dataId + random.nextInt(10), group,
"value" + System.currentTimeMillis());
configService.publishConfig(dataId, group, "value" + System.currentTimeMillis());
times--;
Thread.sleep(2000L);
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
@ -217,9 +278,11 @@ public class ConfigTest {
}
};
configService.addListener(dataId, group, listener);
for (int i = 0; i < 20; i++) {
final int ls = i;
configService.addListener(dataId + i, group, listener);
//configService.addListener(dataId + i, group, listener);
}

View File

@ -18,26 +18,16 @@ package com.alibaba.nacos.client.config.listener.impl;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
import com.alibaba.nacos.client.config.impl.ClientWorker;
import com.alibaba.nacos.client.utils.ParamUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import static org.mockito.Mockito.mock;
public class ClientWorkerTest {
@Mock
@ -53,37 +43,6 @@ public class ClientWorkerTest {
private final String currentLongingTaskCount = "currentLongingTaskCount";
@Before
public void init() {
MockitoAnnotations.initMocks(this);
clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class),
mock(Properties.class));
try {
Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService");
executorServiceField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL);
executorServiceField.set(clientWorker, scheduledExecutorService);
Listener listener = new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
}
};
listeners = Arrays.asList(listener);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
@Test
public void testAddLongPollNumberThreads() {
try {

View File

@ -87,8 +87,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>

View File

@ -17,10 +17,10 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.google.common.util.concurrent.FutureCallback;
import java.util.HashMap;
import java.util.Map;
@ -75,18 +75,27 @@ public abstract class Connection {
/**
* send request.
*
* default time out 3 seconds.
* @param request request.
* @return
*/
public abstract Response request(Request request, RequestMeta requestMeta) throws NacosException;
/**
* send request.
*
* @param request request.
* @param timeoutMills mills of timeouts.
* @return
*/
public abstract Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException;
/**
* send aync request.
*
* @param request request.
*/
public abstract void asyncRequest(Request request, RequestMeta requestMeta, FutureCallback<Response> callback)
public abstract void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
throws NacosException;
/**

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.PayloadRegistry;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
@ -30,7 +31,6 @@ import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.common.util.concurrent.FutureCallback;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
@ -462,7 +462,7 @@ public abstract class RpcClient implements Closeable {
* @param request request.
* @return
*/
public void asyncRequest(Request request, FutureCallback<Response> callback) throws NacosException {
public void asyncRequest(Request request, RequestCallBack callback) throws NacosException {
int retryTimes = 3;
Exception exceptionToThrow = null;

View File

@ -18,7 +18,8 @@ package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClient;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClusterClient;
import com.alibaba.nacos.common.remote.client.grpc.GrpcSdkClient;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient;
import java.util.HashMap;
@ -70,23 +71,25 @@ public class RpcClientFactory {
* @param connectionType client type.
* @return
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType) {
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
String clientNameInner = clientName;
synchronized (clientMap) {
if (clientMap.get(clientName) == null) {
if (clientMap.get(clientNameInner) == null) {
RpcClient moduleClient = null;
if (ConnectionType.GRPC.equals(connectionType)) {
moduleClient = new GrpcClient(clientName);
moduleClient = new GrpcSdkClient(clientNameInner);
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient(clientName);
moduleClient = new RsocketRpcClient(clientNameInner);
}
if (moduleClient == null) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
clientMap.put(clientName, moduleClient);
moduleClient.initLabels(labels);
clientMap.put(clientNameInner, moduleClient);
return moduleClient;
}
return clientMap.get(clientName);
return clientMap.get(clientNameInner);
}
}
@ -97,14 +100,14 @@ public class RpcClientFactory {
* @param connectionType client type.
* @return
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
//TODO to be deleted.
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels) {
String clientNameInner = clientName;
synchronized (clientMap) {
if (clientMap.get(clientNameInner) == null) {
RpcClient moduleClient = null;
if (ConnectionType.GRPC.equals(connectionType)) {
moduleClient = new GrpcClient(clientNameInner);
moduleClient = new GrpcClusterClient(clientNameInner);
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient(clientNameInner);

View File

@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
* @author liuzunfei
* @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $
*/
public class GrpcClient extends RpcClient {
public abstract class GrpcClient extends RpcClient {
static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
@ -101,10 +101,6 @@ public class GrpcClient extends RpcClient {
}
}
@Override
public int rpcPortOffset() {
return 1000;
}
/**
* Send Heart Beat Request.
@ -174,6 +170,7 @@ public class GrpcClient extends RpcClient {
@Override
public void onNext(Payload payload) {
LOGGER.debug(" stream server reuqust receive ,original info :{}", payload.toString());
try {
final Request request = (Request) GrpcUtils.parse(payload).getBody();

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
/**
* sdk client for grpc.
*
* @author liuzunfei
* @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
*/
public class GrpcClusterClient extends GrpcClient {
/**
* Empty constructor.
*
* @param name name of client.
*/
public GrpcClusterClient(String name) {
super(name);
}
@Override
public int rpcPortOffset() {
return 1001;
}
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.grpc.auto.RequestStreamGrpc;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
@ -34,8 +35,11 @@ import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* grpc connection.
@ -48,7 +52,7 @@ public class GrpcConnection extends Connection {
/**
* executor to execute future request.
*/
static ExecutorService aynsRequestExecutor = Executors
static ScheduledExecutorService aynsRequestExecutor = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
/**
@ -75,21 +79,27 @@ public class GrpcConnection extends Connection {
@Override
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
return request(request, requestMeta, 3000L);
}
@Override
public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request, requestMeta);
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse = null;
try {
grpcResponse = requestFuture.get();
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
return null;
}
Response response = (Response) GrpcUtils.parse(grpcResponse).getBody();
return response;
}
public void sendResponse(Response response) {
Payload convert = GrpcUtils.convert(response);
payloadStreamObserver.onNext(convert);
@ -101,18 +111,20 @@ public class GrpcConnection extends Connection {
}
@Override
public void asyncRequest(Request request, RequestMeta requestMeta, final FutureCallback<Response> callback)
public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack)
throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request, requestMeta);
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
//set callback .
Futures.addCallback(requestFuture, new FutureCallback<Payload>() {
@Override
public void onSuccess(@NullableDecl Payload grpcResponse) {
Response response = (Response) GrpcUtils.parse(grpcResponse).getBody();
if (response != null && response.isSuccess()) {
callback.onSuccess(response);
requestCallBack.onResponse(response);
} else {
callback.onFailure(new NacosException(
requestCallBack.onException(new NacosException(
(response == null) ? ResponseCode.FAIL.getCode() : response.getErrorCode(),
(response == null) ? "null" : response.getMessage()));
}
@ -120,9 +132,18 @@ public class GrpcConnection extends Connection {
@Override
public void onFailure(Throwable throwable) {
callback.onFailure(throwable);
if (throwable instanceof CancellationException) {
requestCallBack.onException(
new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " millseconds."));
} else {
requestCallBack.onException(throwable);
}
}
}, aynsRequestExecutor);
// set timeout future.
ListenableFuture<Payload> payloadListenableFuture = Futures
.withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS, aynsRequestExecutor);
}
@Override

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
/**
* sdk client for grpc.
*
* @author liuzunfei
* @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
*/
public class GrpcOpsClient extends GrpcClient {
/**
* Empty constructor.
*
* @param name name of client.
*/
public GrpcOpsClient(String name) {
super(name);
}
@Override
public int rpcPortOffset() {
return 1002;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
/**
* sdk client for grpc.
*
* @author liuzunfei
* @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
*/
public class GrpcSdkClient extends GrpcClient {
/**
* Empty constructor.
*
* @param name name of client.
*/
public GrpcSdkClient(String name) {
super(name);
}
@Override
public int rpcPortOffset() {
return 1000;
}
}

View File

@ -17,19 +17,26 @@
package com.alibaba.nacos.common.remote.client.rsocket;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.RsocketUtils;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.google.common.util.concurrent.FutureCallback;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* rsocket connection.
*
@ -47,29 +54,51 @@ public class RsocketConnection extends Connection {
@Override
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
return request(request, requestMeta, 3000L);
}
@Override
public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
Payload response = rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta))
.block();
.block(Duration.ofMillis(timeouts));
return RsocketUtils.parseResponseFromPayload(response);
}
@Override
public void asyncRequest(Request request, RequestMeta requestMeta, final FutureCallback<Response> callback)
public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack)
throws NacosException {
try {
Mono<Payload> response = rSocketClient
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
response.subscribe(new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
callback.onSuccess(RsocketUtils.parseResponseFromPayload(payload));
}
response.toFuture().acceptEither(RsocketConnection.<Payload>failAfter(requestCallBack.getTimeout()),
new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload));
}
}).exceptionally(throwable -> {
requestCallBack.onException(throwable);
return null;
});
} catch (Exception e) {
callback.onFailure(e);
requestCallBack.onException(e);
}
}
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
final CompletableFuture<T> promise = new CompletableFuture<T>();
RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
return promise.completeExceptionally(ex);
}
}, timeouts, MILLISECONDS);
return promise;
}
@Override
public void close() {
if (this.rSocketClient != null && !rSocketClient.isDisposed()) {

View File

@ -73,7 +73,7 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
private ConnectionManager connectionManager;
/**
* adaptor to config module ,when server side congif change ,invoke this method.
* adaptor to config module ,when server side config change ,invoke this method.
*
* @param groupKey groupKey
* @param notifyRequet notifyRequet
@ -84,7 +84,6 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
if (listeners == null || listeners.isEmpty()) {
return;
}
Set<String> clients = new HashSet<>(listeners);
int notifyCount = 0;
if (!CollectionUtils.isEmpty(clients)) {
@ -100,8 +99,8 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
continue;
}
}
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 5, client);
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 50, client);
push(rpcPushRetryTask);
notifyCount++;
}
@ -168,24 +167,17 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
}
@Override
public void onFail(Exception e) {
public void onFail(Throwable e) {
Loggers.CORE.error("On failt ", e);
Loggers.CORE.warn("push fail.dataId={},group={},tenant={},clientId={},tryTimes={}",
notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId,
retryTimes);
push(RpcPushTask.this);
}
@Override
public void onTimeout() {
Loggers.CORE.warn("push timeout.dataId={},group={},tenant={},clientId={},tryTimes={}",
notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId,
retryTimes);
push(RpcPushTask.this);
}
});
}, ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR);
tryTimes++;
}
}

View File

@ -109,12 +109,13 @@ public class ClusterRpcClientProxy extends MemberChangeListener {
}
private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
RpcClient client = RpcClientFactory.createClient(memberClientKey(member), type);
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
RpcClient client = RpcClientFactory.createClusterClient(memberClientKey(member), type, labels);
if (!client.getConnectionType().equals(type)) {
RpcClientFactory.destroyClient(memberClientKey(member));
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
client = RpcClientFactory.createClient(memberClientKey(member), type, labels);
client = RpcClientFactory.createClusterClient(memberClientKey(member), type, labels);
}
if (client.isWaitInited()) {
@ -144,6 +145,7 @@ public class ClusterRpcClientProxy extends MemberChangeListener {
/**
* send request to member.
*
* @param member
* @param request
* @return

View File

@ -60,10 +60,11 @@ public class RpcAckCallbackSynchronizer {
return;
}
DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.get(response.getRequestId());
DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.remove(response.getRequestId());
if (currentCallback == null) {
return;
}
if (response.isSuccess()) {
currentCallback.setResponse(response);
} else {
@ -71,6 +72,23 @@ public class RpcAckCallbackSynchronizer {
}
}
/**
* notify ackid.
*/
public static void exceptionNotify(String connectionId, String requestId, Exception e) {
Map<String, DefaultRequestFuture> stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connectionId);
if (stringDefaultPushFutureMap == null) {
return;
}
DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.remove(requestId);
if (currentCallback == null) {
return;
}
currentCallback.setFailResult(e);
}
/**
* notify ackid.
*/

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.response.Response;
@ -26,6 +26,8 @@ import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
/**
* push response to clients.
*
@ -45,14 +47,16 @@ public class RpcPushService {
* @param request request.
* @param requestCallBack requestCallBack.
*/
public void pushWithCallback(String connectionId, ServerPushRequest request, PushCallBack requestCallBack) {
public void pushWithCallback(String connectionId, ServerPushRequest request, PushCallBack requestCallBack,
Executor executor) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
try {
connection.sendRequestWithCallBack(request, new RequestCallBack() {
connection.sendRequestWithCallBack(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
@Override
public long getTimeout() {
return requestCallBack.getTimeout();
public Executor getExcutor() {
return executor;
}
@Override
@ -65,7 +69,7 @@ public class RpcPushService {
}
@Override
public void onException(Exception e) {
public void onException(Throwable e) {
requestCallBack.onFail(e);
}
});

View File

@ -45,22 +45,24 @@ public abstract class RpcServer {
@PostConstruct
public void start() throws Exception {
Loggers.RPC.info("Nacos {} Rpc server starting at port {}", getConnectionType(),
Loggers.RPC.info("Nacos {} Rpc server starting at port {}", getClass().getSimpleName(),
(ApplicationUtils.getPort() + rpcPortOffset()));
startServer();
Loggers.RPC.info("Nacos {} Rpc server started at port {}", getConnectionType(),
Loggers.RPC.info("Nacos {} Rpc server started at port {}", getClass().getSimpleName(),
(ApplicationUtils.getPort() + rpcPortOffset()));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Loggers.RPC.info("Nacos {} Rpc server stopping", getConnectionType());
Loggers.RPC.info("Nacos {} Rpc server stopping", getClass().getSimpleName());
try {
RpcServer.this.stopServer();
Loggers.RPC.info("Nacos {} Rpc server stopped successfully...", getConnectionType());
Loggers.RPC.info("Nacos {} Rpc server stopped successfully...",
RpcServer.this.getClass().getSimpleName());
} catch (Exception e) {
Loggers.RPC.error("Nacos {} Rpc server stopped fail...", getConnectionType(), e);
Loggers.RPC
.error("Nacos {} Rpc server stopped fail...", RpcServer.this.getClass().getSimpleName(), e);
}
}
});

View File

@ -0,0 +1,36 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;
import org.springframework.stereotype.Service;
/**
* Grpc implementation as a rpc server.
*
* @author liuzunfei
* @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $
*/
@Service
public class GrpcClusterServer extends GrpcServer {
private static final int PORT_OFFSET = 1001;
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
}

View File

@ -37,6 +37,7 @@ import io.grpc.stub.StreamObserver;
/**
* grpc connection.
*
* @author liuzunfei
* @version $Id: GrpcConnection.java, v 0.1 2020年07月13日 7:26 PM liuzunfei Exp $
*/
@ -97,7 +98,7 @@ public class GrpcConnection extends Connection {
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
sendRequestNoAck(request);
DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(requestId, callBack);
DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(this.getConnectionId(), requestId, callBack);
RpcAckCallbackSynchronizer.syncCallback(getConnectionId(), requestId, defaultPushFuture);
return defaultPushFuture;
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;
import org.springframework.stereotype.Service;
/**
* Grpc implementation as a rpc server.
*
* @author liuzunfei
* @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $
*/
@Service
public class GrpcSdkServer extends GrpcServer {
private static final int PORT_OFFSET = 1000;
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
}

View File

@ -44,7 +44,6 @@ import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.util.MutableHandlerRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Grpc implementation as a rpc server.
@ -52,10 +51,7 @@ import org.springframework.stereotype.Service;
* @author liuzunfei
* @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $
*/
@Service
public class GrpcServer extends RpcServer {
private static final int PORT_OFFSET = 1000;
public abstract class GrpcServer extends RpcServer {
private Server server;
@ -162,11 +158,6 @@ public class GrpcServer extends RpcServer {
}
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
@Override
public void shundownServer() {
if (server != null) {

View File

@ -19,6 +19,7 @@ package com.alibaba.nacos.core.remote.rsocket;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
@ -33,9 +34,13 @@ import io.rsocket.RSocket;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* connection of rsocket.
*
@ -107,28 +112,40 @@ public class RsocketConnection extends Connection {
}
@Override
public void sendRequestWithCallBack(Request request, RequestCallBack callBack) throws NacosException {
public void sendRequestWithCallBack(Request request, RequestCallBack requestCallBack) throws NacosException {
long id = System.currentTimeMillis();
Loggers.RPC_DIGEST.info("Rsocket sendRequestWithCallBack :" + request);
Mono<Payload> payloadMono = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta()));
payloadMono.subscribe(new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
Response response = RsocketUtils.parseResponseFromPayload(payload);
callBack.onResponse(response);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
callBack.onException(new Exception(throwable));
}
});
try {
Mono<Payload> response = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta()));
response.toFuture().acceptEither(failAfter(requestCallBack.getTimeout()), new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload));
}
}).exceptionally(throwable -> {
requestCallBack.onException(throwable);
return null;
});
} catch (Exception e) {
requestCallBack.onException(e);
}
}
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
final CompletableFuture<T> promise = new CompletableFuture<T>();
RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
return promise.completeExceptionally(ex);
}
}, timeouts, MILLISECONDS);
return promise;
}
@Override

View File

@ -19,7 +19,6 @@ package com.alibaba.nacos.naming.cluster.remote.grpc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClient;
import com.alibaba.nacos.naming.cluster.remote.ClusterClient;
@ -33,7 +32,7 @@ public class GrpcClusterClient implements ClusterClient {
private final GrpcClient grpcClient;
public GrpcClusterClient(String targetAddress) {
this.grpcClient = new GrpcClient(targetAddress);
this.grpcClient = new com.alibaba.nacos.common.remote.client.grpc.GrpcClusterClient(targetAddress);
}
@Override