ssl context reload spi (#10150)
This commit is contained in:
parent
7df6f6d47c
commit
c39ba4a35c
@ -71,18 +71,18 @@ import java.util.concurrent.TimeUnit;
|
||||
*/
|
||||
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
|
||||
public abstract class GrpcClient extends RpcClient {
|
||||
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);
|
||||
|
||||
|
||||
private final GrpcClientConfig clientConfig;
|
||||
|
||||
|
||||
private ThreadPoolExecutor grpcExecutor;
|
||||
|
||||
|
||||
@Override
|
||||
public ConnectionType getConnectionType() {
|
||||
return ConnectionType.GRPC;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* constructor.
|
||||
*
|
||||
@ -91,7 +91,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
public GrpcClient(String name) {
|
||||
this(DefaultGrpcClientConfig.newBuilder().setName(name).build());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* constructor.
|
||||
*
|
||||
@ -100,7 +100,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
public GrpcClient(Properties properties) {
|
||||
this(DefaultGrpcClientConfig.newBuilder().fromProperties(properties).build());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* constructor.
|
||||
*
|
||||
@ -110,7 +110,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
super(clientConfig);
|
||||
this.clientConfig = clientConfig;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* constructor.
|
||||
*
|
||||
@ -121,7 +121,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
super(clientConfig, serverListFactory);
|
||||
this.clientConfig = clientConfig;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* constructor.
|
||||
*
|
||||
@ -134,12 +134,13 @@ public abstract class GrpcClient extends RpcClient {
|
||||
this(DefaultGrpcClientConfig.newBuilder().setName(name).setThreadPoolCoreSize(threadPoolCoreSize)
|
||||
.setThreadPoolMaxSize(threadPoolMaxSize).setLabels(labels).build());
|
||||
}
|
||||
|
||||
public GrpcClient(String name, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
|
||||
this(DefaultGrpcClientConfig.newBuilder().setName(name).setThreadPoolCoreSize(threadPoolCoreSize).setTlsConfig(tlsConfig)
|
||||
.setThreadPoolMaxSize(threadPoolMaxSize).setLabels(labels).build());
|
||||
|
||||
public GrpcClient(String name, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels,
|
||||
RpcClientTlsConfig tlsConfig) {
|
||||
this(DefaultGrpcClientConfig.newBuilder().setName(name).setThreadPoolCoreSize(threadPoolCoreSize)
|
||||
.setTlsConfig(tlsConfig).setThreadPoolMaxSize(threadPoolMaxSize).setLabels(labels).build());
|
||||
}
|
||||
|
||||
|
||||
protected ThreadPoolExecutor createGrpcExecutor(String serverIp) {
|
||||
// Thread name will use String.format, ipv6 maybe contain special word %, so handle it first.
|
||||
serverIp = serverIp.replaceAll("%", "-");
|
||||
@ -151,7 +152,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
grpcExecutor.allowCoreThreadTimeOut(true);
|
||||
return grpcExecutor;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void shutdown() throws NacosException {
|
||||
super.shutdown();
|
||||
@ -160,7 +161,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
grpcExecutor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a stub using a channel.
|
||||
*
|
||||
@ -170,7 +171,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
private RequestGrpc.RequestFutureStub createNewChannelStub(ManagedChannel managedChannelTemp) {
|
||||
return RequestGrpc.newFutureStub(managedChannelTemp);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* create a new channel with specific server address.
|
||||
*
|
||||
@ -181,15 +182,15 @@ public abstract class GrpcClient extends RpcClient {
|
||||
private ManagedChannel createNewManagedChannel(String serverIp, int serverPort) {
|
||||
LOGGER.info("grpc client connection server:{} ip,serverPort:{},grpcTslConfig:{}", serverIp, serverPort,
|
||||
JacksonUtils.toJson(clientConfig.tlsConfig()));
|
||||
ManagedChannelBuilder<?> managedChannelBuilder = buildChannel(serverIp, serverPort, buildSslContext())
|
||||
.executor(grpcExecutor).compressorRegistry(CompressorRegistry.getDefaultInstance())
|
||||
ManagedChannelBuilder<?> managedChannelBuilder = buildChannel(serverIp, serverPort, buildSslContext()).executor(
|
||||
grpcExecutor).compressorRegistry(CompressorRegistry.getDefaultInstance())
|
||||
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
|
||||
.maxInboundMessageSize(clientConfig.maxInboundMessageSize())
|
||||
.keepAliveTime(clientConfig.channelKeepAlive(), TimeUnit.MILLISECONDS)
|
||||
.keepAliveTimeout(clientConfig.channelKeepAliveTimeout(), TimeUnit.MILLISECONDS);
|
||||
return managedChannelBuilder.build();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* shutdown a channel.
|
||||
*
|
||||
@ -200,7 +201,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
managedChannel.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* check server if success.
|
||||
*
|
||||
@ -221,25 +222,30 @@ public abstract class GrpcClient extends RpcClient {
|
||||
} catch (Exception e) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER,
|
||||
"Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
|
||||
if (this.clientConfig != null && this.clientConfig.tlsConfig() != null && this.clientConfig.tlsConfig()
|
||||
.getEnableTls()) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER,
|
||||
"current client is require tls encrypted ,server must support tls ,please check");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
|
||||
final GrpcConnection grpcConn) {
|
||||
|
||||
final GrpcConnection grpcConn) {
|
||||
|
||||
return streamStub.requestBiStream(new StreamObserver<Payload>() {
|
||||
|
||||
|
||||
@Override
|
||||
public void onNext(Payload payload) {
|
||||
|
||||
|
||||
LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",
|
||||
grpcConn.getConnectionId(), payload.toString());
|
||||
try {
|
||||
Object parseBody = GrpcUtils.parse(payload);
|
||||
final Request request = (Request) parseBody;
|
||||
if (request != null) {
|
||||
|
||||
|
||||
try {
|
||||
Response response = handleServerRequest(request);
|
||||
if (response != null) {
|
||||
@ -249,7 +255,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(),
|
||||
request.getRequestId());
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
|
||||
grpcConn.getConnectionId(), payload.toString(), e.getMessage());
|
||||
@ -258,16 +264,16 @@ public abstract class GrpcClient extends RpcClient {
|
||||
errResponse.setRequestId(request.getRequestId());
|
||||
sendResponse(errResponse);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
|
||||
grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
boolean isRunning = isRunning();
|
||||
@ -278,14 +284,14 @@ public abstract class GrpcClient extends RpcClient {
|
||||
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
|
||||
switchServerAsync();
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
|
||||
grpcConn.getConnectionId(), isRunning, isAbandon);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
boolean isRunning = isRunning();
|
||||
@ -296,16 +302,16 @@ public abstract class GrpcClient extends RpcClient {
|
||||
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
|
||||
switchServerAsync();
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}",
|
||||
grpcConn.getConnectionId(), isRunning, isAbandon);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private void sendResponse(Response response) {
|
||||
try {
|
||||
((GrpcConnection) this.currentConnection).sendResponse(response);
|
||||
@ -314,7 +320,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
response.getRequestId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Connection connectToServer(ServerInfo serverInfo) {
|
||||
try {
|
||||
@ -325,21 +331,21 @@ public abstract class GrpcClient extends RpcClient {
|
||||
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
|
||||
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
|
||||
if (newChannelStubTemp != null) {
|
||||
|
||||
|
||||
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
|
||||
if (response == null || !(response instanceof ServerCheckResponse)) {
|
||||
shuntDownChannel(managedChannel);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
|
||||
newChannelStubTemp.getChannel());
|
||||
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
|
||||
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
|
||||
|
||||
|
||||
//create stream request and bind connection event to this connection.
|
||||
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
|
||||
|
||||
|
||||
// stream observer to send response to server
|
||||
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
|
||||
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
|
||||
@ -361,21 +367,19 @@ public abstract class GrpcClient extends RpcClient {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
private ManagedChannelBuilder buildChannel(String serverIp, int port, Optional<SslContext> sslContext) {
|
||||
if (sslContext.isPresent()) {
|
||||
return NettyChannelBuilder.forAddress(serverIp, port)
|
||||
.negotiationType(NegotiationType.TLS)
|
||||
return NettyChannelBuilder.forAddress(serverIp, port).negotiationType(NegotiationType.TLS)
|
||||
.sslContext(sslContext.get());
|
||||
|
||||
|
||||
} else {
|
||||
return ManagedChannelBuilder
|
||||
.forAddress(serverIp, port).usePlaintext();
|
||||
return ManagedChannelBuilder.forAddress(serverIp, port).usePlaintext();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Optional<SslContext> buildSslContext() {
|
||||
|
||||
|
||||
RpcClientTlsConfig tlsConfig = clientConfig.tlsConfig();
|
||||
if (!tlsConfig.getEnableTls()) {
|
||||
return Optional.absent();
|
||||
@ -398,14 +402,16 @@ public abstract class GrpcClient extends RpcClient {
|
||||
Resource resource = resourceLoader.getResource(tlsConfig.getTrustCollectionCertFile());
|
||||
builder.trustManager(resource.getInputStream());
|
||||
}
|
||||
|
||||
|
||||
if (tlsConfig.getMutualAuthEnable()) {
|
||||
if (StringUtils.isBlank(tlsConfig.getCertChainFile()) || StringUtils.isBlank(tlsConfig.getCertPrivateKey())) {
|
||||
if (StringUtils.isBlank(tlsConfig.getCertChainFile()) || StringUtils.isBlank(
|
||||
tlsConfig.getCertPrivateKey())) {
|
||||
throw new IllegalArgumentException("client certChainFile or certPrivateKey must be not null");
|
||||
}
|
||||
Resource certChainFile = resourceLoader.getResource(tlsConfig.getCertChainFile());
|
||||
Resource privateKey = resourceLoader.getResource(tlsConfig.getCertPrivateKey());
|
||||
builder.keyManager(certChainFile.getInputStream(), privateKey.getInputStream(), tlsConfig.getCertPrivateKeyPassword());
|
||||
builder.keyManager(certChainFile.getInputStream(), privateKey.getInputStream(),
|
||||
tlsConfig.getCertPrivateKeyPassword());
|
||||
}
|
||||
return Optional.of(builder.build());
|
||||
} catch (Exception e) {
|
||||
|
@ -16,7 +16,6 @@
|
||||
|
||||
package com.alibaba.nacos.core.remote;
|
||||
|
||||
import com.alibaba.nacos.common.JustForTest;
|
||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||
import com.alibaba.nacos.common.remote.PayloadRegistry;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
@ -34,31 +33,32 @@ import javax.annotation.PreDestroy;
|
||||
* @version $Id: BaseRpcServer.java, v 0.1 2020年07月13日 3:41 PM liuzunfei Exp $
|
||||
*/
|
||||
public abstract class BaseRpcServer {
|
||||
|
||||
|
||||
static {
|
||||
PayloadRegistry.init();
|
||||
}
|
||||
|
||||
|
||||
@Autowired
|
||||
protected RpcServerTlsConfig grpcServerConfig;
|
||||
|
||||
@JustForTest
|
||||
public void setGrpcServerConfig(RpcServerTlsConfig grpcServerConfig) {
|
||||
this.grpcServerConfig = grpcServerConfig;
|
||||
}
|
||||
|
||||
protected RpcServerTlsConfig rpcServerTlsConfig;
|
||||
|
||||
/**
|
||||
* Start sever.
|
||||
*/
|
||||
@PostConstruct
|
||||
public void start() throws Exception {
|
||||
String serverName = getClass().getSimpleName();
|
||||
String tlsConfig = JacksonUtils.toJson(grpcServerConfig);
|
||||
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);
|
||||
String tlsConfig = JacksonUtils.toJson(rpcServerTlsConfig);
|
||||
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(),
|
||||
tlsConfig);
|
||||
|
||||
startServer();
|
||||
|
||||
Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);
|
||||
|
||||
if (RpcServerSslContextRefresherHolder.getInstance() != null) {
|
||||
RpcServerSslContextRefresherHolder.getInstance().refresh(this);
|
||||
}
|
||||
|
||||
Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(),
|
||||
tlsConfig);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
|
||||
try {
|
||||
@ -68,7 +68,7 @@ public abstract class BaseRpcServer {
|
||||
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
|
||||
}
|
||||
}));
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -78,6 +78,19 @@ public abstract class BaseRpcServer {
|
||||
*/
|
||||
public abstract ConnectionType getConnectionType();
|
||||
|
||||
public RpcServerTlsConfig getRpcServerTlsConfig() {
|
||||
return rpcServerTlsConfig;
|
||||
}
|
||||
|
||||
public void setRpcServerTlsConfig(RpcServerTlsConfig rpcServerTlsConfig) {
|
||||
this.rpcServerTlsConfig = rpcServerTlsConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* reload ssl context.
|
||||
*/
|
||||
public abstract void reloadSslContext();
|
||||
|
||||
/**
|
||||
* Start sever.
|
||||
*
|
||||
@ -115,5 +128,5 @@ public abstract class BaseRpcServer {
|
||||
*/
|
||||
@PreDestroy
|
||||
public abstract void shutdownServer();
|
||||
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,41 @@
|
||||
package com.alibaba.nacos.core.remote;
|
||||
|
||||
/**
|
||||
* ssl context refresher spi holder.
|
||||
*
|
||||
* @author liuzunfei
|
||||
* @version $Id: RequestFilters.java, v 0.1 2023年03月17日 12:00 PM liuzunfei Exp $
|
||||
*/
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public interface RpcServerSslContextRefresher {
|
||||
|
||||
/**
|
||||
* listener current rpc server and do something on ssl context change.
|
||||
*
|
||||
* @param baseRpcServer rpc server.
|
||||
* @return
|
||||
*/
|
||||
SslContextChangeAware refresh(BaseRpcServer baseRpcServer);
|
||||
|
||||
/**
|
||||
* refresher name.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
String getName();
|
||||
}
|
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.core.remote;
|
||||
|
||||
import com.alibaba.nacos.common.spi.NacosServiceLoader;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.core.utils.Loggers;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* ssl context refresher spi holder.
|
||||
*
|
||||
* @author liuzunfei
|
||||
* @version $Id: RequestFilters.java, v 0.1 2023年03月17日 12:00 PM liuzunfei Exp $
|
||||
*/
|
||||
public class RpcServerSslContextRefresherHolder {
|
||||
|
||||
private static RpcServerSslContextRefresher instance;
|
||||
|
||||
private static volatile boolean init = false;
|
||||
|
||||
public static RpcServerSslContextRefresher getInstance() {
|
||||
if (init) {
|
||||
return instance;
|
||||
}
|
||||
synchronized (RpcServerSslContextRefresherHolder.class) {
|
||||
if (init) {
|
||||
return instance;
|
||||
}
|
||||
RpcServerTlsConfig rpcServerTlsConfig = ApplicationUtils.getBean(RpcServerTlsConfig.class);
|
||||
String sslContextRefresher = rpcServerTlsConfig.getSslContextRefresher();
|
||||
if (StringUtils.isNotBlank(sslContextRefresher)) {
|
||||
Collection<RpcServerSslContextRefresher> load = NacosServiceLoader.load(
|
||||
RpcServerSslContextRefresher.class);
|
||||
for (RpcServerSslContextRefresher contextRefresher : load) {
|
||||
if (sslContextRefresher.equals(contextRefresher.getName())) {
|
||||
instance = contextRefresher;
|
||||
Loggers.REMOTE.info("RpcServerSslContextRefresher of Name {} Founded->{}", sslContextRefresher,
|
||||
contextRefresher.getClass().getSimpleName());
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (instance == null) {
|
||||
Loggers.REMOTE.info("RpcServerSslContextRefresher of Name {} not found", sslContextRefresher);
|
||||
}
|
||||
|
||||
} else {
|
||||
Loggers.REMOTE.info(
|
||||
"No RpcServerSslContextRefresher specified,Ssl Context auto refresh not supported.");
|
||||
}
|
||||
|
||||
Loggers.REMOTE.info("RpcServerSslContextRefresher init end");
|
||||
init = true;
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
}
|
@ -31,7 +31,9 @@ import org.springframework.stereotype.Component;
|
||||
public class RpcServerTlsConfig extends TlsConfig {
|
||||
|
||||
public static final String PREFIX = "nacos.remote.server.rpc.tls";
|
||||
|
||||
|
||||
private String sslContextRefresher = "";
|
||||
|
||||
private Boolean compatibility = true;
|
||||
|
||||
public Boolean getCompatibility() {
|
||||
@ -41,4 +43,12 @@ public class RpcServerTlsConfig extends TlsConfig {
|
||||
public void setCompatibility(Boolean compatibility) {
|
||||
this.compatibility = compatibility;
|
||||
}
|
||||
|
||||
public String getSslContextRefresher() {
|
||||
return sslContextRefresher;
|
||||
}
|
||||
|
||||
public void setSslContextRefresher(String sslContextRefresher) {
|
||||
this.sslContextRefresher = sslContextRefresher;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.core.remote;
|
||||
|
||||
/**
|
||||
* ssl context refresher spi holder.
|
||||
*
|
||||
* @author liuzunfei
|
||||
* @version $Id: RequestFilters.java, v 0.1 2023年03月17日 12:00 PM liuzunfei Exp $
|
||||
*/
|
||||
public interface SslContextChangeAware {
|
||||
|
||||
/**
|
||||
* init rpc server ssl context.
|
||||
*
|
||||
* @param baseRpcServer rpc server.
|
||||
*/
|
||||
void init(BaseRpcServer baseRpcServer);
|
||||
|
||||
/**
|
||||
* do something on ssl context change.
|
||||
*/
|
||||
void onSslContextChange();
|
||||
|
||||
/**
|
||||
* shutdown to clear context.
|
||||
*/
|
||||
void shutdown();
|
||||
}
|
@ -22,10 +22,12 @@ import com.alibaba.nacos.common.packagescan.resource.Resource;
|
||||
import com.alibaba.nacos.common.packagescan.resource.ResourceLoader;
|
||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.common.utils.TlsTypeResolve;
|
||||
import com.alibaba.nacos.core.remote.BaseRpcServer;
|
||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||
import com.alibaba.nacos.core.utils.Loggers;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
@ -61,63 +63,78 @@ import java.util.concurrent.TimeUnit;
|
||||
* @version $Id: BaseGrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $
|
||||
*/
|
||||
public abstract class BaseGrpcServer extends BaseRpcServer {
|
||||
|
||||
|
||||
private Server server;
|
||||
|
||||
|
||||
private final ResourceLoader resourceLoader = new DefaultResourceLoader();
|
||||
|
||||
|
||||
@Autowired
|
||||
private GrpcRequestAcceptor grpcCommonRequestAcceptor;
|
||||
|
||||
|
||||
@Autowired
|
||||
private GrpcBiStreamRequestAcceptor grpcBiStreamRequestAcceptor;
|
||||
|
||||
|
||||
@Autowired
|
||||
private ConnectionManager connectionManager;
|
||||
|
||||
|
||||
private OptionalTlsProtocolNegotiator optionalTlsProtocolNegotiator;
|
||||
|
||||
@Override
|
||||
public ConnectionType getConnectionType() {
|
||||
return ConnectionType.GRPC;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void startServer() throws Exception {
|
||||
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
|
||||
addServices(handlerRegistry, new GrpcConnectionInterceptor());
|
||||
NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
|
||||
|
||||
if (grpcServerConfig.getEnableTls()) {
|
||||
if (grpcServerConfig.getCompatibility()) {
|
||||
builder.protocolNegotiator(new OptionalTlsProtocolNegotiator(getSslContextBuilder()));
|
||||
} else {
|
||||
builder.sslContext(getSslContextBuilder());
|
||||
}
|
||||
|
||||
if (rpcServerTlsConfig.getEnableTls()) {
|
||||
builder.protocolNegotiator(
|
||||
new OptionalTlsProtocolNegotiator(getSslContextBuilder(), rpcServerTlsConfig.getCompatibility()));
|
||||
|
||||
}
|
||||
|
||||
|
||||
server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
|
||||
.compressorRegistry(CompressorRegistry.getDefaultInstance())
|
||||
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
|
||||
.addTransportFilter(new AddressTransportFilter(connectionManager))
|
||||
.keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
|
||||
.keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
|
||||
.permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
|
||||
.build();
|
||||
|
||||
.permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* reload ssl context.
|
||||
*/
|
||||
public void reloadSslContext() {
|
||||
if (optionalTlsProtocolNegotiator != null) {
|
||||
try {
|
||||
optionalTlsProtocolNegotiator.setSslContext(getSslContextBuilder());
|
||||
} catch (Throwable throwable) {
|
||||
Loggers.REMOTE.info("Nacos {} Rpc server reload ssl context fail at port {} and tls config:{}",
|
||||
this.getClass().getSimpleName(), getServicePort(),
|
||||
JacksonUtils.toJson(super.rpcServerTlsConfig));
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected long getPermitKeepAliveTime() {
|
||||
return GrpcServerConstants.GrpcConfig.DEFAULT_GRPC_PERMIT_KEEP_ALIVE_TIME;
|
||||
}
|
||||
|
||||
|
||||
protected long getKeepAliveTime() {
|
||||
return GrpcServerConstants.GrpcConfig.DEFAULT_GRPC_KEEP_ALIVE_TIME;
|
||||
}
|
||||
|
||||
|
||||
protected long getKeepAliveTimeout() {
|
||||
return GrpcServerConstants.GrpcConfig.DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT;
|
||||
}
|
||||
|
||||
|
||||
protected int getMaxInboundMessageSize() {
|
||||
Integer property = EnvUtil.getProperty(GrpcServerConstants.GrpcConfig.MAX_INBOUND_MSG_SIZE_PROPERTY,
|
||||
Integer.class);
|
||||
@ -126,88 +143,90 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
|
||||
}
|
||||
return GrpcServerConstants.GrpcConfig.DEFAULT_GRPC_MAX_INBOUND_MSG_SIZE;
|
||||
}
|
||||
|
||||
|
||||
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
|
||||
|
||||
|
||||
// unary common call register.
|
||||
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
|
||||
.setType(MethodDescriptor.MethodType.UNARY)
|
||||
.setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
|
||||
GrpcServerConstants.REQUEST_METHOD_NAME))
|
||||
.setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(
|
||||
MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
|
||||
GrpcServerConstants.REQUEST_METHOD_NAME))
|
||||
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
|
||||
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
|
||||
|
||||
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
|
||||
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
|
||||
|
||||
|
||||
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
|
||||
(request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
|
||||
|
||||
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
|
||||
GrpcServerConstants.REQUEST_SERVICE_NAME)
|
||||
.addMethod(unaryPayloadMethod, payloadHandler).build();
|
||||
GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();
|
||||
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
|
||||
|
||||
|
||||
// bi stream register.
|
||||
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
|
||||
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
|
||||
|
||||
|
||||
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
|
||||
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
|
||||
.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
|
||||
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(
|
||||
MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
|
||||
GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))
|
||||
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
|
||||
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
|
||||
|
||||
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
|
||||
.builder(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
|
||||
|
||||
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(
|
||||
GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
|
||||
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void shutdownServer() {
|
||||
if (server != null) {
|
||||
server.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private SslContext getSslContextBuilder() {
|
||||
try {
|
||||
if (StringUtils.isBlank(grpcServerConfig.getCertChainFile()) || StringUtils.isBlank(grpcServerConfig.getCertPrivateKey())) {
|
||||
if (StringUtils.isBlank(rpcServerTlsConfig.getCertChainFile()) || StringUtils.isBlank(
|
||||
rpcServerTlsConfig.getCertPrivateKey())) {
|
||||
throw new IllegalArgumentException("Server certChainFile or certPrivateKey must be not null");
|
||||
}
|
||||
InputStream certificateChainFile = getInputStream(grpcServerConfig.getCertChainFile(), "certChainFile");
|
||||
InputStream privateKeyFile = getInputStream(grpcServerConfig.getCertPrivateKey(), "certPrivateKey");
|
||||
SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(certificateChainFile, privateKeyFile,
|
||||
grpcServerConfig.getCertPrivateKeyPassword());
|
||||
|
||||
if (StringUtils.isNotBlank(grpcServerConfig.getProtocols())) {
|
||||
sslClientContextBuilder.protocols(grpcServerConfig.getProtocols().split(","));
|
||||
InputStream certificateChainFile = getInputStream(rpcServerTlsConfig.getCertChainFile(), "certChainFile");
|
||||
InputStream privateKeyFile = getInputStream(rpcServerTlsConfig.getCertPrivateKey(), "certPrivateKey");
|
||||
SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(certificateChainFile,
|
||||
privateKeyFile, rpcServerTlsConfig.getCertPrivateKeyPassword());
|
||||
|
||||
if (StringUtils.isNotBlank(rpcServerTlsConfig.getProtocols())) {
|
||||
sslClientContextBuilder.protocols(rpcServerTlsConfig.getProtocols().split(","));
|
||||
}
|
||||
|
||||
if (StringUtils.isNotBlank(grpcServerConfig.getCiphers())) {
|
||||
sslClientContextBuilder.ciphers(Arrays.asList(grpcServerConfig.getCiphers().split(",")));
|
||||
|
||||
if (StringUtils.isNotBlank(rpcServerTlsConfig.getCiphers())) {
|
||||
sslClientContextBuilder.ciphers(Arrays.asList(rpcServerTlsConfig.getCiphers().split(",")));
|
||||
}
|
||||
if (grpcServerConfig.getMutualAuthEnable()) {
|
||||
if (rpcServerTlsConfig.getMutualAuthEnable()) {
|
||||
// trust all certificate
|
||||
if (grpcServerConfig.getTrustAll()) {
|
||||
if (rpcServerTlsConfig.getTrustAll()) {
|
||||
sslClientContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
|
||||
} else {
|
||||
if (StringUtils.isBlank(grpcServerConfig.getTrustCollectionCertFile())) {
|
||||
throw new IllegalArgumentException("enable mutual auth,trustCollectionCertFile must be not null");
|
||||
if (StringUtils.isBlank(rpcServerTlsConfig.getTrustCollectionCertFile())) {
|
||||
throw new IllegalArgumentException(
|
||||
"enable mutual auth,trustCollectionCertFile must be not null");
|
||||
}
|
||||
|
||||
InputStream clientCert = getInputStream(grpcServerConfig.getTrustCollectionCertFile(), "trustCollectionCertFile");
|
||||
|
||||
InputStream clientCert = getInputStream(rpcServerTlsConfig.getTrustCollectionCertFile(),
|
||||
"trustCollectionCertFile");
|
||||
sslClientContextBuilder.trustManager(clientCert);
|
||||
}
|
||||
sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
|
||||
}
|
||||
SslContextBuilder configure = GrpcSslContexts.configure(sslClientContextBuilder,
|
||||
TlsTypeResolve.getSslProvider(grpcServerConfig.getSslProvider()));
|
||||
TlsTypeResolve.getSslProvider(rpcServerTlsConfig.getSslProvider()));
|
||||
return configure.build();
|
||||
} catch (SSLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private InputStream getInputStream(String path, String config) {
|
||||
try {
|
||||
Resource resource = resourceLoader.getResource(path);
|
||||
@ -216,12 +235,12 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
|
||||
throw new RuntimeException(config + " load fail", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get rpc executor.
|
||||
*
|
||||
* @return executor.
|
||||
*/
|
||||
public abstract ThreadPoolExecutor getRpcExecutor();
|
||||
|
||||
|
||||
}
|
||||
|
@ -32,40 +32,45 @@ import java.lang.reflect.Field;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* support the tls and plain protocol one the same port.
|
||||
* support the tls and plain protocol one the same port.
|
||||
*
|
||||
* @author githubcheng2978.
|
||||
*/
|
||||
public class OptionalTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
|
||||
|
||||
|
||||
private static final int MAGIC_VALUE = 5;
|
||||
|
||||
|
||||
private boolean supportPlainText;
|
||||
|
||||
private SslContext sslContext;
|
||||
|
||||
public OptionalTlsProtocolNegotiator(SslContext sslContext) {
|
||||
|
||||
public OptionalTlsProtocolNegotiator(SslContext sslContext, boolean supportPlainText) {
|
||||
this.sslContext = sslContext;
|
||||
this.supportPlainText = supportPlainText;
|
||||
}
|
||||
|
||||
void setSslContext(SslContext sslContext) {
|
||||
this.sslContext = sslContext;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public AsciiString scheme() {
|
||||
return AsciiString.of("https");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHttp2ConnectionHandler) {
|
||||
ChannelHandler plaintext =
|
||||
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
|
||||
ChannelHandler ssl =
|
||||
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
|
||||
ChannelHandler plaintext = InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
|
||||
ChannelHandler ssl = InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
|
||||
ChannelHandler decoder = new PortUnificationServerHandler(ssl, plaintext);
|
||||
return decoder;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
private ProtocolNegotiationEvent getDefPne() {
|
||||
ProtocolNegotiationEvent protocolNegotiationEvent = null;
|
||||
try {
|
||||
@ -77,31 +82,31 @@ public class OptionalTlsProtocolNegotiator implements InternalProtocolNegotiator
|
||||
}
|
||||
return protocolNegotiationEvent;
|
||||
}
|
||||
|
||||
|
||||
public class PortUnificationServerHandler extends ByteToMessageDecoder {
|
||||
|
||||
private ProtocolNegotiationEvent pne;
|
||||
|
||||
|
||||
private final ChannelHandler ssl;
|
||||
|
||||
|
||||
private final ChannelHandler plaintext;
|
||||
|
||||
|
||||
public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler plaintext) {
|
||||
this.ssl = ssl;
|
||||
this.plaintext = plaintext;
|
||||
this.pne = getDefPne();
|
||||
}
|
||||
|
||||
|
||||
private boolean isSsl(ByteBuf buf) {
|
||||
return SslHandler.isEncrypted(buf);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
|
||||
throws Exception {
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
if (in.readableBytes() < MAGIC_VALUE) {
|
||||
return;
|
||||
}
|
||||
if (isSsl(in)) {
|
||||
if (isSsl(in) || !supportPlainText) {
|
||||
ctx.pipeline().addAfter(ctx.name(), (String) null, this.ssl);
|
||||
ctx.fireUserEventTriggered(pne);
|
||||
ctx.pipeline().remove(this);
|
||||
@ -112,5 +117,5 @@ public class OptionalTlsProtocolNegotiator implements InternalProtocolNegotiator
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -20,10 +20,14 @@ package com.alibaba.nacos.core.remote.grpc;
|
||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||
import com.alibaba.nacos.core.remote.RpcServerTlsConfig;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.springframework.mock.env.MockEnvironment;
|
||||
|
||||
@ -40,36 +44,46 @@ import static org.mockito.Mockito.when;
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.Silent.class)
|
||||
public class GrpcServerTest {
|
||||
|
||||
|
||||
private final RpcServerTlsConfig grpcServerConfig = mock(RpcServerTlsConfig.class);
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
static MockedStatic<ApplicationUtils> applicationUtilsMockedStatic = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
EnvUtil.setEnvironment(new MockEnvironment());
|
||||
applicationUtilsMockedStatic = Mockito.mockStatic(ApplicationUtils.class);
|
||||
}
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void after() {
|
||||
applicationUtilsMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGrpcSdkServer() throws Exception {
|
||||
BaseGrpcServer grpcSdkServer = new GrpcSdkServer();
|
||||
grpcSdkServer.setGrpcServerConfig(grpcServerConfig);
|
||||
grpcSdkServer.setRpcServerTlsConfig(grpcServerConfig);
|
||||
when(grpcServerConfig.getEnableTls()).thenReturn(false);
|
||||
when(ApplicationUtils.getBean(RpcServerTlsConfig.class)).thenReturn(grpcServerConfig);
|
||||
grpcSdkServer.start();
|
||||
Assert.assertEquals(grpcSdkServer.getConnectionType(), ConnectionType.GRPC);
|
||||
Assert.assertEquals(grpcSdkServer.rpcPortOffset(), 1000);
|
||||
grpcSdkServer.stopServer();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGrpcClusterServer() throws Exception {
|
||||
BaseGrpcServer grpcSdkServer = new GrpcClusterServer();
|
||||
grpcSdkServer.setGrpcServerConfig(grpcServerConfig);
|
||||
grpcSdkServer.setRpcServerTlsConfig(grpcServerConfig);
|
||||
when(grpcServerConfig.getEnableTls()).thenReturn(false);
|
||||
when(ApplicationUtils.getBean(RpcServerTlsConfig.class)).thenReturn(grpcServerConfig);
|
||||
grpcSdkServer.start();
|
||||
Assert.assertEquals(grpcSdkServer.getConnectionType(), ConnectionType.GRPC);
|
||||
Assert.assertEquals(grpcSdkServer.rpcPortOffset(), 1001);
|
||||
grpcSdkServer.stopServer();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGrpcEnableTls() throws Exception {
|
||||
final BaseGrpcServer grpcSdkServer = new BaseGrpcServer() {
|
||||
@ -77,7 +91,7 @@ public class GrpcServerTest {
|
||||
public ThreadPoolExecutor getRpcExecutor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 100;
|
||||
@ -86,40 +100,41 @@ public class GrpcServerTest {
|
||||
when(grpcServerConfig.getEnableTls()).thenReturn(true);
|
||||
when(grpcServerConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256,ECDHE-RSA-AES256-GCM-SHA384");
|
||||
when(grpcServerConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
|
||||
|
||||
|
||||
when(grpcServerConfig.getCertPrivateKey()).thenReturn("test-server-key.pem");
|
||||
when(grpcServerConfig.getCertChainFile()).thenReturn("test-server-cert.pem");
|
||||
grpcSdkServer.setGrpcServerConfig(grpcServerConfig);
|
||||
when(ApplicationUtils.getBean(RpcServerTlsConfig.class)).thenReturn(grpcServerConfig);
|
||||
grpcSdkServer.setRpcServerTlsConfig(grpcServerConfig);
|
||||
grpcSdkServer.start();
|
||||
grpcSdkServer.shutdownServer();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGrpcEnableMutualAuthAndTrustAll() throws Exception {
|
||||
|
||||
|
||||
final BaseGrpcServer grpcSdkServer = new BaseGrpcServer() {
|
||||
@Override
|
||||
public ThreadPoolExecutor getRpcExecutor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 100;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
when(grpcServerConfig.getEnableTls()).thenReturn(true);
|
||||
when(grpcServerConfig.getTrustAll()).thenReturn(true);
|
||||
when(grpcServerConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256,ECDHE-RSA-AES256-GCM-SHA384");
|
||||
when(grpcServerConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
|
||||
when(grpcServerConfig.getCertPrivateKey()).thenReturn("test-server-key.pem");
|
||||
when(grpcServerConfig.getCertChainFile()).thenReturn("test-server-cert.pem");
|
||||
grpcSdkServer.setGrpcServerConfig(grpcServerConfig);
|
||||
grpcSdkServer.setRpcServerTlsConfig(grpcServerConfig);
|
||||
grpcSdkServer.start();
|
||||
grpcSdkServer.shutdownServer();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGrpcEnableMutualAuthAndPart() throws Exception {
|
||||
final BaseGrpcServer grpcSdkServer = new BaseGrpcServer() {
|
||||
@ -127,7 +142,7 @@ public class GrpcServerTest {
|
||||
public ThreadPoolExecutor getRpcExecutor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 100;
|
||||
@ -138,13 +153,13 @@ public class GrpcServerTest {
|
||||
when(grpcServerConfig.getEnableTls()).thenReturn(true);
|
||||
when(grpcServerConfig.getCiphers()).thenReturn("ECDHE-RSA-AES128-GCM-SHA256,ECDHE-RSA-AES256-GCM-SHA384");
|
||||
when(grpcServerConfig.getProtocols()).thenReturn("TLSv1.2,TLSv1.3");
|
||||
|
||||
|
||||
when(grpcServerConfig.getCertPrivateKey()).thenReturn("test-server-key.pem");
|
||||
when(grpcServerConfig.getCertChainFile()).thenReturn("test-server-cert.pem");
|
||||
when(grpcServerConfig.getTrustCollectionCertFile()).thenReturn("test-ca-cert.pem");
|
||||
|
||||
grpcSdkServer.setGrpcServerConfig(grpcServerConfig);
|
||||
|
||||
|
||||
grpcSdkServer.setRpcServerTlsConfig(grpcServerConfig);
|
||||
|
||||
grpcSdkServer.start();
|
||||
grpcSdkServer.shutdownServer();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user