Develop fill ut common (#11436)

* Add Unit test for common module remote root package.

* Add Unit test for common module remote exception package.

* Add Unit test for common module remote client package.

* For checkstyle.

* For UT stability.
This commit is contained in:
杨翊 SionYang 2023-11-28 09:24:39 +08:00 committed by GitHub
parent 93cc842c90
commit 2151756dd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 2142 additions and 373 deletions

View File

@ -29,9 +29,9 @@ public enum ConnectionType {
*/
GRPC("GRPC", "Grpc Connection");
String type;
final String type;
String name;
final String name;
public static ConnectionType getByType(String type) {
ConnectionType[] values = ConnectionType.values();
@ -57,15 +57,6 @@ public enum ConnectionType {
return type;
}
/**
* Setter method for property <tt>type</tt>.
*
* @param type value to be assigned to property type
*/
public void setType(String type) {
this.type = type;
}
/**
* Getter method for property <tt>name</tt>.
*
@ -74,13 +65,4 @@ public enum ConnectionType {
public String getName() {
return name;
}
/**
* Setter method for property <tt>name</tt>.
*
* @param name value to be assigned to property name
*/
public void setName(String name) {
this.name = name;
}
}

View File

@ -44,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -83,7 +82,7 @@ public abstract class RpcClient implements Closeable {
private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
protected volatile Connection currentConnection;
private String tenant;
private long lastActiveTimeStamp = System.currentTimeMillis();
@ -101,9 +100,9 @@ public abstract class RpcClient implements Closeable {
private static final Pattern EXCLUDE_PROTOCOL_PATTERN = Pattern.compile("(?<=\\w{1,5}://)(.*)");
protected RpcClientConfig rpcClientConfig;
protected final ResourceLoader resourceLoader = new DefaultResourceLoader();
static {
PayloadRegistry.init();
}
@ -117,7 +116,7 @@ public abstract class RpcClient implements Closeable {
this.serverListFactory = serverListFactory;
init();
}
protected void init() {
if (this.serverListFactory != null) {
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED);
@ -126,10 +125,6 @@ public abstract class RpcClient implements Closeable {
}
}
public Map<String, String> labels() {
return Collections.unmodifiableMap(rpcClientConfig.labels());
}
/**
* init server list factory. only can init once.
*
@ -170,7 +165,7 @@ public abstract class RpcClient implements Closeable {
/**
* Notify when client new connected.
*
* @param connection connection has connected
* @param connection connection has connected
*/
protected void notifyConnected(Connection connection) {
if (connectionEventListeners.isEmpty()) {
@ -277,8 +272,8 @@ public abstract class RpcClient implements Closeable {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal.poll(rpcClientConfig.connectionKeepAlive(),
TimeUnit.MILLISECONDS);
ReconnectContext reconnectContext = reconnectionSignal
.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
@ -296,8 +291,8 @@ public abstract class RpcClient implements Closeable {
break;
}
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(
rpcClientStatus, RpcClientStatus.UNHEALTHY);
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
reconnectContext = new ReconnectContext(null, false);
} else {
@ -364,9 +359,10 @@ public abstract class RpcClient implements Closeable {
}
if (connectToServer != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect to server [{}] on start up, connectionId = {}", rpcClientConfig.name(),
connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
@ -400,8 +396,8 @@ public abstract class RpcClient implements Closeable {
ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(connectResetRequest.getServerIp())) {
ServerInfo serverInfo = resolveServerInfo(
connectResetRequest.getServerIp() + Constants.COLON
+ connectResetRequest.getServerPort());
connectResetRequest.getServerIp() + Constants.COLON + connectResetRequest
.getServerPort());
switchServerAsync(serverInfo, false);
} else {
switchServerAsync();
@ -418,15 +414,15 @@ public abstract class RpcClient implements Closeable {
}
}
/**.
* invoke after receiving reset request
/**
* . invoke after receiving reset request
*
* @param request request for resetting
*/
protected void afterReset(ConnectResetRequest request) {
// hook for GrpcClient
}
@Override
public void shutdown() throws NacosException {
LOGGER.info("Shutdown rpc client, set status to shutdown");
@ -451,8 +447,8 @@ public abstract class RpcClient implements Closeable {
if (reTryTimes > 1) {
Thread.sleep(random.nextInt(500));
}
Response response = this.currentConnection.request(healthCheckRequest,
rpcClientConfig.healthCheckTimeOut());
Response response = this.currentConnection
.request(healthCheckRequest, rpcClientConfig.healthCheckTimeOut());
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (Exception e) {
@ -508,9 +504,10 @@ public abstract class RpcClient implements Closeable {
// 2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect a server [{}], connectionId = {}", rpcClientConfig.name(),
serverInfo.getAddress(), connectionNew.getConnectionId());
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
rpcClientConfig.name(), serverInfo.getAddress(),
connectionNew.getConnectionId());
// successfully create a new connect.
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
@ -579,8 +576,9 @@ public abstract class RpcClient implements Closeable {
}
} catch (Exception e) {
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}",
rpcClientConfig.name(), e);
LoggerUtils
.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", rpcClientConfig.name(),
e);
}
}
@ -713,8 +711,8 @@ public abstract class RpcClient implements Closeable {
int retryTimes = 0;
Throwable exceptionToThrow = null;
long start = System.currentTimeMillis();
while (retryTimes <= rpcClientConfig.retryTimes()
&& System.currentTimeMillis() < start + callback.getTimeout()) {
while (retryTimes <= rpcClientConfig.retryTimes() && System.currentTimeMillis() < start + callback
.getTimeout()) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
@ -763,8 +761,8 @@ public abstract class RpcClient implements Closeable {
int retryTimes = 0;
long start = System.currentTimeMillis();
Exception exceptionToThrow = null;
while (retryTimes <= rpcClientConfig.retryTimes()
&& System.currentTimeMillis() < start + rpcClientConfig.timeOutMills()) {
while (retryTimes <= rpcClientConfig.retryTimes() && System.currentTimeMillis() < start + rpcClientConfig
.timeOutMills()) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
@ -979,7 +977,7 @@ public abstract class RpcClient implements Closeable {
}
}
public class ConnectionEvent {
public static class ConnectionEvent {
public static final int CONNECTED = 1;
@ -988,7 +986,7 @@ public abstract class RpcClient implements Closeable {
int eventType;
Connection connection;
public ConnectionEvent(int eventType, Connection connection) {
this.eventType = eventType;
this.connection = connection;
@ -1012,7 +1010,7 @@ public abstract class RpcClient implements Closeable {
return rpcClientConfig.labels();
}
class ReconnectContext {
static class ReconnectContext {
public ReconnectContext(ServerInfo serverInfo, boolean onRequestFail) {
this.onRequestFail = onRequestFail;
@ -1031,7 +1029,7 @@ public abstract class RpcClient implements Closeable {
public void setTenant(String tenant) {
this.tenant = tenant;
}
/**
* Return ability of current connection.
*

View File

@ -74,19 +74,18 @@ public class RpcClientFactory {
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
return createClient(clientName, connectionType, null, null, labels);
}
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels,
RpcClientTlsConfig tlsConfig) {
RpcClientTlsConfig tlsConfig) {
return createClient(clientName, connectionType, null, null, labels, tlsConfig);
}
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer
threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels) {
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels) {
return createClient(clientName, connectionType, threadPoolCoreSize, threadPoolMaxSize, labels, null);
}
/**
* create a rpc client.
*
@ -98,21 +97,15 @@ public class RpcClientFactory {
* @return rpc client.
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
try {
return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}
return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
});
}
@ -124,15 +117,15 @@ public class RpcClientFactory {
* @return rpc client.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels) {
Map<String, String> labels) {
return createClusterClient(clientName, connectionType, null, null, labels);
}
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
return createClusterClient(clientName, connectionType, null, null, labels, tlsConfig);
}
/**
* create a rpc client.
*
@ -143,29 +136,31 @@ public class RpcClientFactory {
* @return rpc client.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
return createClusterClient(clientName, connectionType, threadPoolCoreSize, threadPoolMaxSize, labels, null);
}
/**
* createClusterClient.
* @param clientName client name.
* @param connectionType connectionType.
*
* @param clientName client name.
* @param connectionType connectionType.
* @param threadPoolCoreSize coreSize.
* @param threadPoolMaxSize threadPoolSize.
* @param labels tables.
* @param tlsConfig tlsConfig.
* @param threadPoolMaxSize threadPoolSize.
* @param labels tables.
* @param tlsConfig tlsConfig.
* @return
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels,
RpcClientTlsConfig tlsConfig) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
return CLIENT_MAP.computeIfAbsent(clientName,
clientNameInner -> new GrpcClusterClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize,
labels, tlsConfig));
clientNameInner -> new GrpcClusterClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels,
tlsConfig));
}
}

View File

@ -18,10 +18,9 @@ package com.alibaba.nacos.common.remote.client;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
@ -33,64 +32,64 @@ import java.util.Set;
* @author githubcheng2978.
*/
public class RpcConstants {
public static final String NACOS_CLIENT_RPC = "nacos.remote.client.rpc";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_ENABLE = NACOS_CLIENT_RPC + ".tls.enable";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_PROVIDER = NACOS_CLIENT_RPC + ".tls.provider";
@RpcConfigLabel
public static final String RPC_CLIENT_MUTUAL_AUTH = NACOS_CLIENT_RPC + ".tls.mutualAuth";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_PROTOCOLS = NACOS_CLIENT_RPC + ".tls.protocols";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_CIPHERS = NACOS_CLIENT_RPC + ".tls.ciphers";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_CERT_CHAIN_PATH = NACOS_CLIENT_RPC + ".tls.certChainFile";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_CERT_KEY = NACOS_CLIENT_RPC + ".tls.certPrivateKey";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_TRUST_PWD = NACOS_CLIENT_RPC + ".tls.certPrivateKeyPassword";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH = NACOS_CLIENT_RPC + ".tls.trustCollectionChainPath";
public static final String RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH =
NACOS_CLIENT_RPC + ".tls.trustCollectionChainPath";
@RpcConfigLabel
public static final String RPC_CLIENT_TLS_TRUST_ALL = NACOS_CLIENT_RPC + ".tls.trustAll";
private static final Set<String> CONFIG_NAMES = new HashSet<>();
@Documented
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
protected @interface RpcConfigLabel {
}
static {
Class clazz = RpcConstants.class;
Field[] declaredFields = clazz.getDeclaredFields();
for (Field declaredField : declaredFields) {
declaredField.setAccessible(true);
if (declaredField.getType().equals(String.class) && null != declaredField.getAnnotation(
RpcConfigLabel.class)) {
if (declaredField.getType().equals(String.class) && null != declaredField
.getAnnotation(RpcConfigLabel.class)) {
try {
CONFIG_NAMES.add((String) declaredField.get(null));
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (IllegalAccessException ignored) {
}
}
}
}
public static Set<String> getRpcParams() {
return Collections.unmodifiableSet(CONFIG_NAMES);
}

View File

@ -251,59 +251,59 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
* @return Builder
*/
public Builder fromProperties(Properties properties) {
if (properties.contains(GrpcConstants.GRPC_NAME)) {
if (properties.containsKey(GrpcConstants.GRPC_NAME)) {
this.name = properties.getProperty(GrpcConstants.GRPC_NAME);
}
if (properties.contains(GrpcConstants.GRPC_RETRY_TIMES)) {
if (properties.containsKey(GrpcConstants.GRPC_RETRY_TIMES)) {
this.retryTimes = Integer.parseInt(properties.getProperty(GrpcConstants.GRPC_RETRY_TIMES));
}
if (properties.contains(GrpcConstants.GRPC_TIMEOUT_MILLS)) {
if (properties.containsKey(GrpcConstants.GRPC_TIMEOUT_MILLS)) {
this.timeOutMills = Long.parseLong(properties.getProperty(GrpcConstants.GRPC_TIMEOUT_MILLS));
}
if (properties.contains(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)) {
if (properties.containsKey(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)) {
this.connectionKeepAlive = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)) {
if (properties.containsKey(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)) {
this.threadPoolKeepAlive = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME));
}
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)) {
if (properties.containsKey(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)) {
this.threadPoolCoreSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE));
}
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)) {
if (properties.containsKey(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)) {
this.threadPoolMaxSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE));
}
if (properties.contains(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)) {
if (properties.containsKey(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)) {
this.serverCheckTimeOut = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_QUEUESIZE)) {
if (properties.containsKey(GrpcConstants.GRPC_QUEUESIZE)) {
this.threadPoolQueueSize = Integer.parseInt(properties.getProperty(GrpcConstants.GRPC_QUEUESIZE));
}
if (properties.contains(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)) {
if (properties.containsKey(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)) {
this.maxInboundMessageSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)) {
if (properties.containsKey(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)) {
this.channelKeepAlive = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)) {
if (properties.containsKey(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)) {
this.capabilityNegotiationTimeout = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) {
if (properties.containsKey(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) {
this.healthCheckRetryTimes = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES));
}
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)) {
if (properties.containsKey(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)) {
this.healthCheckTimeOut = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT)) {
if (properties.containsKey(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT)) {
this.channelKeepAliveTimeout = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT));
}
@ -414,8 +414,9 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
return this;
}
public void setCapabilityNegotiationTimeout(long capabilityNegotiationTimeout) {
public Builder setCapabilityNegotiationTimeout(long capabilityNegotiationTimeout) {
this.capabilityNegotiationTimeout = capabilityNegotiationTimeout;
return this;
}
/**

View File

@ -33,18 +33,18 @@ import com.alibaba.nacos.api.remote.response.SetupAckResponse;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.packagescan.resource.Resource;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
@ -60,10 +60,10 @@ import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@ -89,12 +89,12 @@ public abstract class GrpcClient extends RpcClient {
* Block to wait setup success response.
*/
private final RecAbilityContext recAbilityContext = new RecAbilityContext(null);
/**
* for receiving server abilities.
*/
private SetupRequestHandler setupRequestHandler;
@Override
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
@ -108,7 +108,7 @@ public abstract class GrpcClient extends RpcClient {
public GrpcClient(String name) {
this(DefaultGrpcClientConfig.newBuilder().setName(name).build());
}
/**
* constructor.
*
@ -140,7 +140,7 @@ public abstract class GrpcClient extends RpcClient {
this.clientConfig = clientConfig;
initSetupHandler();
}
/**
* setup handler.
*/
@ -188,17 +188,17 @@ public abstract class GrpcClient extends RpcClient {
grpcExecutor.shutdown();
}
}
/**
* Create a stub using a channel.
*
* @param managedChannelTemp channel.
* @return if server check success,return a non-null stub.
*/
private RequestGrpc.RequestFutureStub createNewChannelStub(ManagedChannel managedChannelTemp) {
protected RequestGrpc.RequestFutureStub createNewChannelStub(ManagedChannel managedChannelTemp) {
return RequestGrpc.newFutureStub(managedChannelTemp);
}
/**
* create a new channel with specific server address.
*
@ -209,15 +209,15 @@ public abstract class GrpcClient extends RpcClient {
private ManagedChannel createNewManagedChannel(String serverIp, int serverPort) {
LOGGER.info("grpc client connection server:{} ip,serverPort:{},grpcTslConfig:{}", serverIp, serverPort,
JacksonUtils.toJson(clientConfig.tlsConfig()));
ManagedChannelBuilder<?> managedChannelBuilder = buildChannel(serverIp, serverPort, buildSslContext()).executor(
grpcExecutor).compressorRegistry(CompressorRegistry.getDefaultInstance())
ManagedChannelBuilder<?> managedChannelBuilder = buildChannel(serverIp, serverPort, buildSslContext())
.executor(grpcExecutor).compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.maxInboundMessageSize(clientConfig.maxInboundMessageSize())
.keepAliveTime(clientConfig.channelKeepAlive(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(clientConfig.channelKeepAliveTimeout(), TimeUnit.MILLISECONDS);
return managedChannelBuilder.build();
}
/**
* shutdown a channel.
*
@ -237,9 +237,6 @@ public abstract class GrpcClient extends RpcClient {
*/
private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
try {
if (requestBlockingStub == null) {
return null;
}
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
@ -291,8 +288,8 @@ public abstract class GrpcClient extends RpcClient {
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
grpcConn.getConnectionId(), payload.toString(), e.getMessage());
Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR,
"Handle server request error");
Response errResponse = ErrorResponse
.build(NacosException.CLIENT_ERROR, "Handle server request error");
errResponse.setRequestId(request.getRequestId());
sendResponse(errResponse);
}
@ -366,7 +363,7 @@ public abstract class GrpcClient extends RpcClient {
int port = serverInfo.getServerPort() + rpcPortOffset();
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (!(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
@ -376,9 +373,9 @@ public abstract class GrpcClient extends RpcClient {
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
@ -388,10 +385,10 @@ public abstract class GrpcClient extends RpcClient {
// promise null if no abilities receive
grpcConn.setAbilityTable(null);
}
//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
@ -401,8 +398,8 @@ public abstract class GrpcClient extends RpcClient {
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(
NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest
.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
@ -427,14 +424,14 @@ public abstract class GrpcClient extends RpcClient {
}
return null;
}
/**
* ability mode: sdk client or cluster client.
*
* @return mode
*/
protected abstract AbilityMode abilityMode();
@Override
protected void afterReset(ConnectResetRequest request) {
recAbilityContext.release(null);
@ -454,14 +451,14 @@ public abstract class GrpcClient extends RpcClient {
* way to block client.
*/
private volatile CountDownLatch blocker;
private volatile boolean needToSync = false;
public RecAbilityContext(Connection connection) {
this.connection = connection;
this.blocker = new CountDownLatch(1);
}
/**
* whether to sync for ability table.
*
@ -470,7 +467,7 @@ public abstract class GrpcClient extends RpcClient {
public boolean isNeedToSync() {
return this.needToSync;
}
/**
* reset with new connection which is waiting for ability table.
*
@ -481,7 +478,7 @@ public abstract class GrpcClient extends RpcClient {
this.blocker = new CountDownLatch(1);
this.needToSync = true;
}
/**
* notify sync by abilities.
*
@ -503,7 +500,7 @@ public abstract class GrpcClient extends RpcClient {
* await for abilities.
*
* @param timeout timeout.
* @param unit unit.
* @param unit unit.
* @throws InterruptedException by blocker.
*/
public void await(long timeout, TimeUnit unit) throws InterruptedException {
@ -512,16 +509,17 @@ public abstract class GrpcClient extends RpcClient {
}
this.needToSync = false;
}
/**
* check whether receive abilities.
*
* @param connection conn.
* @return whether receive abilities.
* @param connection conn.
* @return whether receive abilities.
*/
public boolean check(Connection connection) {
if (!connection.isAbilitiesSet()) {
LOGGER.error("Client don't receive server abilities table even empty table but server supports ability negotiation."
LOGGER.error(
"Client don't receive server abilities table even empty table but server supports ability negotiation."
+ " You can check if it is need to adjust the timeout of ability negotiation by property: {}"
+ " if always fail to connect.",
GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT);
@ -532,26 +530,26 @@ public abstract class GrpcClient extends RpcClient {
return true;
}
}
/**
* Setup response handler.
*/
class SetupRequestHandler implements ServerRequestHandler {
private final RecAbilityContext abilityContext;
public SetupRequestHandler(RecAbilityContext abilityContext) {
this.abilityContext = abilityContext;
}
@Override
public Response requestReply(Request request, Connection connection) {
// if finish setup
if (request instanceof SetupAckRequest) {
SetupAckRequest setupAckRequest = (SetupAckRequest) request;
// remove and count down
recAbilityContext.release(Optional.ofNullable(setupAckRequest.getAbilityTable())
.orElse(new HashMap<>(0)));
recAbilityContext
.release(Optional.ofNullable(setupAckRequest.getAbilityTable()).orElse(new HashMap<>(0)));
return new SetupAckResponse();
}
return null;
@ -562,14 +560,14 @@ public abstract class GrpcClient extends RpcClient {
if (sslContext.isPresent()) {
return NettyChannelBuilder.forAddress(serverIp, port).negotiationType(NegotiationType.TLS)
.sslContext(sslContext.get());
} else {
return ManagedChannelBuilder.forAddress(serverIp, port).usePlaintext();
}
}
private Optional<SslContext> buildSslContext() {
RpcClientTlsConfig tlsConfig = clientConfig.tlsConfig();
if (!tlsConfig.getEnableTls()) {
return Optional.empty();
@ -579,7 +577,7 @@ public abstract class GrpcClient extends RpcClient {
if (StringUtils.isNotBlank(tlsConfig.getSslProvider())) {
builder.sslProvider(TlsTypeResolve.getSslProvider(tlsConfig.getSslProvider()));
}
if (StringUtils.isNotBlank(tlsConfig.getProtocols())) {
builder.protocols(tlsConfig.getProtocols().split(","));
}
@ -595,10 +593,10 @@ public abstract class GrpcClient extends RpcClient {
Resource resource = resourceLoader.getResource(tlsConfig.getTrustCollectionCertFile());
builder.trustManager(resource.getInputStream());
}
if (tlsConfig.getMutualAuthEnable()) {
if (StringUtils.isBlank(tlsConfig.getCertChainFile()) || StringUtils.isBlank(
tlsConfig.getCertPrivateKey())) {
if (StringUtils.isBlank(tlsConfig.getCertChainFile()) || StringUtils
.isBlank(tlsConfig.getCertPrivateKey())) {
throw new IllegalArgumentException("client certChainFile or certPrivateKey must be not null");
}
Resource certChainFile = resourceLoader.getResource(tlsConfig.getCertChainFile());

View File

@ -172,16 +172,14 @@ public class GrpcConnection extends Connection {
if (this.payloadStreamObserver != null) {
try {
payloadStreamObserver.onCompleted();
} catch (Throwable throwable) {
//ignore.
} catch (Throwable ignored) {
}
}
if (this.channel != null && !channel.isShutdown()) {
try {
this.channel.shutdownNow();
} catch (Throwable throwable) {
//ignore.
} catch (Throwable ignored) {
}
}
}

View File

@ -100,8 +100,7 @@ public class GrpcConstants {
GRpcConfigLabel.class)) {
try {
CONFIG_NAMES.add((String) declaredField.get(null));
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (IllegalAccessException ignored) {
}
}
}

View File

@ -62,9 +62,7 @@ public class GrpcUtils {
// request body .
byte[] jsonBytes = convertRequestToByte(request);
return payloadBuilder
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
.build();
return payloadBuilder.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).build();
}
@ -83,8 +81,7 @@ public class GrpcUtils {
Payload.Builder builder = Payload.newBuilder();
return builder
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
return builder.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
.setMetadata(newMeta).build();
}
@ -99,8 +96,7 @@ public class GrpcUtils {
byte[] jsonBytes = JacksonUtils.toJsonBytes(response);
Metadata.Builder metaBuilder = Metadata.newBuilder().setType(response.getClass().getSimpleName());
return Payload.newBuilder()
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
return Payload.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
.setMetadata(metaBuilder.build()).build();
}
@ -132,50 +128,5 @@ public class GrpcUtils {
throw new RemoteException(NacosException.SERVER_ERROR,
"Unknown payload type:" + payload.getMetadata().getType());
}
}
public static class PlainRequest {
String type;
Object body;
/**
* Getter method for property <tt>type</tt>.
*
* @return property value of type
*/
public String getType() {
return type;
}
/**
* Setter method for property <tt>type</tt>.
*
* @param type value to be assigned to property type
*/
public void setType(String type) {
this.type = type;
}
/**
* Getter method for property <tt>body</tt>.
*
* @return property value of body
*/
public Object getBody() {
return body;
}
/**
* Setter method for property <tt>body</tt>.
*
* @param body value to be assigned to property body
*/
public void setBody(Object body) {
this.body = body;
}
}
}

View File

@ -27,7 +27,7 @@ public class ConnectionAlreadyClosedException extends RemoteException {
private static final int CONNECTION_ALREADY_CLOSED = 600;
public ConnectionAlreadyClosedException(String msg) {
super(CONNECTION_ALREADY_CLOSED);
super(CONNECTION_ALREADY_CLOSED, msg);
}
public ConnectionAlreadyClosedException() {

View File

@ -0,0 +1,40 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class ConnectionTypeTest {
@Test
public void testGetByType() {
ConnectionType connectionType = ConnectionType.getByType("GRPC");
assertNotNull(connectionType);
assertEquals("GRPC", connectionType.getType());
assertEquals("Grpc Connection", connectionType.getName());
}
@Test
public void testGetByNonExistType() {
ConnectionType connectionType = ConnectionType.getByType("HTTP");
assertNull(connectionType);
}
}

View File

@ -16,24 +16,28 @@
package com.alibaba.nacos.common.remote;
import junit.framework.TestCase;
import org.junit.Assert;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import org.junit.BeforeClass;
import org.junit.Test;
public class PayloadRegistryTest extends TestCase {
import static org.junit.Assert.assertNull;
public class PayloadRegistryTest {
public void setUp() throws Exception {
super.setUp();
}
public void tearDown() throws Exception {
@BeforeClass
public static void setUpBefore() {
PayloadRegistry.init();
}
@Test
public void testInit() {
PayloadRegistry.init();
Assert.assertNotNull(PayloadRegistry.getClassByType("NotifySubscriberResponse"));
Assert.assertNotNull(PayloadRegistry.getClassByType("InstanceRequest"));
public void testRegisterInvalidClass() {
PayloadRegistry.register("test", Request.class);
assertNull(PayloadRegistry.getClassByType("test"));
}
@Test(expected = RuntimeException.class)
public void testRegisterDuplicated() {
PayloadRegistry.register("ErrorResponse", ErrorResponse.class);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TlsConfigTest {
@Test
public void testTlsConfig() {
TlsConfig tlsConfig = new TlsConfig();
assertFalse(tlsConfig.getEnableTls());
assertFalse(tlsConfig.getMutualAuthEnable());
assertNull(tlsConfig.getProtocols());
assertNull(tlsConfig.getCiphers());
assertFalse(tlsConfig.getTrustAll());
assertNull(tlsConfig.getTrustCollectionCertFile());
assertNull(tlsConfig.getCertPrivateKeyPassword());
assertNull(tlsConfig.getCertPrivateKey());
assertNull(tlsConfig.getCertChainFile());
assertEquals("", tlsConfig.getSslProvider());
// Set values
tlsConfig.setEnableTls(true);
tlsConfig.setMutualAuthEnable(true);
tlsConfig.setProtocols("TLSv1.1,TLSv1.2,TLSv1.3");
tlsConfig.setCiphers("cipher1,cipher2");
tlsConfig.setTrustAll(true);
tlsConfig.setTrustCollectionCertFile("certFile");
tlsConfig.setCertPrivateKeyPassword("password");
tlsConfig.setCertPrivateKey("privateKey");
tlsConfig.setCertChainFile("chainFile");
tlsConfig.setSslProvider("OPENSSL");
// Test values
assertTrue(tlsConfig.getEnableTls());
assertTrue(tlsConfig.getMutualAuthEnable());
assertEquals("TLSv1.1,TLSv1.2,TLSv1.3", tlsConfig.getProtocols());
assertEquals("cipher1,cipher2", tlsConfig.getCiphers());
assertTrue(tlsConfig.getTrustAll());
assertEquals("certFile", tlsConfig.getTrustCollectionCertFile());
assertEquals("password", tlsConfig.getCertPrivateKeyPassword());
assertEquals("privateKey", tlsConfig.getCertPrivateKey());
assertEquals("chainFile", tlsConfig.getCertChainFile());
assertEquals("OPENSSL", tlsConfig.getSslProvider());
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ConnectionTest {
Connection connection;
@Before
public void setUp() throws Exception {
connection = new Connection(new RpcClient.ServerInfo("127.0.0.1", 8848)) {
@Override
public Response request(Request request, long timeoutMills) throws NacosException {
return null;
}
@Override
public RequestFuture requestFuture(Request request) throws NacosException {
return null;
}
@Override
public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException {
}
@Override
public void close() {
}
};
}
@After
public void tearDown() throws Exception {
connection.close();
}
@Test
public void testSetConnectionId() {
assertNull(connection.getConnectionId());
connection.setConnectionId("testConnectionId");
assertEquals("testConnectionId", connection.getConnectionId());
}
@Test
public void testGetConnectionAbility() {
assertFalse(connection.isAbilitiesSet());
assertEquals(AbilityStatus.UNKNOWN, connection.getConnectionAbility(AbilityKey.SDK_CLIENT_TEST_1));
connection.setAbilityTable(Collections.singletonMap(AbilityKey.SERVER_TEST_2.getName(), true));
assertTrue(connection.isAbilitiesSet());
assertEquals(AbilityStatus.UNKNOWN, connection.getConnectionAbility(AbilityKey.SDK_CLIENT_TEST_1));
assertEquals(AbilityStatus.SUPPORTED, connection.getConnectionAbility(AbilityKey.SERVER_TEST_2));
connection.setAbilityTable(Collections.singletonMap(AbilityKey.SERVER_TEST_2.getName(), false));
assertEquals(AbilityStatus.NOT_SUPPORTED, connection.getConnectionAbility(AbilityKey.SERVER_TEST_2));
}
@Test
public void testSetAbandon() {
assertFalse(connection.isAbandon());
connection.setAbandon(true);
assertTrue(connection.isAbandon());
}
}

View File

@ -16,10 +16,19 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.ClientDetectionRequest;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.ClientDetectionResponse;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.HealthCheckResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.DefaultGrpcClientConfig;
import com.alibaba.nacos.common.remote.client.grpc.GrpcConnection;
@ -38,17 +47,28 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -159,7 +179,7 @@ public class RpcClientTest {
}
@After
public void tearDown() throws IllegalAccessException {
public void tearDown() throws IllegalAccessException, NacosException {
rpcClientConfig.labels().clear();
rpcClient.rpcClientStatus.set(RpcClientStatus.WAIT_INIT);
serverListFactoryField.set(rpcClient, null);
@ -167,6 +187,7 @@ public class RpcClientTest {
rpcClient.currentConnection = null;
System.clearProperty("nacos.server.port");
rpcClient.eventLinkedBlockingQueue.clear();
rpcClient.shutdown();
}
@Test
@ -263,6 +284,7 @@ public class RpcClientTest {
map.put("labelKey2", "labelValue2");
when(rpcClientConfig.labels()).thenReturn(map);
assertEquals(1, rpcClient.getLabels().size());
assertEquals("test", rpcClient.getName());
}
@Test
@ -319,14 +341,17 @@ public class RpcClientTest {
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "10.10.10.10::8848")).getAddress());
assertEquals("10.10.10.10:8848",
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "10.10.10.10:8848")).getAddress());
assertEquals("10.10.10.10:8848", ((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient,
"http://10.10.10.10:8848")).getAddress());
assertEquals("10.10.10.10:8848", ((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient,
"http://10.10.10.10::8848")).getAddress());
assertEquals("10.10.10.10:8848",
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "http://10.10.10.10:8848"))
.getAddress());
assertEquals("10.10.10.10:8848",
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "http://10.10.10.10::8848"))
.getAddress());
assertEquals("10.10.10.10:8848",
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "http://10.10.10.10")).getAddress());
assertEquals("10.10.10.10:8848", ((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient,
"https://10.10.10.10::8848")).getAddress());
assertEquals("10.10.10.10:8848",
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "https://10.10.10.10::8848"))
.getAddress());
}
@Test
@ -336,11 +361,30 @@ public class RpcClientTest {
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "http://10.10.10.10")).getAddress());
}
@Test
public void testRequestSuccess() throws NacosException, NoSuchFieldException, IllegalAccessException {
rpcClient.currentConnection = connection;
rpcClient.rpcClientStatus.set(RpcClientStatus.RUNNING);
when(connection.request(any(), anyLong())).thenReturn(new HealthCheckResponse());
Field lastActiveTimeStampField = RpcClient.class.getDeclaredField("lastActiveTimeStamp");
lastActiveTimeStampField.setAccessible(true);
final long lastActiveTimeStamp = (long) lastActiveTimeStampField.get(rpcClient);
Response response = rpcClient.request(new HealthCheckRequest());
assertTrue(response instanceof HealthCheckResponse);
assertTrue(lastActiveTimeStamp <= (long) lastActiveTimeStampField.get(rpcClient));
}
@Test(expected = NacosException.class)
public void testRequestWithoutAnyTry() throws NacosException {
when(rpcClientConfig.retryTimes()).thenReturn(-1);
rpcClient.request(null);
}
@Test(expected = NacosException.class)
public void testRequestWhenClientAlreadyShutDownThenThrowException() throws NacosException {
rpcClient.rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
rpcClient.currentConnection = connection;
rpcClient.request(null, 10000);
rpcClient.request(null);
}
@Test(expected = NacosException.class)
@ -348,7 +392,6 @@ public class RpcClientTest {
rpcClient.rpcClientStatus.set(RpcClientStatus.RUNNING);
rpcClient.currentConnection = connection;
doReturn(null).when(connection).request(any(), anyLong());
rpcClient.request(null, 10000);
}
@ -357,7 +400,6 @@ public class RpcClientTest {
rpcClient.rpcClientStatus.set(RpcClientStatus.RUNNING);
rpcClient.currentConnection = connection;
doReturn(new ErrorResponse()).when(connection).request(any(), anyLong());
rpcClient.request(null, 10000);
}
@ -381,6 +423,22 @@ public class RpcClientTest {
Assert.assertNotNull(exception);
}
@Test
public void testAsyncRequestSuccess() throws NacosException {
rpcClient.currentConnection = connection;
rpcClient.rpcClientStatus.set(RpcClientStatus.RUNNING);
RequestCallBack<?> requestCallBack = mock(RequestCallBack.class);
when(requestCallBack.getTimeout()).thenReturn(1000L);
rpcClient.asyncRequest(null, requestCallBack);
verify(connection).asyncRequest(any(), any());
}
@Test(expected = NacosException.class)
public void testAsyncRequestWithoutAnyTry() throws NacosException {
when(rpcClientConfig.retryTimes()).thenReturn(-1);
rpcClient.asyncRequest(null, null);
}
@Test(expected = NacosException.class)
public void testAsyncRequestWhenClientAlreadyShutDownThenThrowException() throws NacosException {
rpcClient.rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
@ -411,6 +469,12 @@ public class RpcClientTest {
assertEquals(RpcClientStatus.UNHEALTHY, rpcClient.rpcClientStatus.get());
}
@Test(expected = NacosException.class)
public void testRequestFutureWithoutAnyTry() throws NacosException {
when(rpcClientConfig.retryTimes()).thenReturn(-1);
rpcClient.requestFuture(null);
}
@Test(expected = NacosException.class)
public void testRequestFutureWhenClientAlreadyShutDownThenThrowException() throws NacosException {
rpcClient.rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
@ -419,8 +483,7 @@ public class RpcClientTest {
}
@Test
public void testRequestFutureWhenRetryReachMaxRetryTimesThenSwitchServer()
throws NacosException, IllegalAccessException {
public void testRequestFutureWhenRetryReachMaxRetryTimesThenSwitchServer() throws NacosException {
when(rpcClientConfig.timeOutMills()).thenReturn(5000L);
when(rpcClientConfig.retryTimes()).thenReturn(3);
rpcClient.rpcClientStatus.set(RpcClientStatus.RUNNING);
@ -489,12 +552,13 @@ public class RpcClientTest {
}
@Override
public Connection connectToServer(ServerInfo serverInfo) throws Exception {
public Connection connectToServer(ServerInfo serverInfo) {
return null;
}
};
rpcClient.shutdown();
assertTrue(rpcClient.isShutdown());
}
@Test
@ -622,7 +686,7 @@ public class RpcClientTest {
}
@Override
public Connection connectToServer(ServerInfo serverInfo) throws Exception {
public Connection connectToServer(ServerInfo serverInfo) {
return null;
}
@ -647,4 +711,435 @@ public class RpcClientTest {
});
rpcClient.handleServerRequest(request);
}
@Test
public void testNotifyDisConnectedForEmpty() {
rpcClient.notifyDisConnected(null);
verify(rpcClientConfig, never()).name();
}
@Test
public void testNotifyDisConnected() {
ConnectionEventListener listener = mock(ConnectionEventListener.class);
rpcClient.registerConnectionListener(listener);
rpcClient.notifyDisConnected(null);
verify(listener).onDisConnect(null);
verify(rpcClientConfig, times(2)).name();
}
@Test
public void testNotifyDisConnectedException() {
ConnectionEventListener listener = mock(ConnectionEventListener.class);
rpcClient.registerConnectionListener(listener);
doThrow(new RuntimeException("test")).when(listener).onDisConnect(null);
rpcClient.notifyDisConnected(null);
verify(rpcClientConfig, times(3)).name();
}
@Test
public void testNotifyConnectedForEmpty() {
rpcClient.notifyConnected(null);
verify(rpcClientConfig, never()).name();
}
@Test
public void testNotifyConnected() {
ConnectionEventListener listener = mock(ConnectionEventListener.class);
rpcClient.registerConnectionListener(listener);
rpcClient.notifyConnected(null);
verify(listener).onConnected(null);
verify(rpcClientConfig, times(2)).name();
}
@Test
public void testNotifyConnectedException() {
ConnectionEventListener listener = mock(ConnectionEventListener.class);
rpcClient.registerConnectionListener(listener);
doThrow(new RuntimeException("test")).when(listener).onConnected(null);
rpcClient.notifyConnected(null);
verify(rpcClientConfig, times(3)).name();
}
@Test
public void testStartClient() throws NacosException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(new Function<RpcClient.ServerInfo, Connection>() {
private int count;
@Override
public Connection apply(RpcClient.ServerInfo serverInfo) {
if (count == 0) {
count++;
throw new RuntimeException("test");
}
return connection;
}
});
try {
rpcClient.start();
assertTrue(rpcClient.isRunning());
} finally {
rpcClient.shutdown();
}
}
@Test
public void testStartClientWithFailed() throws NacosException, InterruptedException {
RpcClient rpcClient = buildTestStartClient(serverInfo -> null);
try {
rpcClient.start();
TimeUnit.MILLISECONDS.sleep(1000);
assertFalse(rpcClient.isRunning());
} finally {
rpcClient.shutdown();
}
}
@Test
public void testStartClientAfterShutdown() throws NacosException {
RpcClient rpcClient = buildTestStartClient(serverInfo -> null);
rpcClient.shutdown();
rpcClient.start();
assertTrue(rpcClient.isShutdown());
}
@Test
public void testDisConnectionEventAfterStart() throws NacosException, InterruptedException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> connection);
ConnectionEventListener listener = mock(ConnectionEventListener.class);
rpcClient.registerConnectionListener(listener);
try {
rpcClient.start();
TimeUnit.MILLISECONDS.sleep(100);
rpcClient.eventLinkedBlockingQueue.put(new RpcClient.ConnectionEvent(0, connection));
TimeUnit.MILLISECONDS.sleep(100);
verify(listener).onDisConnect(connection);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectContextAfterStartWithNullConnection() throws NacosException, InterruptedException {
RpcClient rpcClient = buildTestStartClient(serverInfo -> null);
try {
when(rpcClientConfig.connectionKeepAlive()).thenReturn(-1L);
rpcClient.start();
TimeUnit.MILLISECONDS.sleep(100);
verify(rpcClientConfig, never()).healthCheckRetryTimes();
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectContextAfterStartWithConnectionHealthCheckFail()
throws NacosException, InterruptedException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(new Function<RpcClient.ServerInfo, Connection>() {
private int count;
@Override
public Connection apply(RpcClient.ServerInfo serverInfo) {
if (count == 0) {
count++;
return connection;
}
return null;
}
});
try {
when(rpcClientConfig.connectionKeepAlive()).thenReturn(10L);
rpcClient.start();
TimeUnit.MILLISECONDS.sleep(500);
assertEquals(RpcClientStatus.UNHEALTHY, rpcClient.rpcClientStatus.get());
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectContextAfterStartWithConnectionHealthCheckSuccess()
throws NacosException, InterruptedException, NoSuchFieldException, IllegalAccessException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> connection);
when(connection.request(any(Request.class), anyLong())).thenReturn(new HealthCheckResponse());
try {
Field lastActiveTimeStampField = RpcClient.class.getDeclaredField("lastActiveTimeStamp");
lastActiveTimeStampField.setAccessible(true);
final long lastActiveTimeStamp = (long) lastActiveTimeStampField.get(rpcClient);
when(rpcClientConfig.connectionKeepAlive()).thenReturn(10L);
rpcClient.start();
TimeUnit.MILLISECONDS.sleep(100);
assertEquals(RpcClientStatus.RUNNING, rpcClient.rpcClientStatus.get());
long newLastActiveTimeStamp = (long) lastActiveTimeStampField.get(rpcClient);
assertTrue(newLastActiveTimeStamp > lastActiveTimeStamp);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectContextAfterStartWithActiveTimeIsNew()
throws NacosException, InterruptedException, NoSuchFieldException, IllegalAccessException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> connection);
try {
Field lastActiveTimeStampField = RpcClient.class.getDeclaredField("lastActiveTimeStamp");
lastActiveTimeStampField.setAccessible(true);
long setTime = System.currentTimeMillis() + 10000;
lastActiveTimeStampField.set(rpcClient, setTime);
when(rpcClientConfig.connectionKeepAlive()).thenReturn(10L);
rpcClient.start();
TimeUnit.MILLISECONDS.sleep(100);
assertEquals(RpcClientStatus.RUNNING, rpcClient.rpcClientStatus.get());
long newLastActiveTimeStamp = (long) lastActiveTimeStampField.get(rpcClient);
assertEquals(setTime, newLastActiveTimeStamp);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectContextAfterStartWithOldServiceInfo()
throws NacosException, InterruptedException, IllegalAccessException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
when(serverListFactory.getServerList()).thenReturn(Collections.singletonList("127.0.0.1:8848"));
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> connection);
try {
rpcClient.start();
RpcClient.ReconnectContext reconnectContext = new RpcClient.ReconnectContext(
new RpcClient.ServerInfo("127.0.0.1", 0), false);
((BlockingQueue<RpcClient.ReconnectContext>) reconnectionSignalField.get(rpcClient)).put(reconnectContext);
TimeUnit.MILLISECONDS.sleep(100);
assertEquals(RpcClientStatus.RUNNING, rpcClient.rpcClientStatus.get());
assertEquals(8848, reconnectContext.serverInfo.serverPort);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectContextAfterStartWithNewServiceInfo()
throws NacosException, InterruptedException, IllegalAccessException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
when(serverListFactory.getServerList()).thenReturn(Collections.singletonList("1.1.1.1:8848"));
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> connection);
try {
rpcClient.start();
RpcClient.ReconnectContext reconnectContext = new RpcClient.ReconnectContext(
new RpcClient.ServerInfo("127.0.0.1", 0), false);
((BlockingQueue<RpcClient.ReconnectContext>) reconnectionSignalField.get(rpcClient)).put(reconnectContext);
TimeUnit.MILLISECONDS.sleep(100);
assertEquals(RpcClientStatus.RUNNING, rpcClient.rpcClientStatus.get());
assertNull(reconnectContext.serverInfo);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testHandleConnectionResetRequestWithoutServer() throws NacosException, InterruptedException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848", "1.1.1.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> {
connection.serverInfo = serverInfo;
return connection;
});
try {
rpcClient.start();
Response response = rpcClient.handleServerRequest(new ConnectResetRequest());
assertTrue(response instanceof ConnectResetResponse);
TimeUnit.MILLISECONDS.sleep(500);
assertEquals("1.1.1.1", connection.serverInfo.getServerIp());
} finally {
rpcClient.shutdown();
}
}
@Test
public void testHandleConnectionResetRequestWithServer() throws NacosException, InterruptedException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848", "1.1.1.1:8848");
List<String> serverList = new LinkedList<>();
serverList.add("127.0.0.1:8848");
serverList.add("1.1.1.1:8848");
serverList.add("2.2.2.2:8848");
when(serverListFactory.getServerList()).thenReturn(serverList);
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> {
connection.serverInfo = serverInfo;
return connection;
});
try {
rpcClient.start();
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp("2.2.2.2");
connectResetRequest.setServerPort("8848");
Response response = rpcClient.handleServerRequest(connectResetRequest);
assertTrue(response instanceof ConnectResetResponse);
TimeUnit.MILLISECONDS.sleep(500);
assertEquals("2.2.2.2", connection.serverInfo.getServerIp());
} finally {
rpcClient.shutdown();
}
}
@Test
public void testHandleConnectionResetRequestWithException() throws NacosException, InterruptedException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848", "1.1.1.1:8848");
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
RpcClient rpcClient = buildTestStartClient(serverInfo -> {
connection.serverInfo = serverInfo;
return connection;
});
try {
rpcClient.start();
System.setProperty("nacos.server.port", "2.2.2.2");
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp("2.2.2.2");
Response response = rpcClient.handleServerRequest(connectResetRequest);
assertTrue(response instanceof ConnectResetResponse);
TimeUnit.MILLISECONDS.sleep(500);
assertEquals("127.0.0.1", connection.serverInfo.getServerIp());
} finally {
rpcClient.shutdown();
}
}
@Test
public void testHandleClientDetectionRequest() throws NacosException {
RpcClient rpcClient = buildTestStartClient(serverInfo -> null);
try {
rpcClient.start();
Response response = rpcClient.handleServerRequest(new ClientDetectionRequest());
assertTrue(response instanceof ClientDetectionResponse);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testHandleOtherRequest() throws NacosException {
RpcClient rpcClient = buildTestStartClient(serverInfo -> null);
try {
rpcClient.start();
Response response = rpcClient.handleServerRequest(new HealthCheckRequest());
assertNull(response);
} finally {
rpcClient.shutdown();
}
}
@Test
public void testReconnectForRequestFailButHealthCheckOK() throws NacosException {
RpcClient rpcClient = buildTestStartClient(serverInfo -> null);
rpcClient.currentConnection = connection;
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
when(connection.request(any(Request.class), anyLong())).thenReturn(new HealthCheckResponse());
rpcClient.reconnect(null, true);
assertTrue(rpcClient.isRunning());
}
@Test
public void testReconnectFailTimes() throws NacosException {
when(serverListFactory.genNextServer()).thenReturn("127.0.0.1:8848");
when(serverListFactory.getServerList()).thenReturn(Collections.singletonList("127.0.0.1:8848"));
final AtomicInteger count = new AtomicInteger(0);
RpcClient rpcClient = buildTestStartClient(serverInfo -> {
int actual = count.incrementAndGet();
return actual > 3 ? connection : null;
});
rpcClient.currentConnection = connection;
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
long start = System.currentTimeMillis();
rpcClient.reconnect(null, false);
assertTrue(rpcClient.isRunning());
assertTrue(System.currentTimeMillis() - start > 400);
}
@Test
public void testGetCurrentServer() {
assertNull(rpcClient.getCurrentServer());
rpcClient.currentConnection = connection;
rpcClient.serverListFactory(serverListFactory);
connection.serverInfo = new RpcClient.ServerInfo("127.0.0.1", 8848);
assertNotNull(rpcClient.getCurrentServer());
}
@Test
public void testCurrentRpcServer() throws IllegalAccessException {
when(serverListFactory.getCurrentServer()).thenReturn("127.0.0.1:8848");
serverListFactoryField.set(rpcClient, serverListFactory);
RpcClient.ServerInfo serverInfo = rpcClient.currentRpcServer();
assertEquals("127.0.0.1", serverInfo.getServerIp());
assertEquals(8848, serverInfo.getServerPort());
assertEquals("127.0.0.1:8848", serverInfo.getAddress());
}
private RpcClient buildTestStartClient(Function<RpcClient.ServerInfo, Connection> function) {
return new RpcClient(rpcClientConfig, serverListFactory) {
@Override
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
}
@Override
public int rpcPortOffset() {
return 0;
}
@Override
public Connection connectToServer(ServerInfo serverInfo) {
return function.apply(serverInfo);
}
};
}
@Test
public void testServerInfoSet() {
RpcClient.ServerInfo serverInfo = new RpcClient.ServerInfo();
String ip = "127.0.0.1";
int port = 80;
serverInfo.setServerIp(ip);
serverInfo.setServerPort(port);
assertEquals("127.0.0.1:80", serverInfo.getAddress());
assertEquals(port, serverInfo.getServerPort());
assertEquals(ip, serverInfo.getServerIp());
String expected = "{serverIp = '127.0.0.1', server main port = 80}";
assertEquals(expected, serverInfo.toString());
}
@Test
public void testSetTenant() {
String tenant = "testTenant";
assertNull(rpcClient.getTenant());
rpcClient.setTenant(tenant);
assertEquals(tenant, rpcClient.getTenant());
}
@Test
public void testGetConnectionAbilityWithNullConnection() {
AbilityStatus abilityStatus = rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_1);
assertNull(abilityStatus);
}
@Test
public void testGetConnectionAbilityWithReadyConnection() {
when(connection.getConnectionAbility(AbilityKey.SERVER_TEST_1)).thenReturn(AbilityStatus.SUPPORTED);
rpcClient.currentConnection = connection;
AbilityStatus abilityStatus = rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_1);
assertNotNull(abilityStatus);
assertEquals(AbilityStatus.SUPPORTED, abilityStatus);
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client;
import org.junit.Test;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class RpcClientTlsConfigTest {
@Test
public void testEnableTls() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_ENABLE, "true");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertTrue(tlsConfig.getEnableTls());
}
@Test
public void testSslProvider() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_PROVIDER, "provider");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("provider", tlsConfig.getSslProvider());
}
@Test
public void testMutualAuthEnable() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_MUTUAL_AUTH, "true");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertTrue(tlsConfig.getMutualAuthEnable());
}
@Test
public void testProtocols() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_PROTOCOLS, "protocols");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("protocols", tlsConfig.getProtocols());
}
@Test
public void testCiphers() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_CIPHERS, "ciphers");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("ciphers", tlsConfig.getCiphers());
}
@Test
public void testTrustCollectionCertFile() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH, "trustCollectionCertFile");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("trustCollectionCertFile", tlsConfig.getTrustCollectionCertFile());
}
@Test
public void testCertChainFile() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_CERT_CHAIN_PATH, "certChainFile");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("certChainFile", tlsConfig.getCertChainFile());
}
@Test
public void testCertPrivateKey() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_CERT_KEY, "certPrivateKey");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("certPrivateKey", tlsConfig.getCertPrivateKey());
}
@Test
public void testTrustAll() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_ALL, "true");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertTrue(tlsConfig.getTrustAll());
}
@Test
public void testCertPrivateKeyPassword() {
Properties properties = new Properties();
properties.setProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_PWD, "trustPwd");
RpcClientTlsConfig tlsConfig = RpcClientTlsConfig.properties(properties);
assertEquals("trustPwd", tlsConfig.getCertPrivateKeyPassword());
}
}

View File

@ -0,0 +1,273 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class DefaultGrpcClientConfigTest {
@Before
public void setUp() throws Exception {
System.setProperty("nacos.common.processors", "2");
}
@After
public void tearDown() throws Exception {
System.clearProperty("nacos.common.processors");
}
@Test
public void testDefault() {
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) DefaultGrpcClientConfig.newBuilder().build();
assertNull(config.name());
assertEquals(3, config.retryTimes());
assertEquals(3000L, config.timeOutMills());
assertEquals(5000L, config.connectionKeepAlive());
assertEquals(10000L, config.threadPoolKeepAlive());
assertEquals(4, config.threadPoolCoreSize());
assertEquals(16, config.threadPoolMaxSize());
assertEquals(3000L, config.serverCheckTimeOut());
assertEquals(10000, config.threadPoolQueueSize());
assertEquals(10 * 1024 * 1024, config.maxInboundMessageSize());
assertEquals(6 * 60 * 1000, config.channelKeepAlive());
assertEquals(TimeUnit.SECONDS.toMillis(20L), config.channelKeepAliveTimeout());
assertEquals(3, config.healthCheckRetryTimes());
assertEquals(3000L, config.healthCheckTimeOut());
assertEquals(5000L, config.capabilityNegotiationTimeout());
assertEquals(1, config.labels().size());
assertNotNull(config.tlsConfig());
}
@Test
public void testFromProperties() {
Properties properties = new Properties();
properties.setProperty(GrpcConstants.GRPC_NAME, "test");
properties.setProperty(GrpcConstants.GRPC_RETRY_TIMES, "3");
properties.setProperty(GrpcConstants.GRPC_TIMEOUT_MILLS, "3000");
properties.setProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME, "5000");
properties.setProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME, "10000");
properties.setProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE, "2");
properties.setProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE, "8");
properties.setProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT, "3000");
properties.setProperty(GrpcConstants.GRPC_QUEUESIZE, "10000");
properties.setProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE, "10485760");
properties.setProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME, "60000");
properties.setProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT, "20000");
properties.setProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES, "3");
properties.setProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT, "3000");
properties.setProperty(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT, "5000");
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) DefaultGrpcClientConfig.newBuilder()
.fromProperties(properties).build();
assertEquals("test", config.name());
assertEquals(3, config.retryTimes());
assertEquals(3000, config.timeOutMills());
assertEquals(5000, config.connectionKeepAlive());
assertEquals(10000, config.threadPoolKeepAlive());
assertEquals(2, config.threadPoolCoreSize());
assertEquals(8, config.threadPoolMaxSize());
assertEquals(3000, config.serverCheckTimeOut());
assertEquals(10000, config.threadPoolQueueSize());
assertEquals(10485760, config.maxInboundMessageSize());
assertEquals(60000, config.channelKeepAlive());
assertEquals(20000, config.channelKeepAliveTimeout());
assertEquals(3, config.healthCheckRetryTimes());
assertEquals(3000, config.healthCheckTimeOut());
assertEquals(5000, config.capabilityNegotiationTimeout());
assertEquals(1, config.labels().size());
assertNotNull(config.tlsConfig());
}
@Test
public void testName() {
String name = "test";
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setName(name);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(name, config.name());
}
@Test
public void testSetRetryTimes() {
int retryTimes = 3;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setRetryTimes(retryTimes);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(retryTimes, config.retryTimes());
}
@Test
public void testSetTimeOutMills() {
long timeOutMills = 3000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setTimeOutMills(timeOutMills);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(timeOutMills, config.timeOutMills());
}
@Test
public void testSetConnectionKeepAlive() {
long connectionKeepAlive = 5000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setConnectionKeepAlive(connectionKeepAlive);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(connectionKeepAlive, config.connectionKeepAlive());
}
@Test
public void testSetThreadPoolKeepAlive() {
long threadPoolKeepAlive = 10000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setThreadPoolKeepAlive(threadPoolKeepAlive);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(threadPoolKeepAlive, config.threadPoolKeepAlive());
}
@Test
public void testSetThreadPoolCoreSize() {
int threadPoolCoreSize = 2;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setThreadPoolCoreSize(threadPoolCoreSize);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(threadPoolCoreSize, config.threadPoolCoreSize());
}
@Test
public void testSetThreadPoolMaxSize() {
int threadPoolMaxSize = 8;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setThreadPoolMaxSize(threadPoolMaxSize);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(threadPoolMaxSize, config.threadPoolMaxSize());
}
@Test
public void testSetServerCheckTimeOut() {
long serverCheckTimeOut = 3000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setServerCheckTimeOut(serverCheckTimeOut);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(serverCheckTimeOut, config.serverCheckTimeOut());
}
@Test
public void testSetThreadPoolQueueSize() {
int threadPoolQueueSize = 10000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setThreadPoolQueueSize(threadPoolQueueSize);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(threadPoolQueueSize, config.threadPoolQueueSize());
}
@Test
public void testSetMaxInboundMessageSize() {
int maxInboundMessageSize = 10485760;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setMaxInboundMessageSize(maxInboundMessageSize);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(maxInboundMessageSize, config.maxInboundMessageSize());
}
@Test
public void testSetChannelKeepAlive() {
int channelKeepAlive = 60000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setChannelKeepAlive(channelKeepAlive);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(channelKeepAlive, config.channelKeepAlive());
}
@Test
public void testSetChannelKeepAliveTimeout() {
int channelKeepAliveTimeout = 20000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setChannelKeepAliveTimeout(channelKeepAliveTimeout);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(channelKeepAliveTimeout, config.channelKeepAliveTimeout());
}
@Test
public void testSetCapabilityNegotiationTimeout() {
long capabilityNegotiationTimeout = 5000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setCapabilityNegotiationTimeout(capabilityNegotiationTimeout);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(capabilityNegotiationTimeout, config.capabilityNegotiationTimeout());
}
@Test
public void testSetHealthCheckRetryTimes() {
int healthCheckRetryTimes = 3;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setHealthCheckRetryTimes(healthCheckRetryTimes);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(healthCheckRetryTimes, config.healthCheckRetryTimes());
}
@Test
public void testSetHealthCheckTimeOut() {
long healthCheckTimeOut = 3000;
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setHealthCheckTimeOut(healthCheckTimeOut);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(healthCheckTimeOut, config.healthCheckTimeOut());
}
@Test
public void testSetLabels() {
Map<String, String> labels = new HashMap<>();
labels.put("key1", "value1");
labels.put("key2", "value2");
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setLabels(labels);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(3, config.labels().size());
assertEquals("value1", config.labels().get("key1"));
assertEquals("value2", config.labels().get("key2"));
}
@Test
public void testSetTlsConfig() {
RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
builder.setTlsConfig(tlsConfig);
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
assertEquals(tlsConfig, config.tlsConfig());
}
@Test
public void testSetTlsConfigDirectly() {
RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
DefaultGrpcClientConfig.Builder builder = DefaultGrpcClientConfig.newBuilder();
DefaultGrpcClientConfig config = (DefaultGrpcClientConfig) builder.build();
config.setTlsConfig(tlsConfig);
assertEquals(tlsConfig, config.tlsConfig());
}
}

