Add the response to setup request.

This commit is contained in:
Daydreamer-ia 2022-09-08 18:23:35 +08:00
parent eedb84f2a1
commit 89b450537e
10 changed files with 184 additions and 7 deletions

View File

@ -45,6 +45,8 @@ public class ServerAbilities extends AbstractAbilityRegistry {
* *
*/ */
// put ability here, which you want current server supports // put ability here, which you want current server supports
supportedAbilities.put(AbilityKey.TEST_1, true);
supportedAbilities.put(AbilityKey.TEST_2, true);
} }
private ServerAbilities() { private ServerAbilities() {

View File

@ -30,11 +30,31 @@ public class ConnectResetRequest extends ServerRequest {
String serverPort; String serverPort;
String connectionId;
@Override @Override
public String getModule() { public String getModule() {
return INTERNAL_MODULE; return INTERNAL_MODULE;
} }
/**
* Getter method for property <tt>connectionId</tt>.
*
* @return property value of connectionId
*/
public String getConnectionId() {
return connectionId;
}
/**
* Setter method for property <tt>connectionId</tt>.
*
* @param connectionId value to be assigned to property connectionId
*/
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
/** /**
* Getter method for property <tt>serverIp</tt>. * Getter method for property <tt>serverIp</tt>.
* *

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.request;
import static com.alibaba.nacos.api.common.Constants.Remote.INTERNAL_MODULE;
/**.
* @author Daydreamer
* @description Server tells the client that the connection is established
* @date 2022/7/12 19:21
**/
public class SetupAckRequest extends ServerRequest {
private String connectionId;
public SetupAckRequest() {
}
public SetupAckRequest(String connectionId) {
this.connectionId = connectionId;
}
public String getConnectionId() {
return connectionId;
}
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
@Override
public String getModule() {
return INTERNAL_MODULE;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**.
* @author Daydreamer
* @description Server tells the client that the connection is established
* @date 2022/7/12 19:21
**/
public class SetupAckResponse extends Response {
}

View File

@ -433,6 +433,7 @@ public abstract class RpcClient implements Closeable {
} else { } else {
switchServerAsync(); switchServerAsync();
} }
afterReset(connectResetRequest);
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -444,6 +445,15 @@ public abstract class RpcClient implements Closeable {
} }
} }
/**.
* invoke after receiving reset request
*
* @param request request for resetting
*/
protected void afterReset(ConnectResetRequest request) {
// hook for GrpcClient
}
@Override @Override
public void shutdown() throws NacosException { public void shutdown() throws NacosException {
LOGGER.info("Shutdown rpc client, set status to shutdown"); LOGGER.info("Shutdown rpc client, set status to shutdown");

View File

@ -22,12 +22,15 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc; import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc; import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest; import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
import com.alibaba.nacos.api.remote.response.ErrorResponse; import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse; import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.api.remote.response.SetupAckResponse;
import com.alibaba.nacos.api.utils.AbilityTableUtils; import com.alibaba.nacos.api.utils.AbilityTableUtils;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.ConnectionType;
@ -48,6 +51,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -75,6 +81,11 @@ public abstract class GrpcClient extends RpcClient {
private static final long DEFAULT_KEEP_ALIVE_TIME = 6 * 60 * 1000; private static final long DEFAULT_KEEP_ALIVE_TIME = 6 * 60 * 1000;
/**.
* Block to wait setup success response
*/
private final Map<String, CountDownLatch> markForSetup = new ConcurrentHashMap<>();
@Override @Override
public ConnectionType getConnectionType() { public ConnectionType getConnectionType() {
return ConnectionType.GRPC; return ConnectionType.GRPC;
@ -85,6 +96,19 @@ public abstract class GrpcClient extends RpcClient {
*/ */
public GrpcClient(String name) { public GrpcClient(String name) {
super(name); super(name);
// register to handler setup request
registerServerRequestHandler((request) -> {
// if finish setup
if (request instanceof SetupAckRequest) {
SetupAckRequest setupAckRequest = (SetupAckRequest) request;
// remove and count down
Optional.ofNullable(markForSetup.remove((setupAckRequest.getConnectionId())))
.orElse(new CountDownLatch(1))
.countDown();
}
return new SetupAckResponse();
});
} }
/** /**
@ -247,6 +271,10 @@ public abstract class GrpcClient extends RpcClient {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8()); grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
// remove and notify
Optional.ofNullable(markForSetup.remove(grpcConn.getConnectionId()))
.orElse(new CountDownLatch(1))
.countDown();
} }
} }
@ -299,6 +327,8 @@ public abstract class GrpcClient extends RpcClient {
@Override @Override
public Connection connectToServer(ServerInfo serverInfo) { public Connection connectToServer(ServerInfo serverInfo) {
// the newest connection id
String connectionId = "";
try { try {
if (grpcExecutor == null) { if (grpcExecutor == null) {
this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp()); this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
@ -317,21 +347,25 @@ public abstract class GrpcClient extends RpcClient {
// submit ability table as soon as possible // submit ability table as soon as possible
// ability table will be null if server doesn't support ability table // ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response; ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();
AbilityTable table = new AbilityTable(); AbilityTable table = new AbilityTable();
table.setServer(true) table.setServer(true)
.setConnectionId(serverCheckResponse.getConnectionId()); .setConnectionId(connectionId);
// if not supported, it will be null // if not supported, it will be null
if (serverCheckResponse.getAbilities() != null) { if (serverCheckResponse.getAbilities() != null) {
Map<AbilityKey, Boolean> abilityTable = AbilityTableUtils Map<AbilityKey, Boolean> abilityTable = AbilityTableUtils
.getAbilityTableBy(serverCheckResponse.getAbilities(), AbilityKey.offset()); .getAbilityTableBy(serverCheckResponse.getAbilities(), AbilityKey.offset());
table.setAbility(abilityTable); table.setAbility(abilityTable);
// mark
markForSetup.put(serverCheckResponse.getConnectionId(), new CountDownLatch(1));
} }
NacosAbilityManagerHolder.getInstance().addNewTable(table); NacosAbilityManagerHolder.getInstance().addNewTable(table);
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel()); .newStub(newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId()); grpcConn.setConnectionId(connectionId);
//create stream request and bind connection event to this connection. //create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
@ -351,6 +385,14 @@ public abstract class GrpcClient extends RpcClient {
conSetupRequest.setServer(isServer()); conSetupRequest.setServer(isServer());
conSetupRequest.setTenant(super.getTenant()); conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest); grpcConn.sendRequest(conSetupRequest);
// wait for response
CountDownLatch synResponse = markForSetup.get(connectionId);
if (synResponse != null) {
synResponse.await(2000L, TimeUnit.MICROSECONDS);
// ensure remove, it may being reset
markForSetup.remove(connectionId);
}
// leave for adapting old version server
//wait to register connection setup //wait to register connection setup
Thread.sleep(100L); Thread.sleep(100L);
return grpcConn; return grpcConn;
@ -358,10 +400,25 @@ public abstract class GrpcClient extends RpcClient {
return null; return null;
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
// remove and notify
Optional.ofNullable(markForSetup.remove(connectionId))
.orElse(new CountDownLatch(1))
.countDown();
} }
return null; return null;
} }
@Override
protected void afterReset(ConnectResetRequest request) {
String connectionId = request.getConnectionId();
if (connectionId != null) {
// remove and notify
Optional.ofNullable(markForSetup.remove(connectionId))
.orElse(new CountDownLatch(1))
.countDown();
}
}
/** /**
* Return whether server environment * Return whether server environment
* The same offset may refer to different functions in the client capability table and the server capability table. * The same offset may refer to different functions in the client capability table and the server capability table.

View File

@ -64,9 +64,11 @@ public class AbilityConfigs extends Subscriber<ServerConfigChangeEvent> {
serverAbilityKeys.forEach(abilityKey -> { serverAbilityKeys.forEach(abilityKey -> {
String key = PREFIX + abilityKey.getName(); String key = PREFIX + abilityKey.getName();
try { try {
// default true // scan
Boolean property = EnvUtil.getProperty(key, Boolean.class, true); Boolean property = EnvUtil.getProperty(key, Boolean.class);
if (property != null) {
newValues.put(abilityKey, property); newValues.put(abilityKey, property);
}
} catch (Exception e) { } catch (Exception e) {
LOGGER.warn("Update ability config from env failed, use old val, ability : {} , because : {}", key, e); LOGGER.warn("Update ability config from env failed, use old val, ability : {} , because : {}", key, e);
} }

View File

@ -511,6 +511,7 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
String[] split = redirectAddress.split(Constants.COLON); String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]); connectResetRequest.setServerIp(split[0]);
connectResetRequest.setServerPort(split[1]); connectResetRequest.setServerPort(split[1]);
connectResetRequest.setConnectionId(connectionId);
} }
try { try {
connection.request(connectResetRequest, 3000L); connection.request(connectResetRequest, 3000L);

View File

@ -22,6 +22,7 @@ import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.AbilityTableUtils; import com.alibaba.nacos.api.utils.AbilityTableUtils;
import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.ConnectionType;
@ -134,7 +135,9 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
try { try {
Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId, Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
rejectSdkOnStarting ? " server is not started" : " server is over limited."); rejectSdkOnStarting ? " server is not started" : " server is over limited.");
connection.request(new ConnectResetRequest(), 3000L); ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setConnectionId(connectionId);
connection.request(connectResetRequest, 3000L);
connection.close(); connection.close();
} catch (Exception e) { } catch (Exception e) {
//Do nothing. //Do nothing.
@ -143,6 +146,14 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
.warn("[{}]Send connect reset request error,error={}", connectionId, e); .warn("[{}]Send connect reset request error,error={}", connectionId, e);
} }
} }
} else {
try {
// finish register, tell client has set up successfully
connection.request(new SetupAckRequest(connectionId), 3000L);
} catch (Exception e) {
// nothing to do
}
} }
} else if (parseObj instanceof Response) { } else if (parseObj instanceof Response) {

View File

@ -17,7 +17,6 @@
package com.alibaba.nacos.core.remote.grpc; package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityKey; import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc; import com.alibaba.nacos.api.grpc.auto.RequestGrpc;