client use same executor during different connection;metrics count bugfix.;recommend server check (#4659)

* client use same executor during different connection.

* metrics count bugfix.

* recommend server check

* remove rsocket ;add client metrics

* add client metrics to get cache value

* rpc tps control basic api submit.

* check style ,pmd fix.
This commit is contained in:
nov.lzf 2021-01-11 14:11:32 +08:00 committed by GitHub
parent c66aba5f4d
commit 6027d36222
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 661 additions and 3379 deletions

View File

@ -19,7 +19,7 @@
<parent>
<artifactId>nacos-all</artifactId>
<groupId>com.alibaba.nacos</groupId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -106,14 +106,6 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>

View File

@ -0,0 +1,107 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.request;
import com.alibaba.nacos.api.remote.request.ServerRequest;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* request of config module metrics.
*
* @author liuzunfei
* @version $Id: ClientConfigMetricRequest.java, v 0.1 2020年12月30日 9:05 PM liuzunfei Exp $
*/
public class ClientConfigMetricRequest extends ServerRequest {
private List<MetricsKey> metricsKeys = new ArrayList<MetricsKey>();
@Override
public String getModule() {
return "config";
}
public List<MetricsKey> getMetricsKeys() {
return metricsKeys;
}
public void setMetricsKeys(List<MetricsKey> metricsKeys) {
this.metricsKeys = metricsKeys;
}
public static class MetricsKey implements Serializable {
String type;
String key;
/**
* budile metrics key.
* @param type type.
* @param key key.
* @return metric key.
*/
public static MetricsKey build(String type, String key) {
MetricsKey metricsKey = new MetricsKey();
metricsKey.type = type;
metricsKey.key = key;
return metricsKey;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
@Override
public String toString() {
return "MetricsKey{" + "type='" + type + '\'' + ", key='" + key + '\'' + '}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MetricsKey that = (MetricsKey) o;
return Objects.equals(type, that.type) && Objects.equals(key, that.key);
}
@Override
public int hashCode() {
return Objects.hash(type, key);
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.response;
import com.alibaba.nacos.api.remote.response.Response;
import java.util.HashMap;
import java.util.Map;
/**
* client config metrics response.
*
* @author liuzunfei
* @version $Id: ClientConfigMetricResponse.java, v 0.1 2020年12月30日 2:59 PM liuzunfei Exp $
*/
public class ClientConfigMetricResponse extends Response {
private Map<String, Object> metrics = new HashMap<>();
public Map<String, Object> getMetrics() {
return metrics;
}
public void setMetrics(Map<String, Object> metrics) {
this.metrics = metrics;
}
public void putMetric(String key, Object value) {
metrics.put(key, value);
}
}

View File

@ -115,9 +115,11 @@ public class DefaultRequestFuture implements RequestFuture {
private void callBacInvoke() {
if (requestCallBack != null) {
requestCallBack.getExecutor().execute(new CallBackHandler());
} else {
new CallBackHandler().run();
if (requestCallBack.getExecutor() != null) {
requestCallBack.getExecutor().execute(new CallBackHandler());
} else {
new CallBackHandler().run();
}
}
}
@ -165,7 +167,7 @@ public class DefaultRequestFuture implements RequestFuture {
if (timeoutInnerTrigger != null) {
timeoutInnerTrigger.triggerOnTimeout();
}
throw new TimeoutException();
throw new TimeoutException("request timeout after " + timeout + " milliseconds.");
}
}

View File

@ -35,8 +35,6 @@ public class RequestMeta {
private String clientVersion = "";
private String appName = "";
private Map<String, String> labels = new HashMap<String, String>();
/**
@ -129,24 +127,6 @@ public class RequestMeta {
this.clientIp = clientIp;
}
/**
* get app name.
*
* @return
*/
public String getAppName() {
return appName;
}
/**
* set app name.
*
* @param appName app name.
*/
public void setAppName(String appName) {
this.appName = appName;
}
@Override
public String toString() {
return "RequestMeta{" + "connectionId='" + connectionId + '\'' + ", clientIp='" + clientIp + '\''

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -19,7 +19,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -20,12 +20,14 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.ClientConfigMetricRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigReSyncRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
@ -46,6 +48,7 @@ import com.alibaba.nacos.client.config.utils.ContentUtils;
import com.alibaba.nacos.client.config.utils.ParamUtils;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.utils.AppNameUtils;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.TenantUtil;
@ -60,9 +63,11 @@ import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
@ -489,6 +494,37 @@ public class ClientWorker implements Closeable {
.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
}
private Map<String, Object> getMetrics(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
Map<String, Object> metric = new HashMap<>(16);
metric.put("listenKeys", String.valueOf(this.cacheMap.get().size()));
metric.put("clientVersion", VersionUtils.getFullClientVersion());
Map<ClientConfigMetricRequest.MetricsKey, Object> metricValues = getMetricsValue(metricsKeys);
metric.put("metricValues", metricValues);
Map<String, Object> metrics = new HashMap<String, Object>(1);
metrics.put(uuid, JacksonUtils.toJson(metric));
return metrics;
}
private Map<ClientConfigMetricRequest.MetricsKey, Object> getMetricsValue(
List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
if (metricsKeys == null) {
return null;
}
Map<ClientConfigMetricRequest.MetricsKey, Object> values = new HashMap<>(16);
for (ClientConfigMetricRequest.MetricsKey metricsKey : metricsKeys) {
if ("cacheData".equals(metricsKey.getType())) {
values.putIfAbsent(metricsKey, cacheMap.get().get(metricsKey.getKey()));
}
if ("snapshotData".equals(metricsKey.getType())) {
String[] configStr = GroupKey.parseKey(metricsKey.getKey());
String snapshot = LocalConfigInfoProcessor
.getSnapshot(this.agent.getName(), configStr[0], configStr[1], configStr[2]);
values.putIfAbsent(metricsKey, snapshot);
}
}
return values;
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
@ -554,6 +590,8 @@ public class ClientWorker implements Closeable {
Map<String, String> labels = new HashMap<String, String>(2, 1);
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_CONFIG);
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
return labels;
}
@ -582,6 +620,15 @@ public class ClientWorker implements Closeable {
return null;
});
rpcClientInner.registerServerRequestHandler((request, requestMeta) -> {
if (request instanceof ClientConfigMetricRequest) {
ClientConfigMetricResponse response = new ClientConfigMetricResponse();
response.setMetrics(getMetrics(((ClientConfigMetricRequest) request).getMetricsKeys()));
return response;
}
return null;
});
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
@Override

View File

@ -21,7 +21,7 @@
<parent>
<artifactId>nacos-all</artifactId>
<groupId>com.alibaba.nacos</groupId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -46,14 +46,7 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>

View File

@ -24,11 +24,6 @@ package com.alibaba.nacos.common.remote;
*/
public enum ConnectionType {
/**
* Rsocket connection.
*/
RSOCKET("RSOCKET", "Rsocket Connection"),
/**
* gRPC connection.
*/
@ -48,7 +43,7 @@ public enum ConnectionType {
return null;
}
private ConnectionType(String type, String name) {
ConnectionType(String type, String name) {
this.type = type;
this.name = name;
}

View File

@ -31,7 +31,6 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.AppNameUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.NumberUtil;
import com.alibaba.nacos.common.utils.StringUtils;
@ -108,7 +107,6 @@ public abstract class RpcClient implements Closeable {
RequestMeta meta = new RequestMeta();
meta.setClientVersion(VersionUtils.getFullClientVersion());
meta.setClientIp(NetUtils.localIP());
meta.setAppName(AppNameUtils.getAppName());
meta.setLabels(labels);
return meta;
}
@ -263,6 +261,14 @@ public abstract class RpcClient implements Closeable {
while (true) {
try {
ReconnectContext reconnectContext = reconnectionSignal.take();
if (reconnectContext.serverInfo != null) {
//clear recommend server if server is not in server list.
String address = reconnectContext.serverInfo.serverIp + Constants.COLON
+ reconnectContext.serverInfo.serverPort;
if (!getServerListFactory().getServerList().contains(address)) {
reconnectContext.serverInfo = null;
}
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
//Do nothing

View File

@ -20,7 +20,6 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClusterClient;
import com.alibaba.nacos.common.remote.client.grpc.GrpcSdkClient;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient;
import java.util.HashMap;
import java.util.Map;
@ -78,8 +77,6 @@ public class RpcClientFactory {
if (ConnectionType.GRPC.equals(connectionType)) {
moduleClient = new GrpcSdkClient(clientNameInner);
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient(clientNameInner);
}
if (moduleClient == null) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
@ -108,8 +105,6 @@ public class RpcClientFactory {
if (ConnectionType.GRPC.equals(connectionType)) {
moduleClient = new GrpcClusterClient(clientNameInner);
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient(clientNameInner);
}
if (moduleClient == null) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());

View File

@ -34,8 +34,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,7 +85,7 @@ public abstract class GrpcClient extends RpcClient {
private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) {
ManagedChannelBuilder<?> o = ManagedChannelBuilder.forAddress(serverIp, serverPort).executor(executor)
.usePlaintext();
.keepAliveTime(30, TimeUnit.SECONDS).usePlaintext();
ManagedChannel managedChannelTemp = o.build();
@ -128,7 +126,7 @@ public abstract class GrpcClient extends RpcClient {
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
Payload grpcRequest = GrpcUtils.convert(serverCheckRequest, buildMeta());
ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
Payload response = responseFuture.get();
Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
return response != null;
} catch (Exception e) {
return false;
@ -183,14 +181,7 @@ public abstract class GrpcClient extends RpcClient {
if (isRunning && !isAbandon) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
GrpcClient.this.getName(), throwable);
if (throwable instanceof StatusRuntimeException) {
Status.Code code = ((StatusRuntimeException) throwable).getStatus().getCode();
if (Status.UNAVAILABLE.getCode().equals(code) || Status.CANCELLED.getCode().equals(code)) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
}
}
switchServerAsync();
} else {
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]ignore error event,isRunning:{},isAbandon={}",
GrpcClient.this.getName(), isRunning, isAbandon);

View File

@ -1,146 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.rsocket;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* rsocket connection.
*
* @author liuzunfei
* @version $Id: RsocketConnection.java, v 0.1 2020年08月09日 2:57 PM liuzunfei Exp $
*/
public class RsocketConnection extends Connection {
private RSocket rSocketClient;
public RsocketConnection(RpcClient.ServerInfo serverInfo, RSocket rSocketClient) {
super(serverInfo);
this.rSocketClient = rSocketClient;
}
@Override
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
return request(request, requestMeta, 3000L);
}
@Override
public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
Payload response = rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta))
.block(Duration.ofMillis(timeouts));
return RsocketUtils.parseResponseFromPayload(response);
}
@Override
public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException {
final Mono<Payload> response = rSocketClient
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
final CompletableFuture<Payload> payloadCompletableFuture = response.toFuture();
return new RequestFuture() {
@Override
public boolean isDone() {
return payloadCompletableFuture.isDone();
}
@Override
public Response get() throws InterruptedException, ExecutionException {
Payload block = payloadCompletableFuture.get();
return RsocketUtils.parseResponseFromPayload(block);
}
@Override
public Response get(long timeout) throws TimeoutException, InterruptedException, ExecutionException {
Payload block = payloadCompletableFuture.get(timeout, TimeUnit.MILLISECONDS);
return RsocketUtils.parseResponseFromPayload(block);
}
};
}
@Override
public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack)
throws NacosException {
try {
Mono<Payload> response = rSocketClient
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
CompletableFuture<Payload> payloadCompletableFuture = response.toFuture();
payloadCompletableFuture.acceptEither(RsocketConnection.<Payload>failAfter(requestCallBack.getTimeout()),
new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload));
}
});
payloadCompletableFuture.exceptionally(new Function<Throwable, Payload>() {
@Override
public Payload apply(Throwable throwable) {
requestCallBack.onException(throwable);
return null;
}
});
} catch (Exception e) {
requestCallBack.onException(e);
}
}
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
final CompletableFuture<T> promise = new CompletableFuture<T>();
RpcScheduledExecutor.TIMEOUT_SCHEDULER.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
return promise.completeExceptionally(ex);
}
}, timeouts, MILLISECONDS);
return promise;
}
@Override
public void close() {
if (this.rSocketClient != null && !rSocketClient.isDisposed()) {
rSocketClient.dispose();
}
}
@Override
public String toString() {
return "RsocketConnection{" + "serverInfo=" + serverInfo + ", labels=" + labels + '}';
}
}

