remove connectionId in SetupAckRequest, expose timeout param for capability negotiation, add uncommitted file

This commit is contained in:
Daydreamer-ia 2023-09-24 22:14:06 +08:00
parent fc8549dc8f
commit e5f54badb3
8 changed files with 122 additions and 76 deletions

View File

@ -20,22 +20,20 @@ import java.util.Map;
import static com.alibaba.nacos.api.common.Constants.Remote.INTERNAL_MODULE;
/**.
* @author Daydreamer
* @description Server tells the client that the connection is established
/**
* Server tells the client that the connection is established.
*
* @author Daydreamer.
* @date 2022/7/12 19:21
**/
public class SetupAckRequest extends ServerRequest {
private String connectionId;
private Map<String, Boolean> abilityTable;
public SetupAckRequest() {
}
public SetupAckRequest(String connectionId, Map<String, Boolean> abilityTable) {
this.connectionId = connectionId;
public SetupAckRequest(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
@ -47,14 +45,6 @@ public class SetupAckRequest extends ServerRequest {
this.abilityTable = abilityTable;
}
public String getConnectionId() {
return connectionId;
}
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
@Override
public String getModule() {
return INTERNAL_MODULE;

View File

@ -22,6 +22,8 @@ com.alibaba.nacos.api.remote.request.PushAckRequest
com.alibaba.nacos.api.remote.request.ServerCheckRequest
com.alibaba.nacos.api.remote.request.ServerLoaderInfoRequest
com.alibaba.nacos.api.remote.request.ServerReloadRequest
com.alibaba.nacos.api.remote.request.SetupAckRequest
com.alibaba.nacos.api.remote.response.SetupAckResponse
com.alibaba.nacos.api.remote.response.ClientDetectionResponse
com.alibaba.nacos.api.remote.response.ConnectResetResponse
com.alibaba.nacos.api.remote.response.ErrorResponse

View File

@ -60,6 +60,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
private long healthCheckTimeOut;
private long capabilityNegotiationTimeout;
private Map<String, String> labels;
private RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
@ -90,6 +92,7 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
this.healthCheckTimeOut = loadLongConfig(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT, builder.healthCheckTimeOut);
this.channelKeepAliveTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT,
builder.channelKeepAliveTimeout);
this.capabilityNegotiationTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT, builder.capabilityNegotiationTimeout);
this.labels = builder.labels;
this.labels.put("tls.enable", "false");
if (Objects.nonNull(builder.tlsConfig)) {
@ -177,6 +180,11 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
this.tlsConfig = tlsConfig;
}
@Override
public long capabilityNegotiationTimeout() {
return this.capabilityNegotiationTimeout;
}
@Override
public int healthCheckRetryTimes() {
return healthCheckRetryTimes;
@ -226,6 +234,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
private long healthCheckTimeOut = 3000L;
private long capabilityNegotiationTimeout = 5000L;
private Map<String, String> labels = new HashMap<>();
private RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
@ -280,6 +290,10 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
this.channelKeepAlive = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)) {
this.capabilityNegotiationTimeout = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) {
this.healthCheckRetryTimes = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES));
@ -399,6 +413,10 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
return this;
}
public void setCapabilityNegotiationTimeout(long capabilityNegotiationTimeout) {
this.capabilityNegotiationTimeout = capabilityNegotiationTimeout;
}
/**
* set healthCheckRetryTimes.
*/

View File