View File

@ -17,92 +17,401 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class GrpcClientTest {
protected GrpcClient grpcClient;
@Mock(lenient = true)
@Mock
RpcClientTlsConfig tlsConfig;
protected Method createNewManagedChannelMethod;
protected Method createNewChannelStubMethod;
protected ManagedChannel managedChannel;
protected RpcClient.ServerInfo serverInfo;
@Mock(lenient = true)
protected GrpcClientConfig clientConfig;
protected void init() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
when(clientConfig.channelKeepAlive()).thenReturn(6 * 60 * 1000);
when(clientConfig.channelKeepAliveTimeout()).thenReturn(TimeUnit.SECONDS.toMillis(20L));
RpcClient.ServerInfo serverInfo = spy(new RpcClient.ServerInfo("10.10.10.10", 8848));
createNewManagedChannelMethod = GrpcClient.class.getDeclaredMethod("createNewManagedChannel", String.class,
int.class);
createNewManagedChannelMethod.setAccessible(true);
int port = serverInfo.getServerPort() + grpcClient.rpcPortOffset();
managedChannel = (ManagedChannel) createNewManagedChannelMethod.invoke(grpcClient, serverInfo.getServerIp(),
port);
}
@Before
public void setUp() throws Exception {
when(clientConfig.name()).thenReturn("testClient");
clientConfig = DefaultGrpcClientConfig.newBuilder().setServerCheckTimeOut(100L)
.setCapabilityNegotiationTimeout(100L).setChannelKeepAliveTimeout((int) TimeUnit.SECONDS.toMillis(3L))
.setChannelKeepAlive(1000).setName("testClient").build();
clientConfig.setTlsConfig(tlsConfig);
grpcClient = spy(new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;
return 0;
}
});
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
init();
serverInfo = new RpcClient.ServerInfo("10.10.10.10", 8848);
}
@Test
public void testCreateNewManagedChannel() throws InvocationTargetException, IllegalAccessException {
GrpcConnection grpcConnection = new GrpcConnection(serverInfo, null);
grpcConnection.setChannel(managedChannel);
}
@Test
public void createNewChannelStub() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
createNewChannelStubMethod = GrpcClient.class.getDeclaredMethod("createNewChannelStub", ManagedChannel.class);
createNewChannelStubMethod.setAccessible(true);
Object invoke = createNewChannelStubMethod.invoke(grpcClient, managedChannel);
Assert.assertTrue(invoke instanceof RequestGrpc.RequestFutureStub);
}
@After
public void close() {
managedChannel.shutdownNow();
public void tearDown() throws NacosException {
grpcClient.shutdown();
}
@Test
public void testGetConnectionType() {
assertEquals(ConnectionType.GRPC, grpcClient.getConnectionType());
}
@Test
public void testConnectToServerFailed() {
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testConnectToServerException() {
doThrow(new RuntimeException("test")).when(grpcClient).createNewChannelStub(any(ManagedChannel.class));
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testConnectToServerMockSuccess() throws ExecutionException, InterruptedException, TimeoutException {
RequestGrpc.RequestFutureStub stub = mockStub(new ServerCheckResponse(), null);
doReturn(stub).when(grpcClient).createNewChannelStub(any(ManagedChannel.class));
Connection connection = grpcClient.connectToServer(serverInfo);
assertNotNull(connection);
assertTrue(connection instanceof GrpcConnection);
assertEquals(stub, ((GrpcConnection) connection).getGrpcFutureServiceStub());
}
@Test
public void testConnectToServerMockSuccessWithAbility()
throws ExecutionException, InterruptedException, TimeoutException {
ServerCheckResponse response = new ServerCheckResponse();
response.setSupportAbilityNegotiation(true);
RequestGrpc.RequestFutureStub stub = mockStub(response, null);
doReturn(stub).when(grpcClient).createNewChannelStub(any(ManagedChannel.class));
Connection connection = grpcClient.connectToServer(serverInfo);
assertNull(connection);
}
@Test
public void testConnectToServerMockHealthCheckFailed()
throws ExecutionException, InterruptedException, TimeoutException {
RequestGrpc.RequestFutureStub stub = mockStub(null, new RuntimeException("test"));
doReturn(stub).when(grpcClient).createNewChannelStub(any(ManagedChannel.class));
Connection connection = grpcClient.connectToServer(serverInfo);
assertNull(connection);
}
private RequestGrpc.RequestFutureStub mockStub(ServerCheckResponse response, Throwable throwable)
throws InterruptedException, ExecutionException, TimeoutException {
RequestGrpc.RequestFutureStub stub = mock(RequestGrpc.RequestFutureStub.class);
ListenableFuture<Payload> listenableFuture = mock(ListenableFuture.class);
when(stub.request(any(Payload.class))).thenReturn(listenableFuture);
if (null == throwable) {
when(listenableFuture.get(100L, TimeUnit.MILLISECONDS)).thenReturn(GrpcUtils.convert(response));
} else {
when(listenableFuture.get(100L, TimeUnit.MILLISECONDS)).thenThrow(throwable);
}
Channel channel = mock(Channel.class);
when(stub.getChannel()).thenReturn(channel);
ClientCall mockCall = mock(ClientCall.class);
when(channel.newCall(any(), any())).thenReturn(mockCall);
return stub;
}
@Test
public void testBindRequestStreamOnNextSetupAckRequest()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0))
.onNext(GrpcUtils.convert(new SetupAckRequest()));
return null;
});
setCurrentConnection(grpcConnection, grpcClient);
invokeBindRequestStream(grpcClient, stub, grpcConnection);
verify(grpcConnection, never()).sendResponse(any(Response.class));
}
@Test
public void testBindRequestStreamOnNextOtherRequest()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0))
.onNext(GrpcUtils.convert(new ConnectResetRequest()));
return null;
});
grpcClient.registerServerRequestHandler((request, connection) -> {
if (request instanceof ConnectResetRequest) {
return new ConnectResetResponse();
}
return null;
});
setCurrentConnection(grpcConnection, grpcClient);
invokeBindRequestStream(grpcClient, stub, grpcConnection);
verify(grpcConnection).sendResponse(any(ConnectResetResponse.class));
}
@Test
public void testBindRequestStreamOnNextNoRequest()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0))
.onNext(GrpcUtils.convert(new ConnectResetRequest()));
return null;
});
grpcClient.registerServerRequestHandler((request, connection) -> null);
setCurrentConnection(grpcConnection, grpcClient);
invokeBindRequestStream(grpcClient, stub, grpcConnection);
verify(grpcConnection, never()).sendResponse(any(Response.class));
}
@Test
public void testBindRequestStreamOnNextHandleException()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0))
.onNext(GrpcUtils.convert(new ConnectResetRequest()));
return null;
});
grpcClient.registerServerRequestHandler((request, connection) -> {
throw new RuntimeException("test");
});
setCurrentConnection(grpcConnection, grpcClient);
invokeBindRequestStream(grpcClient, stub, grpcConnection);
verify(grpcConnection).sendResponse(any(ErrorResponse.class));
}
@Test
public void testBindRequestStreamOnNextParseException()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0)).onNext(Payload.newBuilder().build());
return null;
});
setCurrentConnection(grpcConnection, grpcClient);
invokeBindRequestStream(grpcClient, stub, grpcConnection);
verify(grpcConnection, never()).sendResponse(any(ErrorResponse.class));
}
@Test
public void testBindRequestStreamOnErrorFromRunning()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0)).onError(new RuntimeException("test"));
return null;
});
setStatus(grpcClient, RpcClientStatus.RUNNING);
setCurrentConnection(grpcConnection, grpcClient);
assertTrue(grpcClient.isRunning());
invokeBindRequestStream(grpcClient, stub, grpcConnection);
assertFalse(grpcClient.isRunning());
}
@Test
public void testBindRequestStreamOnErrorFromNotRunning()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0)).onError(new RuntimeException("test"));
return null;
});
setStatus(grpcClient, RpcClientStatus.WAIT_INIT);
setCurrentConnection(grpcConnection, grpcClient);
assertFalse(grpcClient.isRunning());
assertTrue(grpcClient.isWaitInitiated());
invokeBindRequestStream(grpcClient, stub, grpcConnection);
assertFalse(grpcClient.isRunning());
assertTrue(grpcClient.isWaitInitiated());
}
@Test
public void testBindRequestStreamOnCompletedFromRunning()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0)).onCompleted();
return null;
});
setStatus(grpcClient, RpcClientStatus.RUNNING);
setCurrentConnection(grpcConnection, grpcClient);
assertTrue(grpcClient.isRunning());
invokeBindRequestStream(grpcClient, stub, grpcConnection);
assertFalse(grpcClient.isRunning());
}
@Test
public void testBindRequestStreamOnCompletedFromNotRunning()
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
BiRequestStreamGrpc.BiRequestStreamStub stub = mock(BiRequestStreamGrpc.BiRequestStreamStub.class);
GrpcConnection grpcConnection = mock(GrpcConnection.class);
when(stub.requestBiStream(any())).thenAnswer((Answer<StreamObserver<Payload>>) invocationOnMock -> {
((StreamObserver<Payload>) invocationOnMock.getArgument(0)).onCompleted();
return null;
});
setStatus(grpcClient, RpcClientStatus.WAIT_INIT);
setCurrentConnection(grpcConnection, grpcClient);
assertFalse(grpcClient.isRunning());
assertTrue(grpcClient.isWaitInitiated());
invokeBindRequestStream(grpcClient, stub, grpcConnection);
assertFalse(grpcClient.isRunning());
assertTrue(grpcClient.isWaitInitiated());
}
private void invokeBindRequestStream(GrpcClient grpcClient, BiRequestStreamGrpc.BiRequestStreamStub stub,
GrpcConnection grpcConnection)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method bindRequestStreamMethod = GrpcClient.class
.getDeclaredMethod("bindRequestStream", BiRequestStreamGrpc.BiRequestStreamStub.class,
GrpcConnection.class);
bindRequestStreamMethod.setAccessible(true);
bindRequestStreamMethod.invoke(grpcClient, stub, grpcConnection);
}
private void setCurrentConnection(GrpcConnection connection, GrpcClient client)
throws NoSuchFieldException, IllegalAccessException {
Field connectionField = RpcClient.class.getDeclaredField("currentConnection");
connectionField.setAccessible(true);
connectionField.set(client, connection);
}
private void setStatus(GrpcClient grpcClient, RpcClientStatus status)
throws IllegalAccessException, NoSuchFieldException {
Field statusField = RpcClient.class.getDeclaredField("rpcClientStatus");
statusField.setAccessible(true);
statusField.set(grpcClient, new AtomicReference<>(status));
}
@Test
public void testAfterReset() throws NoSuchFieldException, IllegalAccessException {
Field recAbilityContextField = GrpcClient.class.getDeclaredField("recAbilityContext");
recAbilityContextField.setAccessible(true);
GrpcClient.RecAbilityContext context = mock(GrpcClient.RecAbilityContext.class);
recAbilityContextField.set(grpcClient, context);
grpcClient.afterReset(new ConnectResetRequest());
verify(context).release(null);
}
@Test
public void testAppendRecAbilityContext() {
GrpcClient.RecAbilityContext context = new GrpcClient.RecAbilityContext(null);
GrpcConnection connection = mock(GrpcConnection.class);
context.reset(connection);
assertTrue(context.isNeedToSync());
assertFalse(context.check(connection));
context.release(Collections.emptyMap());
assertFalse(context.isNeedToSync());
verify(connection).setAbilityTable(anyMap());
when(connection.isAbilitiesSet()).thenReturn(true);
assertTrue(context.check(connection));
}
@Test
public void testSendResponseWithException()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
GrpcConnection connection = mock(GrpcConnection.class);
setCurrentConnection(connection, grpcClient);
doThrow(new RuntimeException("test")).when(connection).sendResponse(any(Response.class));
Method sendResponseMethod = GrpcClient.class.getDeclaredMethod("sendResponse", Response.class);
sendResponseMethod.setAccessible(true);
sendResponseMethod.invoke(grpcClient, new ConnectResetResponse());
// don't throw any exception.
}
@Test
public void testConstructorWithServerListFactory() {
ServerListFactory serverListFactory = mock(ServerListFactory.class);
GrpcClient grpcClient = new GrpcClient(clientConfig, serverListFactory) {
@Override
protected AbilityMode abilityMode() {
return null;
}
@Override
public int rpcPortOffset() {
return 0;
}
};
assertFalse(grpcClient.isWaitInitiated());
}
@Test
public void testConstructorWithoutServerListFactory() {
GrpcClient grpcClient = new GrpcClient("testNoFactory", 2, 2, Collections.emptyMap()) {
@Override
protected AbilityMode abilityMode() {
return null;
}
@Override
public int rpcPortOffset() {
return 0;
}
};
assertTrue(grpcClient.isWaitInitiated());
}
}