View File

@ -1,172 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.rsocket;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.api.remote.response.UnKnowResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicReference;
/**
* rsocket implementation of rpc client.
*
* @author liuzunfei
* @version $Id: RsocketRpcClient.java, v 0.1 2020年08月06日 10:46 AM liuzunfei Exp $
*/
public class RsocketRpcClient extends RpcClient {
static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
private static final int RSOCKET_PORT_OFFSET = 1100;
private AtomicReference<RSocket> rSocketClient = new AtomicReference<RSocket>();
public RsocketRpcClient(String name) {
super(name);
}
@Override
public ConnectionType getConnectionType() {
return ConnectionType.RSOCKET;
}
@Override
public int rpcPortOffset() {
return RSOCKET_PORT_OFFSET;
}
@Override
public Connection connectToServer(ServerInfo serverInfo) throws Exception {
RSocket rSocket = null;
try {
ConnectionSetupRequest connectionSetupRequest = new ConnectionSetupRequest();
Payload setUpPayload = RsocketUtils.convertRequestToPayload(connectionSetupRequest, buildMeta());
rSocket = RSocketConnector.create().setupPayload(setUpPayload).acceptor(new SocketAcceptor() {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
RSocket rsocket = new RSocketProxy(sendingSocket) {
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
final RsocketUtils.PlainRequest plainRequest = RsocketUtils
.parsePlainRequestFromPayload(payload);
try {
Response response = handleServerRequest(plainRequest.getBody(),
plainRequest.metadata);
response.setRequestId(plainRequest.getBody().getRequestId());
return Mono.just(RsocketUtils.convertResponseToPayload(response));
} catch (Exception e) {
Response response = new UnKnowResponse();
response.setResultCode(ResponseCode.FAIL.getCode());
response.setMessage(e.getMessage());
response.setRequestId(plainRequest.getBody().getRequestId());
return Mono.just(RsocketUtils.convertResponseToPayload(response));
}
} catch (Exception e) {
UnKnowResponse response = new UnKnowResponse();
response.setResultCode(ResponseCode.FAIL.getCode());
response.setMessage(e.getMessage());
return Mono
.just(DefaultPayload.create(RsocketUtils.convertResponseToPayload(response)));
}
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
final RsocketUtils.PlainRequest plainRequest = RsocketUtils
.parsePlainRequestFromPayload(payload);
handleServerRequest(plainRequest.getBody(), plainRequest.metadata);
return Mono.empty();
}
};
return Mono.just(rsocket);
}
}).connect(TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort())).block();
RsocketConnection connection = new RsocketConnection(serverInfo, rSocket);
fireOnCloseEvent(rSocket, connection);
return connection;
} catch (Exception e) {
shutDownRsocketClient(rSocket);
throw e;
}
}
void shutDownRsocketClient(RSocket client) {
if (client != null && !client.isDisposed()) {
client.dispose();
}
}
void fireOnCloseEvent(final RSocket rSocket, final Connection connectionInner) {
Subscriber subscriber = new Subscriber<Void>() {
@Override
public void onSubscribe(Subscription subscription) {
}
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
if (isRunning() && !connectionInner.isAbandon()) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
}
}
@Override
public void onComplete() {
if (isRunning() && !connectionInner.isAbandon()) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
}
}
};
rSocket.onClose().subscribe(subscriber);
}
}