@ -32,11 +32,12 @@ import com.alibaba.nacos.api.remote.response.SetupAckResponse;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.packagescan.resource.Resource;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.Arrays;
import java.util.Properties;
@ -87,7 +87,7 @@ public abstract class GrpcClient extends RpcClient {
/**
* Block to wait setup success response.
*/
private final Map<String, RecAbilityContext> markForSetup = new ConcurrentHashMap<>();
private final RecAbilityContext recAbilityContext = new RecAbilityContext(null);
@Override
public ConnectionType getConnectionType() {
@ -140,23 +140,7 @@ public abstract class GrpcClient extends RpcClient {
*/
private void initSetupHandler() {
// register to handler setup request
registerServerRequestHandler((request, connection) -> {
// if finish setup
if (request instanceof SetupAckRequest) {
SetupAckRequest setupAckRequest = (SetupAckRequest) request;
// remove and count down
RecAbilityContext context = markForSetup.remove(setupAckRequest.getConnectionId());
if (context != null) {
// set server abilities
context.connection.setAbilityTable(setupAckRequest.getAbilityTable());
// notify
java.util.Optional.ofNullable(context.blocker)
.orElse(new CountDownLatch(1))
.countDown();
}
}
return new SetupAckResponse();
});
registerServerRequestHandler(new SetupRequestHandler(this.recAbilityContext));
}
/**
@ -309,12 +293,7 @@ public abstract class GrpcClient extends RpcClient {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
// remove and notify
RecAbilityContext context = markForSetup.remove(grpcConn.getConnectionId());
if (context != null) {
Optional.ofNullable(context.blocker)
.orElse(new CountDownLatch(1))
.countDown();
}
recAbilityContext.release(null);
}
}
@ -396,7 +375,7 @@ public abstract class GrpcClient extends RpcClient {
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
markForSetup.put(serverCheckResponse.getConnectionId(), new RecAbilityContext(grpcConn, new CountDownLatch(1)));
this.recAbilityContext.reset(grpcConn);
}
//create stream request and bind connection event to this connection.
@ -415,13 +394,12 @@ public abstract class GrpcClient extends RpcClient {
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
RecAbilityContext synResponse = markForSetup.get(connectionId);
if (synResponse != null) {
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
synResponse.blocker.await(5000L, TimeUnit.MICROSECONDS);
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
} else {
// leave for adapting old version server
//wait to register connection setup
// wait to register connection setup
Thread.sleep(100L);
}
return grpcConn;
@ -430,25 +408,14 @@ public abstract class GrpcClient extends RpcClient {
} catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
// remove and notify
RecAbilityContext context = markForSetup.remove(connectionId);
if (context != null) {
Optional.ofNullable(context.blocker)
.orElse(new CountDownLatch(1))
.countDown();
}
recAbilityContext.release(null);
}
return null;
}
@Override
protected void afterReset(ConnectResetRequest request) {
RecAbilityContext context = markForSetup.remove(request.getConnectionId());
if (context != null) {
// remove and notify
java.util.Optional.ofNullable(context.blocker)
.orElse(new CountDownLatch(1))
.countDown();
}
recAbilityContext.release(null);
}
/**
@ -459,32 +426,92 @@ public abstract class GrpcClient extends RpcClient {
/**
* connection waiting for server abilities.
*/
private Connection connection;
private volatile Connection connection;
/**
* way to block client.
*/
private CountDownLatch blocker;
private volatile CountDownLatch blocker;
public RecAbilityContext(Connection connection, CountDownLatch blocker) {
private volatile boolean needToSync = false;
public RecAbilityContext(Connection connection) {
this.connection = connection;
this.blocker = blocker;
this.blocker = new CountDownLatch(1);
}
public Connection getConnection() {
return connection;
/**
* whether to sync for ability table.
*
* @return whether to sync for ability table.
*/
public boolean isNeedToSync() {
return this.needToSync;
}
public void setConnection(Connection connection) {
/**
* reset with new connection which is waiting for ability table.
*
* @param connection new connection which is waiting for ability table.
*/
public void reset(Connection connection) {
this.connection = connection;
this.blocker = new CountDownLatch(1);
this.needToSync = true;
}
public CountDownLatch getBlocker() {
return blocker;
/**
* notify sync by abilities.
*
* @param abilities abilities.
*/
public void release(Map<String, Boolean> abilities) {
if (this.connection != null) {
this.connection.setAbilityTable(abilities);
// avoid repeat setting
this.connection = null;
}
if (this.blocker != null) {
blocker.countDown();
}
this.needToSync = false;
}
public void setBlocker(CountDownLatch blocker) {
this.blocker = blocker;
/**
* await for abilities.
*
* @param timeout timeout.
* @param unit unit.
* @throws InterruptedException by blocker.
*/
public void await(long timeout, TimeUnit unit) throws InterruptedException {
if (this.blocker != null) {
this.blocker.await(timeout, unit);
}
this.needToSync = false;
}
}
/**
* Setup response handler.
*/
class SetupRequestHandler implements ServerRequestHandler {
private final RecAbilityContext abilityContext;
public SetupRequestHandler(RecAbilityContext abilityContext) {
this.abilityContext = abilityContext;
}
@Override
public Response requestReply(Request request, Connection connection) {
// if finish setup
if (request instanceof SetupAckRequest) {
SetupAckRequest setupAckRequest = (SetupAckRequest) request;
// remove and count down
recAbilityContext.release(setupAckRequest.getAbilityTable());
}
return new SetupAckResponse();
}
}

View File

@ -96,4 +96,11 @@ public interface GrpcClientConfig extends RpcClientConfig {
*/
void setTlsConfig(RpcClientTlsConfig tlsConfig);
/**
* get timeout of connection setup(TimeUnit.MILLISECONDS).
*
* @return timeout of connection setup
*/
long capabilityNegotiationTimeout();
}

View File

@ -79,6 +79,9 @@ public class GrpcConstants {
@GRpcConfigLabel
public static final String GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.keep.alive.timeout";
@GRpcConfigLabel
public static final String GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.capability.negotiation.timeout";
private static final Set<String> CONFIG_NAMES = new HashSet<>();
@Documented

View File

@ -642,7 +642,7 @@ public class RpcClientTest {
return null;
}
};
rpcClient.serverRequestHandlers.add(req -> {
rpcClient.serverRequestHandlers.add((req, conn) -> {
throw new RuntimeException();
});
rpcClient.handleServerRequest(request);

View File

@ -138,8 +138,7 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
} else {
try {
// finish register, tell client has set up successfully
connection.request(new SetupAckRequest(connectionId,
NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities()), 3000L);
connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities()), 3000L);
} catch (Exception e) {
// nothing to do