View File

@ -16,37 +16,25 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import org.junit.Test;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
/**
* Currently not good way to test tls relative codes, and it's a optional feature, single test first.
*/
public class GrpcClientTlsTest extends GrpcClientTest {
@Test
public void testGrpcEnableTlsAndTrustPart() throws Exception {
when(tlsConfig.getEnableTls()).thenReturn(true);
when(tlsConfig.getTrustCollectionCertFile()).thenReturn("ca-cert.pem");
when(tlsConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA-AES256-GCM-SHA384");
when(tlsConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
when(clientConfig.name()).thenReturn("testClient");
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
grpcClient = new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;
}
};
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testGrpcEnableTlsAndTrustAll() throws Exception {
when(tlsConfig.getEnableTls()).thenReturn(true);
@ -54,23 +42,9 @@ public class GrpcClientTlsTest extends GrpcClientTest {
when(tlsConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA-AES256-GCM-SHA384");
when(tlsConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
when(tlsConfig.getTrustAll()).thenReturn(true);
when(clientConfig.name()).thenReturn("testClient");
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
grpcClient = new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;
}
};
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testGrpcEnableTlsAndEnableMutualAuth() throws Exception {
when(tlsConfig.getEnableTls()).thenReturn(true);
@ -79,21 +53,41 @@ public class GrpcClientTlsTest extends GrpcClientTest {
when(tlsConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
when(tlsConfig.getTrustAll()).thenReturn(true);
when(tlsConfig.getMutualAuthEnable()).thenReturn(true);
when(tlsConfig.getTrustCollectionCertFile()).thenReturn("ca-cert.pem");
when(tlsConfig.getCiphers()).thenReturn("client-cert.pem");
when(tlsConfig.getCertPrivateKey()).thenReturn("client-key.pem");
when(clientConfig.name()).thenReturn("testClient");
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
grpcClient = new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;
}
};
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testGrpcSslProvider() {
when(tlsConfig.getEnableTls()).thenReturn(true);
when(tlsConfig.getTrustCollectionCertFile()).thenReturn("ca-cert.pem");
when(tlsConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA-AES256-GCM-SHA384");
when(tlsConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
when(tlsConfig.getTrustAll()).thenReturn(true);
when(tlsConfig.getMutualAuthEnable()).thenReturn(true);
when(tlsConfig.getCertPrivateKey()).thenReturn("client-key.pem");
when(tlsConfig.getSslProvider()).thenReturn("JDK");
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testGrpcEmptyTrustCollectionCertFile() {
when(tlsConfig.getEnableTls()).thenReturn(true);
when(tlsConfig.getTrustCollectionCertFile()).thenReturn("");
when(tlsConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA-AES256-GCM-SHA384");
when(tlsConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
assertNull(grpcClient.connectToServer(serverInfo));
}
@Test
public void testGrpcMutualAuth() {
when(tlsConfig.getEnableTls()).thenReturn(true);
when(tlsConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA-AES256-GCM-SHA384");
when(tlsConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
when(tlsConfig.getMutualAuthEnable()).thenReturn(true);
when(tlsConfig.getTrustAll()).thenReturn(true);
when(tlsConfig.getCertChainFile()).thenReturn("classpath:test-tls-cert.pem");
when(tlsConfig.getCertPrivateKey()).thenReturn("classpath:test-tls-cert.pem");
assertNull(grpcClient.connectToServer(serverInfo));
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.exception.NacosException;
import org.junit.After;
import org.junit.Test;
import java.util.Collections;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class GrpcClusterClientTest {
GrpcClusterClient grpcClusterClient;
@After
public void tearDown() throws NacosException {
System.clearProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY);
if (grpcClusterClient != null) {
grpcClusterClient.shutdown();
}
}
@Test
public void testAbilityMode() {
grpcClusterClient = new GrpcClusterClient("test");
assertEquals(AbilityMode.CLUSTER_CLIENT, grpcClusterClient.abilityMode());
}
@Test
public void testRpcPortOffsetDefault() {
grpcClusterClient = new GrpcClusterClient(new Properties());
assertEquals(1001, grpcClusterClient.rpcPortOffset());
}
@Test
public void testRpcPortOffsetFromSystemProperty() {
System.setProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY, "10001");
grpcClusterClient = new GrpcClusterClient("test", 8, 8, Collections.emptyMap());
assertEquals(10001, grpcClusterClient.rpcPortOffset());
}
@Test
public void testGrpcClientByConfig() {
GrpcClientConfig config = DefaultGrpcClientConfig.newBuilder().setName("test111").build();
grpcClusterClient = new GrpcClusterClient(config);
assertEquals("test111", grpcClusterClient.getName());
}
}

View File

@ -0,0 +1,249 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.Metadata;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.HealthCheckResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.PayloadRegistry;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class GrpcConnectionTest {
@Mock
private Executor executor;
@Mock
private ManagedChannel channel;
@Mock
private StreamObserver<Payload> payloadStreamObserver;
@Mock
private RequestGrpc.RequestFutureStub requestFutureStub;
@Mock
ListenableFuture<Payload> future;
Payload responsePayload;
Payload errorResponsePayload;
GrpcConnection connection;
@BeforeClass
public static void setUpBeforeClass() {
PayloadRegistry.init();
}
@Before
public void setUp() throws Exception {
connection = new GrpcConnection(new RpcClient.ServerInfo(), executor);
connection.setChannel(channel);
connection.setPayloadStreamObserver(payloadStreamObserver);
connection.setGrpcFutureServiceStub(requestFutureStub);
when(requestFutureStub.request(any(Payload.class))).thenReturn(future);
responsePayload = GrpcUtils.convert(new HealthCheckResponse());
errorResponsePayload = GrpcUtils.convert(ErrorResponse.build(500, "test"));
when(future.get()).thenReturn(responsePayload);
when(future.get(100L, TimeUnit.MILLISECONDS)).thenReturn(responsePayload);
when(future.isDone()).thenReturn(true);
}
@After
public void tearDown() throws Exception {
connection.close();
}
@Test
public void testGetAll() {
assertEquals(channel, connection.getChannel());
assertEquals(payloadStreamObserver, connection.getPayloadStreamObserver());
assertEquals(requestFutureStub, connection.getGrpcFutureServiceStub());
}
@Test
public void testRequestSuccessSync() throws NacosException {
Response response = connection.request(new HealthCheckRequest(), -1);
assertTrue(response instanceof HealthCheckResponse);
}
@Test
public void testRequestSuccessAsync() throws NacosException {
Response response = connection.request(new HealthCheckRequest(), 100);
assertTrue(response instanceof HealthCheckResponse);
}
@Test(expected = NacosException.class)
public void testRequestTimeout() throws InterruptedException, ExecutionException, TimeoutException, NacosException {
when(future.get(100L, TimeUnit.MILLISECONDS)).thenThrow(new TimeoutException("test"));
connection.request(new HealthCheckRequest(), 100);
}
@Test
public void testRequestFuture() throws Exception {
RequestFuture requestFuture = connection.requestFuture(new HealthCheckRequest());
assertTrue(requestFuture.isDone());
Response response = requestFuture.get();
assertTrue(response instanceof HealthCheckResponse);
}
@Test
public void testRequestFutureWithTimeout() throws Exception {
RequestFuture requestFuture = connection.requestFuture(new HealthCheckRequest());
assertTrue(requestFuture.isDone());
Response response = requestFuture.get(100L);
assertTrue(response instanceof HealthCheckResponse);
}
@Test(expected = NacosException.class)
public void testRequestFutureFailure() throws Exception {
when(future.get()).thenReturn(errorResponsePayload);
RequestFuture requestFuture = connection.requestFuture(new HealthCheckRequest());
assertTrue(requestFuture.isDone());
requestFuture.get();
}
@Test(expected = NacosException.class)
public void testRequestFutureWithTimeoutFailure() throws Exception {
when(future.get(100L, TimeUnit.MILLISECONDS)).thenReturn(errorResponsePayload);
RequestFuture requestFuture = connection.requestFuture(new HealthCheckRequest());
assertTrue(requestFuture.isDone());
requestFuture.get(100L);
}
@Test
public void testSendResponse() {
connection.sendResponse(new HealthCheckResponse());
verify(payloadStreamObserver).onNext(any(Payload.class));
}
@Test
public void testSendRequest() {
connection.sendRequest(new HealthCheckRequest());
verify(payloadStreamObserver).onNext(any(Payload.class));
}
@Test
public void testAsyncRequestSuccess() throws NacosException {
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArgument(0)).run();
return null;
}).when(future).addListener(any(Runnable.class), eq(executor));
RequestCallBack requestCallBack = mock(RequestCallBack.class);
connection.asyncRequest(new HealthCheckRequest(), requestCallBack);
verify(requestCallBack).onResponse(any(HealthCheckResponse.class));
}
@Test
public void testAsyncRequestError() throws NacosException, ExecutionException, InterruptedException {
when(future.get()).thenReturn(errorResponsePayload);
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArgument(0)).run();
return null;
}).when(future).addListener(any(Runnable.class), eq(executor));
RequestCallBack requestCallBack = mock(RequestCallBack.class);
connection.asyncRequest(new HealthCheckRequest(), requestCallBack);
verify(requestCallBack).onException(any(NacosException.class));
}
@Test
public void testAsyncRequestNullResponse() throws NacosException, ExecutionException, InterruptedException {
byte[] jsonBytes = JacksonUtils.toJsonBytes(null);
Metadata.Builder metaBuilder = Metadata.newBuilder().setType(HealthCheckResponse.class.getSimpleName());
Payload nullResponsePayload = Payload.newBuilder()
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
.setMetadata(metaBuilder.build()).build();
when(future.get()).thenReturn(nullResponsePayload);
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArgument(0)).run();
return null;
}).when(future).addListener(any(Runnable.class), eq(executor));
RequestCallBack requestCallBack = mock(RequestCallBack.class);
connection.asyncRequest(new HealthCheckRequest(), requestCallBack);
verify(requestCallBack).onException(any(NacosException.class));
}
@Test
public void testAsyncRequestWithCancelException() throws NacosException, ExecutionException, InterruptedException {
when(future.get()).thenThrow(new CancellationException("test"));
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArgument(0)).run();
return null;
}).when(future).addListener(any(Runnable.class), eq(executor));
RequestCallBack requestCallBack = mock(RequestCallBack.class);
connection.asyncRequest(new HealthCheckRequest(), requestCallBack);
verify(requestCallBack).onException(any(TimeoutException.class));
}
@Test
public void testAsyncRequestWithOtherException() throws NacosException, ExecutionException, InterruptedException {
when(future.get()).thenThrow(new RuntimeException("test"));
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArgument(0)).run();
return null;
}).when(future).addListener(any(Runnable.class), eq(executor));
RequestCallBack requestCallBack = mock(RequestCallBack.class);
connection.asyncRequest(new HealthCheckRequest(), requestCallBack);
verify(requestCallBack).onException(any(RuntimeException.class));
}
@Test
public void testCloseWithException() {
doThrow(new RuntimeException("test")).when(payloadStreamObserver).onCompleted();
when(channel.shutdownNow()).thenThrow(new RuntimeException("test"));
connection.close();
// don't throw any exception
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.exception.NacosException;
import org.junit.After;
import org.junit.Test;
import java.util.Collections;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class GrpcSdkClientTest {
GrpcSdkClient grpcSdkClient;
@After
public void tearDown() throws NacosException {
System.clearProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY);
if (grpcSdkClient != null) {
grpcSdkClient.shutdown();
}
}
@Test
public void testAbilityMode() {
grpcSdkClient = new GrpcSdkClient("test");
assertEquals(AbilityMode.SDK_CLIENT, grpcSdkClient.abilityMode());
}
@Test
public void testRpcPortOffsetDefault() {
grpcSdkClient = new GrpcSdkClient(new Properties());
assertEquals(1000, grpcSdkClient.rpcPortOffset());
}
@Test
public void testRpcPortOffsetFromSystemProperty() {
System.setProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY, "10000");
grpcSdkClient = new GrpcSdkClient("test", 8, 8, Collections.emptyMap());
assertEquals(10000, grpcSdkClient.rpcPortOffset());
}
@Test
public void testGrpcClientByConfig() {
GrpcClientConfig config = DefaultGrpcClientConfig.newBuilder().setName("test111").build();
grpcSdkClient = new GrpcSdkClient(config);
assertEquals("test111", grpcSdkClient.getName());
}
}