View File

@ -1,264 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.rsocket;
import com.alibaba.nacos.api.exception.runtime.NacosDeserializationException;
import com.alibaba.nacos.api.exception.runtime.NacosSerializationException;
import com.alibaba.nacos.api.remote.PayloadRegistry;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import io.rsocket.Payload;
import io.rsocket.util.DefaultPayload;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* rsocket utils.
*
* @author liuzunfei
* @version $Id: RsocketUtils.java, v 0.1 2020年08月06日 2:25 PM liuzunfei Exp $
*/
public class RsocketUtils {
static ObjectMapper mapper = new ObjectMapper();
static {
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
/**
* Object to json string.
*
* @param obj obj
* @return json string
* @throws NacosSerializationException if transfer failed
*/
public static String toJson(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new NacosSerializationException(obj.getClass(), e);
}
}
/**
* Json string deserialize to Object.
*
* @param json json string
* @param cls class of object
* @param <T> General type
* @return object
* @throws NacosDeserializationException if deserialize failed
*/
private static <T> T toObj(String json, Class<T> cls) {
try {
return mapper.readValue(json, cls);
} catch (IOException e) {
throw new NacosDeserializationException(cls, e);
}
}
/**
* Json string deserialize to Jackson {@link JsonNode}.
*
* @param json json string
* @return {@link JsonNode}
* @throws NacosDeserializationException if deserialize failed
*/
public static JsonNode toObj(String json) {
try {
return mapper.readTree(json);
} catch (IOException e) {
throw new NacosDeserializationException(e);
}
}
/**
* convert request to palyload.
*
* @param request request.
* @param meta request meta.
* @return payload of rsocket
*/
public static Payload convertRequestToPayload(Request request, RequestMeta meta) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("clientPort", meta.getClientPort());
jsonObject.addProperty("connectionId", meta.getConnectionId());
jsonObject.addProperty("clientIp", meta.getClientIp());
jsonObject.addProperty("clientVersion", meta.getClientVersion());
jsonObject.addProperty("labels", toJson(meta.getLabels()));
jsonObject.addProperty("headers", toJson(request.getHeaders()));
jsonObject.addProperty("type", request.getClass().getName());
request.clearHeaders();
return DefaultPayload.create(toJson(request).getBytes(), jsonObject.toString().getBytes());
}
/**
* convert response to palyload.
*
* @param response response.
* @return payload.
*/
public static Payload convertResponseToPayload(Response response) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("type", response.getClass().getName());
return DefaultPayload.create(toJson(response).getBytes(), jsonObject.toString().getBytes());
}
/**
* parse response from payload.
*
* @param payload payload.
* @return response.
*/
public static Response parseResponseFromPayload(Payload payload) {
//parse meta
String metaString = payload.getMetadataUtf8();
JsonNode metaJsonNode = toObj(metaString);
String type = metaJsonNode.get("type").textValue();
String bodyString = payload.getDataUtf8();
Class classbyType = PayloadRegistry.getClassByType(type);
PlainRequest plainRequest = new PlainRequest();
plainRequest.setType(type);
Response response = (Response) toObj(bodyString, classbyType);
return response;
}
/**
* parse plain request type from payload.
*
* @param payload payload.
* @return plain request.
*/
public static PlainRequest parsePlainRequestFromPayload(Payload payload) {
//parse meta
String metaString = payload.getMetadataUtf8();
JsonNode metaJsonNode = toObj(metaString);
String type = metaJsonNode.get("type").textValue();
Map<String, String> labels = (Map<String, String>) toObj(metaJsonNode.get("labels").textValue(), Map.class);
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientVersion(
metaJsonNode.has("clientVersion") ? metaJsonNode.get("clientVersion").textValue() : "");
requestMeta
.setConnectionId(metaJsonNode.has("connectionId") ? metaJsonNode.get("connectionId").textValue() : "");
requestMeta.setClientPort(metaJsonNode.has("clientPort") ? metaJsonNode.get("clientPort").intValue() : 0);
requestMeta.setClientIp(metaJsonNode.has("clientIp") ? metaJsonNode.get("clientIp").textValue() : "");
requestMeta.setLabels(labels);
String bodyString = payload.getDataUtf8();
Class classbyType = PayloadRegistry.getClassByType(type);
PlainRequest plainRequest = new PlainRequest();
plainRequest.setType(type);
Request request = (Request) toObj(bodyString, classbyType);
Map<String, String> headers = (Map<String, String>) toObj(metaJsonNode.get("headers").textValue(), Map.class);
request.putAllHeader(headers);
plainRequest.setBody(request);
plainRequest.setMetadata(requestMeta);
return plainRequest;
}
private static String getPayloadString(Payload payload) {
ByteBuffer data1 = payload.getData();
byte[] data = new byte[data1.remaining()];
payload.data().readBytes(data);
byte[] bytes = new byte[0];
return new String(bytes);
}
public static class PlainRequest {
String type;
Request body;
RequestMeta metadata;
@Override
public String toString() {
return "PlainRequest{" + "type='" + type + '\'' + ", body=" + body + ", metadata=" + metadata + '}';
}
/**
* Getter method for property <tt>metadata</tt>.
*
* @return property value of metadata
*/
public RequestMeta getMetadata() {
return metadata;
}
/**
* Setter method for property <tt>metadata</tt>.
*
* @param metadata value to be assigned to property metadata
*/
public void setMetadata(RequestMeta metadata) {
this.metadata = metadata;
}
/**
* Getter method for property <tt>type</tt>.
*
* @return property value of type
*/
public String getType() {
return type;
}
/**
* Setter method for property <tt>type</tt>.
*
* @param type value to be assigned to property type
*/
public void setType(String type) {
this.type = type;
}
/**
* Getter method for property <tt>body</tt>.
*
* @return property value of body
*/
public Request getBody() {
return body;
}
/**
* Setter method for property <tt>body</tt>.
*
* @param body value to be assigned to property body
*/
public void setBody(Request body) {
this.body = body;
}
}
}

