Client combine ability then return to server.

This commit is contained in:
Daydreamer-ia 2022-09-19 21:44:22 +08:00
parent 4cff58da13
commit 5cbfc524a9
2 changed files with 31 additions and 5 deletions

View File

@ -30,6 +30,7 @@ import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse; import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.api.remote.response.SetupAckResponse; import com.alibaba.nacos.api.remote.response.SetupAckResponse;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.Connection;
@ -48,6 +49,8 @@ import io.grpc.stub.StreamObserver;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -353,7 +356,7 @@ public abstract class GrpcClient extends RpcClient {
grpcConn.setConnectionId(connectionId); grpcConn.setConnectionId(connectionId);
// if not supported, it will be null // if not supported, it will be null
if (serverCheckResponse.getAbilities() != null) { if (serverCheckResponse.getAbilities() != null) {
Map<AbilityKey, Boolean> abilityTable = AbilityKey.mapEnum(serverCheckResponse.getAbilities()); Map<AbilityKey, Boolean> abilityTable = mapAndFilter(serverCheckResponse.getAbilities());
// mark // mark
markForSetup.put(serverCheckResponse.getConnectionId(), new CountDownLatch(1)); markForSetup.put(serverCheckResponse.getConnectionId(), new CountDownLatch(1));
// combine with current node abilities // combine with current node abilities
@ -375,7 +378,7 @@ public abstract class GrpcClient extends RpcClient {
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels()); conSetupRequest.setLabels(super.getLabels());
// set ability table // set ability table
conSetupRequest.setAbilityTable(AbilityKey.mapStr(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities())); conSetupRequest.setAbilityTable(serverCheckResponse.getAbilities());
conSetupRequest.setTenant(super.getTenant()); conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest); grpcConn.sendRequest(conSetupRequest);
// wait for response // wait for response
@ -401,6 +404,32 @@ public abstract class GrpcClient extends RpcClient {
return null; return null;
} }
/**.
* filter the ability current node not support, map to enum
*
* @param originClientAbilities origin client abilities
* @return enum map
*/
private Map<AbilityKey, Boolean> mapAndFilter(Map<String, Boolean> originClientAbilities) {
Map<AbilityKey, Boolean> res = new HashMap<>(originClientAbilities.size());
Iterator<Map.Entry<String, Boolean>> iterator = originClientAbilities.entrySet().iterator();
// filter ability current node not support
AbstractAbilityControlManager instance = NacosAbilityManagerHolder.getInstance();
while (iterator.hasNext()) {
Map.Entry<String, Boolean> next = iterator.next();
AbilityKey anEnum = AbilityKey.getEnum(next.getKey());
if (anEnum != null) {
// whether support
boolean isRunning = instance.isCurrentNodeAbilityRunning(anEnum) && next.getValue();
res.put(anEnum, isRunning);
// if not support
originClientAbilities.replace(next.getKey(), isRunning);
}
}
return res;
}
@Override @Override
protected void afterReset(ConnectResetRequest request) { protected void afterReset(ConnectResetRequest request) {
String connectionId = request.getConnectionId(); String connectionId = request.getConnectionId();

View File

@ -128,9 +128,6 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
// map to table // map to table
Map<AbilityKey, Boolean> clientAbilities = AbilityKey Map<AbilityKey, Boolean> clientAbilities = AbilityKey
.mapEnum(setUpRequest.getAbilityTable()); .mapEnum(setUpRequest.getAbilityTable());
// combine with current node abilities
// in order to get abilities current node provides
NacosAbilityManagerHolder.getInstance().combine(clientAbilities);
connection.setAbilityTable(clientAbilities); connection.setAbilityTable(clientAbilities);
} }
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted(); boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();