View File

@ -19,15 +19,20 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.api.grpc.auto.Metadata;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.common.remote.PayloadRegistry;
import com.alibaba.nacos.common.remote.exception.RemoteException;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GrpcUtilsTest {
@ -76,6 +81,16 @@ public class GrpcUtilsTest {
assertEquals("v3", convert.getMetadata().getHeadersMap().get("h3"));
}
@Test
public void testConvertRequestWithMeta() {
RequestMeta meta = new RequestMeta();
Payload convert = GrpcUtils.convert(request, meta);
assertEquals(request.getClass().getSimpleName(), convert.getMetadata().getType());
assertEquals("v1", convert.getMetadata().getHeadersMap().get("h1"));
assertEquals("v2", convert.getMetadata().getHeadersMap().get("h2"));
assertEquals("v3", convert.getMetadata().getHeadersMap().get("h3"));
}
@Test
public void testConvertResponse() {
Payload convert = GrpcUtils.convert(response);
@ -83,7 +98,7 @@ public class GrpcUtilsTest {
}
@Test
public void parse() {
public void testParse() {
Payload requestPayload = GrpcUtils.convert(request);
ServiceQueryRequest request = (ServiceQueryRequest) GrpcUtils.parse(requestPayload);
@ -97,4 +112,12 @@ public class GrpcUtilsTest {
assertEquals(this.response.getMetrics(), response.getMetrics());
}
@Test(expected = RemoteException.class)
public void testParseNullType() {
Payload mockPayload = mock(Payload.class);
Metadata mockMetadata = mock(Metadata.class);
when(mockPayload.getMetadata()).thenReturn(mockMetadata);
GrpcUtils.parse(mockPayload);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.exception;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class RemoteExceptionTest {
@Test
public void testConnectionAlreadyClosedException() {
ConnectionAlreadyClosedException exception = new ConnectionAlreadyClosedException("test message");
assertEquals(600, exception.getErrCode());
assertEquals("errCode: 600, errMsg: test message ", exception.getMessage());
assertNull(exception.getCause());
exception = new ConnectionAlreadyClosedException();
assertEquals(600, exception.getErrCode());
assertNull(exception.getMessage());
assertNull(exception.getCause());
RuntimeException caused = new RuntimeException("test cause");
exception = new ConnectionAlreadyClosedException(caused);
assertEquals(600, exception.getErrCode());
assertEquals(caused, exception.getCause());
assertEquals("java.lang.RuntimeException: test cause", exception.getMessage());
}
@Test
public void testConnectionBusyException() {
String msg = "Connection is busy";
ConnectionBusyException exception = new ConnectionBusyException(msg);
assertEquals(601, exception.getErrCode());
assertEquals("errCode: 601, errMsg: " + msg + " ", exception.getMessage());
assertNull(exception.getCause());
RuntimeException caused = new RuntimeException("test cause");
exception = new ConnectionBusyException(caused);
assertEquals(601, exception.getErrCode());
assertEquals(caused, exception.getCause());
}
}