View File

@ -1,97 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.utils;
import java.io.File;
/**
* appName util.
*
* @author Nacos
*/
public class AppNameUtils {
private static final String PARAM_MARKING_PROJECT = "project.name";
private static final String PARAM_MARKING_JBOSS = "jboss.server.home.dir";
private static final String PARAM_MARKING_JETTY = "jetty.home";
private static final String PARAM_MARKING_TOMCAT = "catalina.base";
private static final String LINUX_ADMIN_HOME = "/home/admin/";
private static final String SERVER_JBOSS = "jboss";
private static final String SERVER_JETTY = "jetty";
private static final String SERVER_TOMCAT = "tomcat";
private static final String SERVER_UNKNOWN = "unknown server";
public static String getAppName() {
String appName = null;
appName = getAppNameByProjectName();
if (appName != null) {
return appName;
}
appName = getAppNameByServerHome();
if (appName != null) {
return appName;
}
return "unknown";
}
private static String getAppNameByProjectName() {
return System.getProperty(PARAM_MARKING_PROJECT);
}
private static String getAppNameByServerHome() {
String serverHome = null;
if (SERVER_JBOSS.equals(getServerType())) {
serverHome = System.getProperty(PARAM_MARKING_JBOSS);
} else if (SERVER_JETTY.equals(getServerType())) {
serverHome = System.getProperty(PARAM_MARKING_JETTY);
} else if (SERVER_TOMCAT.equals(getServerType())) {
serverHome = System.getProperty(PARAM_MARKING_TOMCAT);
}
if (serverHome != null && serverHome.startsWith(LINUX_ADMIN_HOME)) {
return StringUtils.substringBetween(serverHome, LINUX_ADMIN_HOME, File.separator);
}
return null;
}
private static String getServerType() {
String serverType = null;
if (System.getProperty(PARAM_MARKING_JBOSS) != null) {
serverType = SERVER_JBOSS;
} else if (System.getProperty(PARAM_MARKING_JETTY) != null) {
serverType = SERVER_JETTY;
} else if (System.getProperty(PARAM_MARKING_TOMCAT) != null) {
serverType = SERVER_TOMCAT;
} else {
serverType = SERVER_UNKNOWN;
}
return serverType;
}
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient;
import org.junit.Test;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
/**
* junit test from rsocket rpc client.
*
* @author liuzunfei
* @version $Id: RSocketRpcClientTest.java, v 0.1 2020年08月07日 1:15 PM liuzunfei Exp $
*/
public class RSocketRpcClientTest {
@Test
public void testConectToServer() throws Exception {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
System.out.println(compiler.getSourceVersions());
RsocketRpcClient rsocketRpcClient = new RsocketRpcClient("");
RpcClient.ServerInfo serverInfo = new RpcClient.ServerInfo();
serverInfo.setServerIp("127.0.0.1");
serverInfo.setServerPort(9948);
Connection rSocket = rsocketRpcClient.connectToServer(serverInfo);
System.out.println("Client :" + rSocket);
}
}

View File

@ -20,7 +20,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -16,7 +16,9 @@
package com.alibaba.nacos.config.server.controller;
import com.alibaba.nacos.api.config.remote.request.ClientConfigMetricRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.config.server.constant.Constants;
@ -180,4 +182,32 @@ public class CommunicationController {
}
/**
* Get client config listener lists of subscriber in local machine.
*/
@GetMapping("/clientMetrics")
public Map<String, Object> getClientMetrics(@RequestParam("ip") String ip,
@RequestParam(value = "dataId", required = false) String dataId,
@RequestParam(value = "group", required = false) String group,
@RequestParam(value = "tenant", required = false) String tenant) {
Map<String, Object> metrics = new HashMap<>(16);
List<Connection> connectionsByIp = connectionManager.getConnectionByIp(ip);
for (Connection connectionByIp : connectionsByIp) {
try {
ClientConfigMetricRequest clientMetrics = new ClientConfigMetricRequest();
if (StringUtils.isNotBlank(dataId)) {
clientMetrics.getMetricsKeys().add(ClientConfigMetricRequest.MetricsKey
.build("cacheData", GroupKey2.getKey(dataId, group, tenant)));
}
ClientConfigMetricResponse request1 = (ClientConfigMetricResponse) connectionByIp
.request(clientMetrics, new RequestMeta());
metrics.putAll(request1.getMetrics());
} catch (NacosException e) {
e.printStackTrace();
}
}
return metrics;
}
}

View File

@ -87,7 +87,7 @@
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
</parent>
<properties>

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
</parent>
<artifactId>nacos-console</artifactId>
<packaging>jar</packaging>

View File

