Fix npe when setup ack response in GrpcClient (#11210)

* fix npe when setup ack response in GrpcClient

* server sync response, avoid blocking setup ack response sent to server.

* remove setup response from client to server, change way of server pushing abilities by sendNoACK

* add UT for negotiation timeout

* fix checkstyle

* fix PMD error

* private check logical into ReceiveAbilityContext.

* fix indent
This commit is contained in:
chenyiqin 2023-10-13 09:48:06 +08:00 committed by GitHub
parent 5d8973cad5
commit a5c0a60a1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 31 deletions

View File

@ -57,6 +57,10 @@ public abstract class Connection implements Requester {
}
return abilityTable.get(abilityKey.getName()) ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
public boolean isAbilitiesSet() {
return abilityTable != null;
}
public void setAbilityTable(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;

View File

@ -33,18 +33,18 @@ import com.alibaba.nacos.api.remote.response.SetupAckResponse;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.packagescan.resource.Resource;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
@ -60,9 +60,10 @@ import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@ -89,6 +90,11 @@ public abstract class GrpcClient extends RpcClient {
*/
private final RecAbilityContext recAbilityContext = new RecAbilityContext(null);
/**
* for receiving server abilities.
*/
private SetupRequestHandler setupRequestHandler;
@Override
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
@ -140,7 +146,7 @@ public abstract class GrpcClient extends RpcClient {
*/
private void initSetupHandler() {
// register to handler setup request
registerServerRequestHandler(new SetupRequestHandler(this.recAbilityContext));
setupRequestHandler = new SetupRequestHandler(this.recAbilityContext);
}
/**
@ -268,6 +274,11 @@ public abstract class GrpcClient extends RpcClient {
if (request != null) {
try {
if (request instanceof SetupAckRequest) {
// there is no connection ready this time
setupRequestHandler.requestReply(request, null);
return;
}
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
@ -374,6 +385,8 @@ public abstract class GrpcClient extends RpcClient {
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
// promise null if no abilities receive
grpcConn.setAbilityTable(null);
}
//create stream request and bind connection event to this connection.
@ -395,10 +408,9 @@ public abstract class GrpcClient extends RpcClient {
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
boolean waitForResponse = recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(),
TimeUnit.MILLISECONDS);
if (!waitForResponse) {
// haven't received a response for registration; need to register again.
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
// if no server abilities receiving, then reconnect
if (!recAbilityContext.check(grpcConn)) {
return null;
}
} else {
@ -488,22 +500,36 @@ public abstract class GrpcClient extends RpcClient {
}
/**
* Wait for a specified duration for a condition to be met.
* await for abilities.
*
* @param timeout The maximum time to wait.
* @param unit The time unit for the timeout.
* @return true if the condition was successfully awaited, false otherwise.
* @throws InterruptedException if the waiting thread is interrupted.
* @param timeout timeout.
* @param unit unit.
* @throws InterruptedException by blocker.
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (blocker != null) {
boolean waitForResponse = blocker.await(timeout, unit);
if (waitForResponse) {
needToSync = false;
}
return waitForResponse;
public void await(long timeout, TimeUnit unit) throws InterruptedException {
if (this.blocker != null) {
this.blocker.await(timeout, unit);
}
return false;
this.needToSync = false;
}
/**
* check whether receive abilities.
*
* @param connection conn.
* @return whether receive abilities.
*/
public boolean check(Connection connection) {
if (!connection.isAbilitiesSet()) {
LOGGER.error("Client don't receive server abilities table even empty table but server supports ability negotiation."
+ " You can check if it is need to adjust the timeout of ability negotiation by property: {}"
+ " if always fail to connect.",
GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT);
connection.setAbandon(true);
connection.close();
return false;
}
return true;
}
}
@ -524,7 +550,8 @@ public abstract class GrpcClient extends RpcClient {
if (request instanceof SetupAckRequest) {
SetupAckRequest setupAckRequest = (SetupAckRequest) request;
// remove and count down
recAbilityContext.release(setupAckRequest.getAbilityTable());
recAbilityContext.release(Optional.ofNullable(setupAckRequest.getAbilityTable())
.orElse(new HashMap<>(0)));
return new SetupAckResponse();
}
return null;

View File

@ -26,7 +26,6 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.ConnectionMeta;
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
@ -115,7 +114,7 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
Connection connection = new GrpcConnection(metaInfo, responseObserver,
GrpcConnection connection = new GrpcConnection(metaInfo, responseObserver,
GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
// null if supported
if (setUpRequest.getAbilityTable() != null) {
@ -140,8 +139,9 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
} else {
try {
// finish register, tell client has set up successfully
connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance()
.getCurrentNodeAbilities(AbilityMode.SERVER)), 3000L);
// async response without client ack
connection.sendRequestNoAck(new SetupAckRequest(NacosAbilityManagerHolder.getInstance()
.getCurrentNodeAbilities(AbilityMode.SERVER)));
} catch (Exception e) {
// nothing to do

View File

@ -52,7 +52,13 @@ public class GrpcConnection extends Connection {
this.channel = channel;
}
private void sendRequestNoAck(Request request) throws NacosException {
/**
* send request without ack.
*
* @param request request data.
* @throws NacosException NacosException
*/
public void sendRequestNoAck(Request request) throws NacosException {
try {
//StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
synchronized (streamObserver) {

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
@ -37,6 +38,7 @@ import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClient;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestFilters;
import com.alibaba.nacos.core.remote.RequestHandler;
@ -102,6 +104,8 @@ public class AbilityDiscovery {
private Field serverReuqestHandlersField;
private Field currentConnField;
private Field setupRequestHandlerField;
@Before
public void setup() throws NoSuchFieldException, IllegalAccessException, NacosException {
@ -131,6 +135,10 @@ public class AbilityDiscovery {
serverReuqestHandlersField = RpcClient.class.getDeclaredField("serverRequestHandlers");
serverReuqestHandlersField.setAccessible(true);
// setupRequestHandler
setupRequestHandlerField = GrpcClient.class.getDeclaredField("setupRequestHandler");
setupRequestHandlerField.setAccessible(true);
// init client
client = RpcClientFactory.createClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>());
client.serverListFactory(new ServerListFactory() {
@ -218,7 +226,7 @@ public class AbilityDiscovery {
com.alibaba.nacos.core.remote.Connection connection = connectionManager.getConnection(conn.getConnectionId());
try {
connection.request(new SetupAckRequest(), 2000L);
connection.request(new NotifySubscriberRequest(), 2000L);
} catch (NacosException e) {
// nothing to do
}
@ -244,6 +252,18 @@ public class AbilityDiscovery {
handlers.remove(ConfigQueryRequest.class.getSimpleName());
handlers.put(ConfigQueryRequest.class.getSimpleName(), oldRequestHandler);
}
@Test
public void testNegotiationTimeout() throws Exception {
Object origin = setupRequestHandlerField.get(client);
// set null for setupRequestHandlerField
setupRequestHandlerField.set(client, null);
// try connect
Connection connection = client.connectToServer(new RpcClient.ServerInfo("127.0.0.1", port));
Assert.assertNull(connection);
// recovery
setupRequestHandlerField.set(client, origin);
}
@After
public void recover() throws IllegalAccessException, NacosException {