diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java index 8c37bf842..cecdaa825 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java @@ -20,22 +20,20 @@ import java.util.Map; import static com.alibaba.nacos.api.common.Constants.Remote.INTERNAL_MODULE; -/**. - * @author Daydreamer - * @description Server tells the client that the connection is established +/** + * Server tells the client that the connection is established. + * + * @author Daydreamer. * @date 2022/7/12 19:21 **/ public class SetupAckRequest extends ServerRequest { - private String connectionId; - private Map abilityTable; public SetupAckRequest() { } - public SetupAckRequest(String connectionId, Map abilityTable) { - this.connectionId = connectionId; + public SetupAckRequest(Map abilityTable) { this.abilityTable = abilityTable; } @@ -47,14 +45,6 @@ public class SetupAckRequest extends ServerRequest { this.abilityTable = abilityTable; } - public String getConnectionId() { - return connectionId; - } - - public void setConnectionId(String connectionId) { - this.connectionId = connectionId; - } - @Override public String getModule() { return INTERNAL_MODULE; diff --git a/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload b/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload index d838e65ea..5e9552afd 100644 --- a/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload +++ b/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload @@ -22,6 +22,8 @@ com.alibaba.nacos.api.remote.request.PushAckRequest com.alibaba.nacos.api.remote.request.ServerCheckRequest com.alibaba.nacos.api.remote.request.ServerLoaderInfoRequest com.alibaba.nacos.api.remote.request.ServerReloadRequest +com.alibaba.nacos.api.remote.request.SetupAckRequest +com.alibaba.nacos.api.remote.response.SetupAckResponse com.alibaba.nacos.api.remote.response.ClientDetectionResponse com.alibaba.nacos.api.remote.response.ConnectResetResponse com.alibaba.nacos.api.remote.response.ErrorResponse diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java index 685405022..a191081f9 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java @@ -59,6 +59,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { private int healthCheckRetryTimes; private long healthCheckTimeOut; + + private long capabilityNegotiationTimeout; private Map labels; @@ -90,6 +92,7 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.healthCheckTimeOut = loadLongConfig(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT, builder.healthCheckTimeOut); this.channelKeepAliveTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT, builder.channelKeepAliveTimeout); + this.capabilityNegotiationTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT, builder.capabilityNegotiationTimeout); this.labels = builder.labels; this.labels.put("tls.enable", "false"); if (Objects.nonNull(builder.tlsConfig)) { @@ -177,6 +180,11 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.tlsConfig = tlsConfig; } + @Override + public long capabilityNegotiationTimeout() { + return this.capabilityNegotiationTimeout; + } + @Override public int healthCheckRetryTimes() { return healthCheckRetryTimes; @@ -225,6 +233,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { private int healthCheckRetryTimes = 3; private long healthCheckTimeOut = 3000L; + + private long capabilityNegotiationTimeout = 5000L; private Map labels = new HashMap<>(); @@ -280,6 +290,10 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.channelKeepAlive = Integer.parseInt( properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)); } + if (properties.contains(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)) { + this.capabilityNegotiationTimeout = Integer.parseInt( + properties.getProperty(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)); + } if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) { this.healthCheckRetryTimes = Integer.parseInt( properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)); @@ -398,7 +412,11 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.channelKeepAliveTimeout = channelKeepAliveTimeout; return this; } - + + public void setCapabilityNegotiationTimeout(long capabilityNegotiationTimeout) { + this.capabilityNegotiationTimeout = capabilityNegotiationTimeout; + } + /** * set healthCheckRetryTimes. */ diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index 121fd96d2..c4a96fccd 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -32,11 +32,12 @@ import com.alibaba.nacos.api.remote.response.SetupAckResponse; import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; import com.alibaba.nacos.common.packagescan.resource.Resource; import com.alibaba.nacos.common.remote.ConnectionType; -import com.alibaba.nacos.common.remote.client.RpcClient; -import com.alibaba.nacos.common.remote.client.RpcClientStatus; -import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; +import com.alibaba.nacos.common.remote.client.RpcClientStatus; +import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.ServerListFactory; +import com.alibaba.nacos.common.remote.client.Connection; +import com.alibaba.nacos.common.remote.client.ServerRequestHandler; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.StringUtils; @@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.Arrays; import java.util.Properties; @@ -87,7 +87,7 @@ public abstract class GrpcClient extends RpcClient { /** * Block to wait setup success response. */ - private final Map markForSetup = new ConcurrentHashMap<>(); + private final RecAbilityContext recAbilityContext = new RecAbilityContext(null); @Override public ConnectionType getConnectionType() { @@ -140,23 +140,7 @@ public abstract class GrpcClient extends RpcClient { */ private void initSetupHandler() { // register to handler setup request - registerServerRequestHandler((request, connection) -> { - // if finish setup - if (request instanceof SetupAckRequest) { - SetupAckRequest setupAckRequest = (SetupAckRequest) request; - // remove and count down - RecAbilityContext context = markForSetup.remove(setupAckRequest.getConnectionId()); - if (context != null) { - // set server abilities - context.connection.setAbilityTable(setupAckRequest.getAbilityTable()); - // notify - java.util.Optional.ofNullable(context.blocker) - .orElse(new CountDownLatch(1)) - .countDown(); - } - } - return new SetupAckResponse(); - }); + registerServerRequestHandler(new SetupRequestHandler(this.recAbilityContext)); } /** @@ -309,12 +293,7 @@ public abstract class GrpcClient extends RpcClient { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8()); // remove and notify - RecAbilityContext context = markForSetup.remove(grpcConn.getConnectionId()); - if (context != null) { - Optional.ofNullable(context.blocker) - .orElse(new CountDownLatch(1)) - .countDown(); - } + recAbilityContext.release(null); } } @@ -396,7 +375,7 @@ public abstract class GrpcClient extends RpcClient { // if not supported, it will be false if (serverCheckResponse.isSupportAbilityNegotiation()) { // mark - markForSetup.put(serverCheckResponse.getConnectionId(), new RecAbilityContext(grpcConn, new CountDownLatch(1))); + this.recAbilityContext.reset(grpcConn); } //create stream request and bind connection event to this connection. @@ -415,13 +394,12 @@ public abstract class GrpcClient extends RpcClient { conSetupRequest.setTenant(super.getTenant()); grpcConn.sendRequest(conSetupRequest); // wait for response - RecAbilityContext synResponse = markForSetup.get(connectionId); - if (synResponse != null) { + if (recAbilityContext.isNeedToSync()) { // try to wait for notify response - synResponse.blocker.await(5000L, TimeUnit.MICROSECONDS); + recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS); } else { // leave for adapting old version server - //wait to register connection setup + // wait to register connection setup Thread.sleep(100L); } return grpcConn; @@ -430,25 +408,14 @@ public abstract class GrpcClient extends RpcClient { } catch (Exception e) { LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); // remove and notify - RecAbilityContext context = markForSetup.remove(connectionId); - if (context != null) { - Optional.ofNullable(context.blocker) - .orElse(new CountDownLatch(1)) - .countDown(); - } + recAbilityContext.release(null); } return null; } @Override protected void afterReset(ConnectResetRequest request) { - RecAbilityContext context = markForSetup.remove(request.getConnectionId()); - if (context != null) { - // remove and notify - java.util.Optional.ofNullable(context.blocker) - .orElse(new CountDownLatch(1)) - .countDown(); - } + recAbilityContext.release(null); } /** @@ -459,32 +426,92 @@ public abstract class GrpcClient extends RpcClient { /** * connection waiting for server abilities. */ - private Connection connection; + private volatile Connection connection; /** * way to block client. */ - private CountDownLatch blocker; + private volatile CountDownLatch blocker; - public RecAbilityContext(Connection connection, CountDownLatch blocker) { + private volatile boolean needToSync = false; + + public RecAbilityContext(Connection connection) { this.connection = connection; - this.blocker = blocker; + this.blocker = new CountDownLatch(1); } - public Connection getConnection() { - return connection; + /** + * whether to sync for ability table. + * + * @return whether to sync for ability table. + */ + public boolean isNeedToSync() { + return this.needToSync; } - public void setConnection(Connection connection) { + /** + * reset with new connection which is waiting for ability table. + * + * @param connection new connection which is waiting for ability table. + */ + public void reset(Connection connection) { this.connection = connection; + this.blocker = new CountDownLatch(1); + this.needToSync = true; } - public CountDownLatch getBlocker() { - return blocker; + /** + * notify sync by abilities. + * + * @param abilities abilities. + */ + public void release(Map abilities) { + if (this.connection != null) { + this.connection.setAbilityTable(abilities); + // avoid repeat setting + this.connection = null; + } + if (this.blocker != null) { + blocker.countDown(); + } + this.needToSync = false; } - public void setBlocker(CountDownLatch blocker) { - this.blocker = blocker; + /** + * await for abilities. + * + * @param timeout timeout. + * @param unit unit. + * @throws InterruptedException by blocker. + */ + public void await(long timeout, TimeUnit unit) throws InterruptedException { + if (this.blocker != null) { + this.blocker.await(timeout, unit); + } + this.needToSync = false; + } + } + + /** + * Setup response handler. + */ + class SetupRequestHandler implements ServerRequestHandler { + + private final RecAbilityContext abilityContext; + + public SetupRequestHandler(RecAbilityContext abilityContext) { + this.abilityContext = abilityContext; + } + + @Override + public Response requestReply(Request request, Connection connection) { + // if finish setup + if (request instanceof SetupAckRequest) { + SetupAckRequest setupAckRequest = (SetupAckRequest) request; + // remove and count down + recAbilityContext.release(setupAckRequest.getAbilityTable()); + } + return new SetupAckResponse(); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java index 776072830..1c1b4003b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java @@ -96,4 +96,11 @@ public interface GrpcClientConfig extends RpcClientConfig { */ void setTlsConfig(RpcClientTlsConfig tlsConfig); + /** + * get timeout of connection setup(TimeUnit.MILLISECONDS). + * + * @return timeout of connection setup + */ + long capabilityNegotiationTimeout(); + } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java index c1afa0744..908997def 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java @@ -79,6 +79,9 @@ public class GrpcConstants { @GRpcConfigLabel public static final String GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.keep.alive.timeout"; + @GRpcConfigLabel + public static final String GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.capability.negotiation.timeout"; + private static final Set CONFIG_NAMES = new HashSet<>(); @Documented diff --git a/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java b/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java index 796e2c6d0..281d0ebb2 100644 --- a/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java @@ -642,7 +642,7 @@ public class RpcClientTest { return null; } }; - rpcClient.serverRequestHandlers.add(req -> { + rpcClient.serverRequestHandlers.add((req, conn) -> { throw new RuntimeException(); }); rpcClient.handleServerRequest(request); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java index fbadeab25..d01420b62 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java @@ -138,8 +138,7 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt } else { try { // finish register, tell client has set up successfully - connection.request(new SetupAckRequest(connectionId, - NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities()), 3000L); + connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities()), 3000L); } catch (Exception e) { // nothing to do