@ -0,0 +1,56 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.console.controller;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.HashMap;
import java.util.Map;
/**
* ClientMetricsController.
*
* @author zunfei.lzf
*/
@RequestMapping("/v1/console/client")
public class ClientMetricsController {
@Autowired
private ServerMemberManager serverMemberManager;
/**
* get client metric.
* @param clientIp client ip .
* @return
*/
@GetMapping("/metrics")
public ResponseEntity metric(@RequestParam String clientIp) {
Map<String, String> responseMap = new HashMap<>(3);
return ResponseEntity.ok().body(responseMap);
}
private Map<String, String> geMetrics(String clientIp) {
return null;
}
}

View File

@ -300,7 +300,8 @@ public class ServerLoaderController {
CompletionService<ServerLoaderMetrics> completionService = new ExecutorCompletionService<ServerLoaderMetrics>(
executorService);
int count = 0;
// default include self.
int count = 1;
for (Member member : serverMemberManager.allMembersWithoutSelf()) {
if (MemberUtil.isSupportedLongCon(member)) {
count++;
@ -309,6 +310,7 @@ public class ServerLoaderController {
}
}
int resultCount = 0;
List<ServerLoaderMetrics> responseList = new LinkedList<ServerLoaderMetrics>();
try {
@ -317,12 +319,12 @@ public class ServerLoaderController {
ServerLoaderMetrics metris = new ServerLoaderMetrics();
metris.setAddress(serverMemberManager.getSelf().getAddress());
metris.setMetric(handle.getLoaderMetrics());
resultCount++;
responseList.add(metris);
} catch (NacosException e) {
e.printStackTrace();
}
int resultCount = 0;
for (int i = 0; i < count; i++) {
try {
Future<ServerLoaderMetrics> f = completionService.poll(1000, TimeUnit.MILLISECONDS);
@ -349,7 +351,7 @@ public class ServerLoaderController {
Map<String, Object> responseMap = new HashMap<>(3);
responseMap.put("detail", responseList);
responseMap.put("memberCount", count);
responseMap.put("memberCount", serverMemberManager.allMembers().size());
responseMap.put("metricsCount", resultCount);
int max = 0;

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -58,15 +58,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>nacos-consistency</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>

View File

@ -35,8 +35,6 @@ import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* request auth filter for remote.
@ -62,23 +60,12 @@ public class RemoteRequestAuthFilter extends AbstractRequestFilter {
}
}
private Class getResponseClazz(Class handlerClazz) throws NacosException {
ParameterizedType parameterizedType = (ParameterizedType) handlerClazz.getGenericSuperclass();
try {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
return Class.forName(actualTypeArguments[1].getTypeName());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
@Override
public Response filter(Request request, RequestMeta meta, Class handlerClazz) {
Response response = null;
try {
response = (Response) getResponseClazz(handlerClazz).getDeclaredConstructor().newInstance();
response = (Response) super.getResponseClazz(handlerClazz).getDeclaredConstructor().newInstance();
} catch (Exception e) {
Loggers.AUTH.error("auth fail, request: {},exception:{}", request.getClass().getSimpleName(), e);

View File

@ -16,12 +16,15 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* interceptor fo request.
@ -42,12 +45,23 @@ public abstract class AbstractRequestFilter {
requestFilters.registerFilter(this);
}
protected Class getResponseClazz(Class handlerClazz) throws NacosException {
ParameterizedType parameterizedType = (ParameterizedType) handlerClazz.getGenericSuperclass();
try {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
return Class.forName(actualTypeArguments[1].getTypeName());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
/**
* filter request.
*
* @param request request.
* @param meta request meta.
* @param handlerClazz request handler clazz.
* @param request request.
* @param meta request meta.
* @param handlerClazz request handler clazz.
* @return response
*/
protected abstract Response filter(Request request, RequestMeta meta, Class handlerClazz);

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.RequestMeta;
@ -141,9 +142,10 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
}
}
// 2.check rule of specific client app limit.
if (connectionLimitRule.getCountLimitPerClientApp().containsKey(connection.getMetaInfo().getAppName())) {
Integer integerApp = connectionLimitRule.getCountLimitPerClientApp()
.get(connection.getMetaInfo().getAppName());
String appName = connection.getMetaInfo().getAppName();
if (StringUtils.isNotBlank(appName) && connectionLimitRule.getCountLimitPerClientApp()
.containsKey(appName)) {
Integer integerApp = connectionLimitRule.getCountLimitPerClientApp().get(appName);
if (integerApp != null && integerApp.intValue() >= 0) {
return currentCount.get() < integerApp.intValue();
}
@ -151,7 +153,7 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
// 3.check rule of default client ip.
int countLimitPerClientIpDefault = connectionLimitRule.getCountLimitPerClientIpDefault();
return countLimitPerClientIpDefault < 0 || currentCount.get() < countLimitPerClientIpDefault;
return countLimitPerClientIpDefault <= 0 || currentCount.get() < countLimitPerClientIpDefault;
}
return true;
@ -239,9 +241,10 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
try {
MetricsMonitor.getLongConnectionMonitor().set(connections.size());
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
int currentSdkClientCount = currentSdkClientCount();
boolean isLoaderClient = loadClient >= 0;
int currentMaxClient = isLoaderClient ? loadClient : maxClient;
int expelCount = currentMaxClient < 0 ? currentMaxClient : entries.size() - currentMaxClient;
int expelCount = currentMaxClient < 0 ? currentMaxClient : currentSdkClientCount - currentMaxClient;
List<String> expelClient = new LinkedList<String>();
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
@ -249,6 +252,7 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
}
}
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
@ -316,18 +320,20 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
Connection connection = getConnection(connectionId);
if (connection != null) {
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]);
connectResetRequest.setServerPort(split[1]);
}
try {
connection.request(connectResetRequest, buildMeta());
} catch (ConnectionAlreadyClosedException e) {
unregister(connectionId);
} catch (Exception e) {
Loggers.REMOTE.error("error occurs when expel connection :", connectionId, e);
if (connection.getMetaInfo().isSdkSource()) {
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]);
connectResetRequest.setServerPort(split[1]);
}
try {
connection.request(connectResetRequest, buildMeta());
} catch (ConnectionAlreadyClosedException e) {
unregister(connectionId);
} catch (Exception e) {
Loggers.REMOTE.error("error occurs when expel connection :", connectionId, e);
}
}
}
@ -366,6 +372,16 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
return count;
}
/**
* get client count from sdk.
* @return
*/
public int currentSdkClientCount() {
Map<String, String> filter = new HashMap<String, String>(2);
filter.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
return currentClientsCount(filter);
}
public Map<String, Connection> currentClients() {
return connections;
}
@ -380,7 +396,10 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
for (Map.Entry<String, Connection> entry : connections.entrySet()) {
Connection client = entry.getValue();
try {
client.request(new ConnectResetRequest(), buildMeta());
if (client.getMetaInfo().isSdkSource()) {
client.request(new ConnectResetRequest(), buildMeta());
}
} catch (Exception e) {
//Do Nothing.
}
@ -393,7 +412,7 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
* @return over limit or not.
*/
private boolean isOverLimit() {
return maxClient > 0 && this.connections.size() >= maxClient;
return maxClient > 0 && currentSdkClientCount() >= maxClient;
}
public int countLimited() {

View File

@ -52,7 +52,7 @@ public class RpcAckCallbackSynchronizer {
}).build();
/**
* notify ackid.
* notify ack.
*/
public static void ackNotify(String connectionId, Response response) {

View File

@ -0,0 +1,68 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* tps control manager.
*
* @author liuzunfei
* @version $Id: TpsControlManager.java, v 0.1 2021年01月09日 12:38 PM liuzunfei Exp $
*/
@Service
public class TpsControlManager {
private Map<String, TpsControlPoint> points = new ConcurrentHashMap<String, TpsControlPoint>(16);
public TpsControlManager() {
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Set<Map.Entry<String, TpsControlPoint>> entries = points.entrySet();
long currentMillis = System.currentTimeMillis();
long nextSecondMillis = System.currentTimeMillis() + 1000L;
for (Map.Entry<String, TpsControlPoint> entry : entries) {
TpsControlPoint tpsControlPoint = entry.getValue();
TpsRecorder tpsRecorder = tpsControlPoint.tpsRecorder;
tpsRecorder.checkSecond(currentMillis);
tpsRecorder.checkSecond(nextSecondMillis);
Map<String, TpsRecorder> tpsRecordForIp = tpsControlPoint.tpsRecordForIp;
if (tpsRecordForIp != null) {
for (TpsRecorder tpsIp : tpsRecordForIp.values()) {
tpsIp.checkSecond(currentMillis);
tpsIp.checkSecond(nextSecondMillis);
}
}
}
} catch (Throwable throwable) {
//check point error.
}
}
}, 0L, 1L, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* tps control point.
*
* @author liuzunfei
* @version $Id: TpsControlPoint.java, v 0.1 2021年01月09日 12:38 PM liuzunfei Exp $
*/
public class TpsControlPoint {
private String pointName;
TpsRecorder tpsRecorder;
Map<String, TpsRecorder> tpsRecordForIp = new HashMap<String, TpsRecorder>();
/**
* increase tps.
*
* @param clientIp client ip .
* @return check current tps is allowed.
*/
public boolean applyTps(String clientIp) {
//1.check ip tps.
if (tpsRecordForIp.containsKey(clientIp)) {
TpsRecorder tpsRecorder = tpsRecordForIp.get(clientIp);
AtomicLong currentTps = tpsRecorder.getCurrentTps();
long maxTpsOfIp = tpsRecorder.getMaxTps();
if (tpsRecorder.isInterceptMode() && maxTpsOfIp > 0 && currentTps.longValue() >= maxTpsOfIp) {
return false;
}
currentTps.incrementAndGet();
}
//2.check total tps.
long maxTps = tpsRecorder.getMaxTps();
if (tpsRecorder.isInterceptMode() && maxTps > 0 && tpsRecorder.getCurrentTps().longValue() >= maxTps) {
return false;
}
tpsRecorder.getCurrentTps().incrementAndGet();
//3.check pass.
return true;
}
public String getPointName() {
return pointName;
}
public void setPointName(String pointName) {
this.pointName = pointName;
}
protected void refreshRecorder() {
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.core.remote.AbstractRequestFilter;
/**
* tps control point.
*
* @author liuzunfei
* @version $Id: TpsControlRequestFilter.java, v 0.1 2021年01月09日 12:38 PM liuzunfei Exp $
*/
public class TpsControlRequestFilter extends AbstractRequestFilter {
@Override
protected Response filter(Request request, RequestMeta meta, Class handlerClazz) {
return null;
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.atomic.AtomicLong;
/**
* tps record.
*
* @author liuzunfei
* @version $Id: TpsRecorder.java, v 0.1 2021年01月09日 12:38 PM liuzunfei Exp $
*/
public class TpsRecorder {
private long maxTps = -1;
/**
* monitor/intercept.
*/
private String monitorType = "";
/**
* second count.
*/
private Cache<String, AtomicLong> tps = CacheBuilder.newBuilder().maximumSize(10).build();
public AtomicLong getTps(String second) {
AtomicLong atomicLong = tps.getIfPresent(second);
if (atomicLong != null) {
return atomicLong;
}
synchronized (tps) {
if (tps.getIfPresent(second) == null) {
tps.put(second, new AtomicLong());
}
return tps.getIfPresent(second);
}
}
private String second(long timeStamp) {
String timeStampStr = String.valueOf(timeStamp);
return timeStampStr.substring(0, timeStampStr.length() - 3);
}
protected void checkSecond(long timeStamp) {
getTps(second(timeStamp));
}
public AtomicLong getCurrentTps() {
return getTps(second(System.currentTimeMillis()));
}
public long getMaxTps() {
return maxTps;
}
public void setMaxTps(long maxTps) {
this.maxTps = maxTps;
}
public boolean isInterceptMode() {
return "intercept".equals(this.monitorType);
}
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
@ -36,6 +37,8 @@ import io.grpc.stub.StreamObserver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CHANNEL;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_CLIENT_IP;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_CLIENT_PORT;
@ -70,9 +73,14 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
plainRequest.getMetadata().setConnectionId(connectionId);
if (plainRequest.getBody() instanceof ConnectionSetupRequest) {
RequestMeta metadata = plainRequest.getMetadata();
Map<String, String> labels = metadata.getLabels();
String appName = "-";
if (labels != null && labels.containsKey(Constants.APPNAME)) {
appName = labels.get(Constants.APPNAME);
}
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(metadata.getConnectionId(), clientIp,
metadata.getClientPort(), localPort, ConnectionType.GRPC.getType(),
metadata.getClientVersion(), metadata.getAppName(), metadata.getLabels());
metadata.getClientVersion(), appName, metadata.getLabels());
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
@ -90,6 +98,8 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
} else if (plainRequest.getBody() instanceof Response) {
Response response = (Response) plainRequest.getBody();
RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
connectionManager.refreshActiveTime(plainRequest.getMetadata().getConnectionId());
}
}

View File

@ -26,7 +26,6 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.common.remote.exception.ConnectionBusyException;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
@ -61,9 +60,6 @@ public class GrpcConnection extends Connection {
try {
//StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
synchronized (streamObserver) {
if (this.isBusy()) {
throw new ConnectionBusyException(this.getMetaInfo().getConnectionId() + ",connection busy.");
}
streamObserver.onNext(GrpcUtils.convert(request, wrapMeta(meta)));
}
} catch (Exception e) {

View File

@ -1,169 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.rsocket;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* connection of rsocket.
*
* @author liuzunfei
* @version $Id: RsocketConnection.java, v 0.1 2020年08月06日 11:58 AM liuzunfei Exp $
*/
public class RsocketConnection extends Connection {
RSocket clientSocket;
public RsocketConnection(ConnectionMetaInfo metaInfo, RSocket clientSocket) {
super(metaInfo);
this.clientSocket = clientSocket;
}
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
final CompletableFuture<T> promise = new CompletableFuture<T>();
RpcScheduledExecutor.TIMEOUT_SCHEDULER.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
return promise.completeExceptionally(ex);
}
}, timeouts, MILLISECONDS);
return promise;
}
@Override
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
return request(request, requestMeta, 3000L);
}
@Override
public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException {
try {
Mono<Payload> payloadMono = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, wrapMeta(requestMeta)));
Payload block = payloadMono.block(Duration.ofMillis(timeoutMills));
return RsocketUtils.parseResponseFromPayload(block);
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
private RequestMeta wrapMeta(RequestMeta meta) {
if (meta == null) {
meta = new RequestMeta();
}
meta.setConnectionId(getMetaInfo().getConnectionId());
meta.setClientPort(getMetaInfo().getLocalPort());
meta.setClientIp(NetUtils.localIP());
meta.setClientVersion(VersionUtils.getFullClientVersion());
return meta;
}
@Override
public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException {
final Mono<Payload> payloadMono = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, wrapMeta(requestMeta)));
final CompletableFuture<Payload> payloadCompletableFuture = payloadMono.toFuture();
RequestFuture defaultPushFuture = new RequestFuture() {
@Override
public boolean isDone() {
return payloadCompletableFuture.isDone();
}
@Override
public Response get() throws InterruptedException, ExecutionException {
Payload block = payloadCompletableFuture.get();
return RsocketUtils.parseResponseFromPayload(block);
}
@Override
public Response get(long timeoutMills) throws TimeoutException, InterruptedException, ExecutionException {
Payload block = payloadCompletableFuture.get(timeoutMills, TimeUnit.MILLISECONDS);
return RsocketUtils.parseResponseFromPayload(block);
}
};
return defaultPushFuture;
}
@Override
public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
throws NacosException {
try {
Mono<Payload> response = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, wrapMeta(requestMeta)));
response.toFuture().acceptEither(failAfter(requestCallBack.getTimeout()), new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload));
}
}).exceptionally(throwable -> {
requestCallBack.onException(throwable);
return null;
});
} catch (Exception e) {
requestCallBack.onException(e);
}
}
@Override
public Map<String, String> getLabels() {
return null;
}
@Override
public void close() {
if (clientSocket != null && !clientSocket.isDisposed()) {
clientSocket.dispose();
}
}
@Override
public boolean isConnected() {
return clientSocket != null && !clientSocket.isDisposed();
}
}

View File

@ -1,262 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.rsocket;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketUtils;
import com.alibaba.nacos.common.utils.ReflectUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.remote.BaseRpcServer;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketServer;
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
import io.rsocket.transport.netty.TcpDuplexConnection;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.RSocketProxy;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.util.UUID;
/**
* rpc server of rsocket.
*
* @author liuzunfei
* @version $Id: RsocketRpcServer.java, v 0.1 2020年08月06日 11:52 AM liuzunfei Exp $
*/
@Service
public class RsocketRpcServer extends BaseRpcServer {
private static final int PORT_OFFSET = 1100;
private RSocketServer rSocketServer;
CloseableChannel closeChannel;
@Autowired
private RequestHandlerRegistry requestHandlerRegistry;
@Autowired
private ConnectionManager connectionManager;
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
@Override
public void startServer() throws Exception {
RSocketServer rSocketServerInner = RSocketServer.create();
closeChannel = rSocketServerInner.acceptor(((setup, sendingSocket) -> {
RsocketUtils.PlainRequest plainrequest = null;
try {
plainrequest = RsocketUtils.parsePlainRequestFromPayload(setup);
} catch (Exception e) {
Loggers.REMOTE.error(String
.format("[%s] error to parse new connection request :%s, error message: %s ", "rsocket",
setup.getDataUtf8(), e.getMessage(), e));
}
reactor.netty.Connection privateConnection = getPrivateConnection(sendingSocket);
InetSocketAddress remoteAddress = (InetSocketAddress) privateConnection.channel().remoteAddress();
InetSocketAddress localAddress = (InetSocketAddress) privateConnection.channel().localAddress();
if (plainrequest == null || !(plainrequest.getBody() instanceof ConnectionSetupRequest)) {
Loggers.REMOTE.info(String.format("[%s] invalid connection setup request, request info : %s", "rsocket",
plainrequest.toString()));
sendingSocket.dispose();
return Mono.just(sendingSocket);
} else {
String connectionId = UUID.randomUUID().toString();
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId,
plainrequest.getMetadata().getClientIp(), remoteAddress.getPort(), localAddress.getPort(),
ConnectionType.RSOCKET.getType(), plainrequest.getMetadata().getClientVersion(),
plainrequest.getMetadata().getAppName(), plainrequest.getMetadata().getLabels());
Connection connection = new RsocketConnection(metaInfo, sendingSocket);
if (!ApplicationUtils.isStarted() || !connectionManager
.register(connection.getMetaInfo().getConnectionId(), connection)) {
//Not register to the connection manager if current server is over limit.
try {
connection.request(new ConnectResetRequest(), buildRequestMeta());
connection.close();
} catch (Exception e) {
//Do nothing.
}
return null;
}
fireOnCloseEvent(sendingSocket, connection);
RSocketProxy rSocketProxy = new NacosRsocket(sendingSocket, connectionId,
remoteAddress.getAddress().getHostAddress(), remoteAddress.getPort());
return Mono.just(rSocketProxy);
}
})).bind(TcpServerTransport.create("0.0.0.0", getServicePort())).block();
rSocketServer = rSocketServerInner;
}
private void fireOnCloseEvent(RSocket rSocket, Connection connection) {
rSocket.onClose().subscribe(new Subscriber<Void>() {
String connectionId;
@Override
public void onSubscribe(Subscription subscription) {
connectionId = connection.getMetaInfo().getConnectionId();
}
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
Loggers.REMOTE.error(String
.format("[%s] error on connection, connection id : %s, error message :%s", "rsocket",
connectionId, throwable.getMessage(), throwable));
connectionManager.unregister(connectionId);
}
@Override
public void onComplete() {
Loggers.REMOTE
.info(String.format("[%s] connection finished ,connection id %s", "rsocket", connectionId));
connectionManager.unregister(connectionId);
}
});
}
private reactor.netty.Connection getPrivateConnection(RSocket rSocket) {
try {
DuplexConnection internalDuplexConnection = (DuplexConnection) ReflectUtils
.getFieldValue(rSocket, "connection");
ReassemblyDuplexConnection source = (ReassemblyDuplexConnection) ReflectUtils
.getFieldValue(internalDuplexConnection, "source");
TcpDuplexConnection tcpDuplexConnection = (TcpDuplexConnection) ReflectUtils
.getFieldValue(source, "delegate");
return (reactor.netty.Connection) ReflectUtils.getFieldValue(tcpDuplexConnection, "connection");
} catch (Exception e) {
throw new IllegalStateException("Can't access connection details!", e);
}
}
class NacosRsocket extends RSocketProxy {
String connectionId;
String clientIp;
int clientPort;
public NacosRsocket(RSocket source) {
super(source);
}
public NacosRsocket(RSocket source, String connectionId, String clientIp, int clientPort) {
super(source);
this.connectionId = connectionId;
this.clientIp = clientIp;
this.clientPort = clientPort;
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
RsocketUtils.PlainRequest requestType = RsocketUtils.parsePlainRequestFromPayload(payload);
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(requestType.getType());
if (requestHandler != null) {
RequestMeta requestMeta = requestType.getMetadata();
requestMeta.setConnectionId(connectionId);
requestMeta.setClientIp(clientIp);
requestMeta.setClientPort(clientPort);
try {
Response response = requestHandler.handleRequest(requestType.getBody(), requestMeta);
return Mono.just(RsocketUtils.convertResponseToPayload(response));
} catch (NacosException e) {
Loggers.REMOTE_DIGEST.debug(String
.format("[%s] fail to handle request, error message : %s ", "rsocket", e.getMessage(),
e));
return Mono.just(RsocketUtils
.convertResponseToPayload(new PlainBodyResponse("exception:" + e.getMessage())));
}
}
Loggers.REMOTE_DIGEST.debug(String
.format("[%s] no handler for request type : %s :", "rsocket", requestType.getType()));
return Mono.just(RsocketUtils.convertResponseToPayload(new PlainBodyResponse("No Handler")));
} catch (Exception e) {
Loggers.REMOTE_DIGEST.debug(String
.format("[%s] fail to parse request, error message : %s ", "rsocket", e.getMessage(), e));
return Mono.just(RsocketUtils
.convertResponseToPayload(new PlainBodyResponse("exception:" + e.getMessage())));
}
}
}
@Override
public ConnectionType getConnectionType() {
return ConnectionType.RSOCKET;
}
@Override
public void shutdownServer() {
if (this.closeChannel != null && !closeChannel.isDisposed()) {
this.closeChannel.dispose();
}
}
private RequestMeta buildRequestMeta() {
RequestMeta meta = new RequestMeta();
meta.setClientVersion(VersionUtils.getFullClientVersion());
meta.setClientIp(NetUtils.localIP());
meta.setClientPort(getServicePort());
return meta;
}
}

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -19,7 +19,7 @@
<parent>
<artifactId>nacos-all</artifactId>
<groupId>com.alibaba.nacos</groupId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

106
pom.xml
View File

@ -22,7 +22,7 @@
<inceptionYear>2018</inceptionYear>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Alibaba NACOS ${project.version}</name>
@ -36,7 +36,7 @@
<url>git@github.com:alibaba/nacos.git</url>
<connection>scm:git@github.com:alibaba/nacos.git</connection>
<developerConnection>scm:git@github.com:alibaba/nacos.git</developerConnection>
<tag>nacos-all-2.0.0-ALPHA.1</tag>
<tag>nacos-all-SNAPSHOT</tag>
</scm>
<mailingLists>
@ -993,110 +993,8 @@
<artifactId>proto-google-common-protos</artifactId>
<version>${proto-google-common-protos.version}</version>
</dependency>
<!-- gRPC dependency end -->
<!-- Rsocket -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Rsocket -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-ALPHA.1</version>
<version>2.0.0-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -1,111 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.core;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.util.UUID;
/**
* roskcet tester.
* @author liuzunfei
* @version $Id: RsocketClientTest.java, v 0.1 2020年08月07日 11:05 PM liuzunfei Exp $
*/
public class RsocketClientTest {
@Test
public void testConnection() throws Exception {
String connectId = UUID.randomUUID().toString();
ConnectionSetupRequest conconSetupRequest = new ConnectionSetupRequest();
Payload setUpPayload = DefaultPayload.create(connectId);
;
System.out.println("setUpPayload" + setUpPayload.getDataUtf8());
RSocket socketClient = RSocketConnector.create().setupPayload(setUpPayload).acceptor(new SocketAcceptor() {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
RSocket rsocket = new RSocketProxy(sendingSocket) {
@Override
public Mono<Payload> requestResponse(Payload payload) {
System.out.println("收到服务端推送:" + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Push OK."));
}
};
return Mono.just((RSocket) rsocket);
}
}).connect(TcpClientTransport.create("localhost", 9948)).block();
System.out.println("socketClient:" + socketClient);
for (int i = 0; i < 100; i++) {
Mono<Payload> payloadMono = socketClient.requestResponse(DefaultPayload.create("helloserver:" + i));
Payload block = payloadMono.block();
System.out.println("Server response:" + block.getDataUtf8());
Thread.sleep(2000L);
}
Thread.sleep(7000000L);
}
public static void main(String[] args) throws Exception {
String connectId = UUID.randomUUID().toString();
ConnectionSetupRequest conconSetupRequest = new ConnectionSetupRequest();
Payload setUpPayload = DefaultPayload.create(connectId);
;
System.out.println("setUpPayload2" + setUpPayload.getDataUtf8());
RSocket socketClient = RSocketConnector.create().setupPayload(setUpPayload).acceptor(new SocketAcceptor() {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
RSocket rsocket = new RSocketProxy(sendingSocket) {
@Override
public Mono<Payload> requestResponse(Payload payload) {
System.out.println("收到服务端推送:" + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Push OK."));
}
};
return Mono.just((RSocket) rsocket);
}
}).connect(TcpClientTransport.create("localhost", 9948)).block();
System.out.println("socketClient:" + socketClient);
for (int i = 0; i < 100; i++) {
Mono<Payload> payloadMono = socketClient.requestResponse(DefaultPayload.create("helloserver:" + i));
Payload block = payloadMono.block();
System.out.println("Server response:" + block.getDataUtf8());
Thread.sleep(2000L);
}
Thread.sleep(7000000L);
}
}

1872
tree.txt

File diff suppressed because it is too large Load Diff