refactor AbilityControlManager with multiple ability mode.
This commit is contained in:
parent
6bce3bda7b
commit
cb70aaf3c7
@ -0,0 +1,41 @@
|
||||
package com.alibaba.nacos.api.ability.register.impl;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* It is used to register cluster client abilities.
|
||||
*
|
||||
* @author Daydreamer
|
||||
**/
|
||||
public class ClusterClientAbilities extends AbstractAbilityRegistry {
|
||||
|
||||
private static final ClusterClientAbilities INSTANCE = new ClusterClientAbilities();
|
||||
|
||||
{
|
||||
/*
|
||||
* example:
|
||||
* There is a function named "compression".
|
||||
* The key is from <p>AbilityKey</p>, the value is whether turn on.
|
||||
*
|
||||
* You can add a new public field in <p>AbilityKey</p> like:
|
||||
* <code>DATA_COMPRESSION("compression", "description about this ability")</code>
|
||||
*
|
||||
* And then you need to declare whether turn on in the ability table, you can:
|
||||
* <code>supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true);</code> means that current client support compression.
|
||||
*
|
||||
*/
|
||||
// put ability here, which you want current client supports
|
||||
}
|
||||
|
||||
/**
|
||||
* get static ability current cluster client supports.
|
||||
*
|
||||
* @return static ability
|
||||
*/
|
||||
public static Map<AbilityKey, Boolean> getStaticAbilities() {
|
||||
return INSTANCE.getSupportedAbilities();
|
||||
}
|
||||
}
|
@ -21,14 +21,15 @@ import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**.
|
||||
/**
|
||||
* It is used to register client abilities.
|
||||
*
|
||||
* @author Daydreamer
|
||||
* @description It is used to register client abilities.
|
||||
* @date 2022/8/31 12:32
|
||||
**/
|
||||
public class ClientAbilities extends AbstractAbilityRegistry {
|
||||
public class SdkClientAbilities extends AbstractAbilityRegistry {
|
||||
|
||||
private static final ClientAbilities INSTANCE = new ClientAbilities();
|
||||
private static final SdkClientAbilities INSTANCE = new SdkClientAbilities();
|
||||
|
||||
{
|
||||
/*
|
@ -21,9 +21,10 @@ import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**.
|
||||
/**
|
||||
* It is used to register server abilities.
|
||||
*
|
||||
* @author Daydreamer
|
||||
* @description It is used to register server abilities.
|
||||
* @date 2022/8/31 12:32
|
||||
**/
|
||||
public class ServerAbilities extends AbstractAbilityRegistry {
|
||||
|
@ -18,9 +18,10 @@ package com.alibaba.nacos.client.ability;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.ability.register.impl.ClientAbilities;
|
||||
import com.alibaba.nacos.api.ability.register.impl.SdkClientAbilities;
|
||||
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**.
|
||||
@ -34,13 +35,10 @@ public class ClientAbilityControlManager extends AbstractAbilityControlManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<AbilityKey, Boolean> initCurrentNodeAbilities() {
|
||||
return ClientAbilities.getStaticAbilities();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbilityMode initializeMode() {
|
||||
return AbilityMode.SDK_CLIENT;
|
||||
protected Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities() {
|
||||
Map<AbilityMode, Map<AbilityKey, Boolean>> abilities = new HashMap<>();
|
||||
abilities.put(AbilityMode.SDK_CLIENT, SdkClientAbilities.getStaticAbilities());
|
||||
return abilities;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,17 +18,18 @@ package com.alibaba.nacos.common.ability;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
|
||||
import com.alibaba.nacos.api.ability.initializer.AbilityPostProcessor;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.spi.NacosServiceLoader;
|
||||
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@ -41,15 +42,14 @@ public abstract class AbstractAbilityControlManager {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAbilityControlManager.class);
|
||||
|
||||
/**.
|
||||
* ability current node running
|
||||
/**
|
||||
* current node support abilities.
|
||||
*/
|
||||
protected final Map<String, Boolean> currentRunningAbility = new ConcurrentHashMap<>();
|
||||
protected final Map<AbilityMode, Map<String, Boolean>> currentNodeAbilities = new ConcurrentHashMap<>();
|
||||
|
||||
protected AbstractAbilityControlManager() {
|
||||
ThreadUtils.addShutdownHook(this::destroy);
|
||||
NotifyCenter.registerToPublisher(AbilityUpdateEvent.class, 16384);
|
||||
currentRunningAbility.putAll(getAbilityTable());
|
||||
initAbilityTable();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -57,16 +57,37 @@ public abstract class AbstractAbilityControlManager {
|
||||
*
|
||||
* @return abilities
|
||||
*/
|
||||
private Map<String, Boolean> getAbilityTable() {
|
||||
private void initAbilityTable() {
|
||||
LOGGER.info("Ready to get current node abilities...");
|
||||
// get processors
|
||||
Collection<AbilityPostProcessor> processors = NacosServiceLoader.load(AbilityPostProcessor.class);
|
||||
Map<AbilityKey, Boolean> abilities = initCurrentNodeAbilities();
|
||||
AbilityMode mode = initializeMode();
|
||||
Map<AbilityMode, Map<AbilityKey, Boolean>> abilities = initCurrentNodeAbilities();
|
||||
// get abilities
|
||||
for (AbilityPostProcessor processor : processors) {
|
||||
processor.process(mode, abilities);
|
||||
for (AbilityMode mode : AbilityMode.values()) {
|
||||
Map<AbilityKey, Boolean> abilitiesTable = abilities.get(mode);
|
||||
if (abilitiesTable == null) {
|
||||
continue;
|
||||
}
|
||||
// check whether exist error key
|
||||
// check for developer
|
||||
for (AbilityKey abilityKey : abilitiesTable.keySet()) {
|
||||
if (!mode.equals(abilityKey.getMode())) {
|
||||
LOGGER.error("You should not contain a other mode: {} in a specify mode: {} abilities set, error key: {}, please check again.",
|
||||
abilityKey.getMode(), mode, abilityKey);
|
||||
throw new IllegalStateException("Except mode: " + mode + " but " + abilityKey + " mode: " + abilityKey.getMode() + ", please check again.");
|
||||
}
|
||||
}
|
||||
Collection<AbilityPostProcessor> processors = NacosServiceLoader.load(AbilityPostProcessor.class);
|
||||
for (AbilityPostProcessor processor : processors) {
|
||||
processor.process(mode, abilitiesTable);
|
||||
}
|
||||
}
|
||||
return AbilityKey.mapStr(abilities);
|
||||
// init
|
||||
Set<AbilityMode> abilityModes = abilities.keySet();
|
||||
LOGGER.info("Ready to initialize current node abilities, support modes: {}", abilityModes);
|
||||
for (AbilityMode abilityMode : abilityModes) {
|
||||
this.currentNodeAbilities.put(abilityMode, new ConcurrentHashMap<>(AbilityKey.mapStr(abilities.get(abilityMode))));
|
||||
}
|
||||
LOGGER.info("Initialize current abilities finish...");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -75,14 +96,21 @@ public abstract class AbstractAbilityControlManager {
|
||||
* @param abilityKey ability key{@link AbilityKey}
|
||||
*/
|
||||
public void enableCurrentNodeAbility(AbilityKey abilityKey) {
|
||||
doTurn(this.currentRunningAbility, abilityKey, true);
|
||||
Map<String, Boolean> abilities = this.currentNodeAbilities.get(abilityKey.getMode());
|
||||
if (abilities != null) {
|
||||
doTurn(abilities, abilityKey, true, abilityKey.getMode());
|
||||
}
|
||||
}
|
||||
|
||||
protected void doTurn(Map<String, Boolean> abilities, AbilityKey key, boolean turn) {
|
||||
protected void doTurn(Map<String, Boolean> abilities, AbilityKey key, boolean turn, AbilityMode mode) {
|
||||
if (!key.getMode().equals(mode)) {
|
||||
throw new IllegalStateException("Except " + mode + " but " + key.getMode());
|
||||
}
|
||||
LOGGER.info("Turn current node ability: {}, turn: {}", key, turn);
|
||||
abilities.put(key.getName(), turn);
|
||||
// notify event
|
||||
AbilityUpdateEvent abilityUpdateEvent = new AbilityUpdateEvent();
|
||||
abilityUpdateEvent.setTable(Collections.unmodifiableMap(currentRunningAbility));
|
||||
abilityUpdateEvent.setTable(Collections.unmodifiableMap(abilities));
|
||||
abilityUpdateEvent.isOn = turn;
|
||||
abilityUpdateEvent.abilityKey = key;
|
||||
NotifyCenter.publishEvent(abilityUpdateEvent);
|
||||
@ -94,7 +122,10 @@ public abstract class AbstractAbilityControlManager {
|
||||
* @param abilityKey ability key
|
||||
*/
|
||||
public void disableCurrentNodeAbility(AbilityKey abilityKey) {
|
||||
doTurn(this.currentRunningAbility, abilityKey, false);
|
||||
Map<String, Boolean> abilities = this.currentNodeAbilities.get(abilityKey.getMode());
|
||||
if (abilities != null) {
|
||||
doTurn(abilities, abilityKey, false, abilityKey.getMode());
|
||||
}
|
||||
}
|
||||
|
||||
/**.
|
||||
@ -103,8 +134,15 @@ public abstract class AbstractAbilityControlManager {
|
||||
* @param abilityKey ability key from {@link AbilityKey}
|
||||
* @return whether support
|
||||
*/
|
||||
public boolean isCurrentNodeAbilityRunning(AbilityKey abilityKey) {
|
||||
return currentRunningAbility.getOrDefault(abilityKey.getName(), false);
|
||||
public AbilityStatus isCurrentNodeAbilityRunning(AbilityKey abilityKey) {
|
||||
Map<String, Boolean> abilities = currentNodeAbilities.get(abilityKey.getMode());
|
||||
if (abilities != null) {
|
||||
Boolean support = abilities.get(abilityKey.getName());
|
||||
if (support != null) {
|
||||
return support ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
|
||||
}
|
||||
}
|
||||
return AbilityStatus.UNKNOWN;
|
||||
}
|
||||
|
||||
/**.
|
||||
@ -112,39 +150,19 @@ public abstract class AbstractAbilityControlManager {
|
||||
*
|
||||
* @return current node abilities
|
||||
*/
|
||||
protected abstract Map<AbilityKey, Boolean> initCurrentNodeAbilities();
|
||||
|
||||
/**
|
||||
* initialize mode.
|
||||
*
|
||||
* @return mode.
|
||||
*/
|
||||
protected abstract AbilityMode initializeMode();
|
||||
protected abstract Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities();
|
||||
|
||||
/**.
|
||||
* Return the abilities current node
|
||||
*
|
||||
* @return current abilities
|
||||
*/
|
||||
public Map<String, Boolean> getCurrentNodeAbilities() {
|
||||
return Collections.unmodifiableMap(currentRunningAbility);
|
||||
}
|
||||
|
||||
/**.
|
||||
* Close
|
||||
*/
|
||||
public final void destroy() {
|
||||
LOGGER.warn("[DefaultAbilityControlManager] - Start destroying...");
|
||||
// hook
|
||||
doDestroy();
|
||||
LOGGER.warn("[DefaultAbilityControlManager] - Destruction of the end");
|
||||
}
|
||||
|
||||
/**.
|
||||
* hook for subclass
|
||||
*/
|
||||
protected void doDestroy() {
|
||||
// for server ability manager
|
||||
public Map<String, Boolean> getCurrentNodeAbilities(AbilityMode mode) {
|
||||
Map<String, Boolean> abilities = currentNodeAbilities.get(mode);
|
||||
if (abilities != null) {
|
||||
return Collections.unmodifiableMap(abilities);
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.common.remote.client.grpc;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
|
||||
import com.alibaba.nacos.api.grpc.auto.Payload;
|
||||
@ -62,9 +63,9 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -390,7 +391,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
|
||||
conSetupRequest.setLabels(super.getLabels());
|
||||
// set ability table
|
||||
conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities());
|
||||
conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
|
||||
conSetupRequest.setTenant(super.getTenant());
|
||||
grpcConn.sendRequest(conSetupRequest);
|
||||
// wait for response
|
||||
@ -413,6 +414,13 @@ public abstract class GrpcClient extends RpcClient {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* ability mode: sdk client or cluster client.
|
||||
*
|
||||
* @return mode
|
||||
*/
|
||||
protected abstract AbilityMode abilityMode();
|
||||
|
||||
@Override
|
||||
protected void afterReset(ConnectResetRequest request) {
|
||||
recAbilityContext.release(null);
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.common.remote.client.grpc;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.common.Constants;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
|
||||
|
||||
@ -75,6 +76,11 @@ public class GrpcClusterClient extends GrpcClient {
|
||||
super(name, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbilityMode abilityMode() {
|
||||
return AbilityMode.CLUSTER_CLIENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY,
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.common.remote.client.grpc;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.common.Constants;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
|
||||
|
||||
@ -65,6 +66,11 @@ public class GrpcSdkClient extends GrpcClient {
|
||||
super(name, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbilityMode abilityMode() {
|
||||
return AbilityMode.SDK_CLIENT;
|
||||
}
|
||||
|
||||
/**
|
||||
* constructor.
|
||||
*
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.common.remote.client.grpc;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClient;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
|
||||
@ -70,6 +71,11 @@ public class GrpcClientTest {
|
||||
public void setUp() throws Exception {
|
||||
when(clientConfig.name()).thenReturn("testClient");
|
||||
grpcClient = spy(new GrpcClient(clientConfig) {
|
||||
@Override
|
||||
protected AbilityMode abilityMode() {
|
||||
return AbilityMode.SDK_CLIENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 1000;
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.common.remote.client.grpc;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -33,6 +34,12 @@ public class GrpcClientTlsTest extends GrpcClientTest {
|
||||
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
|
||||
|
||||
grpcClient = new GrpcClient(clientConfig) {
|
||||
|
||||
@Override
|
||||
protected AbilityMode abilityMode() {
|
||||
return AbilityMode.SDK_CLIENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 1000;
|
||||
@ -51,6 +58,12 @@ public class GrpcClientTlsTest extends GrpcClientTest {
|
||||
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
|
||||
|
||||
grpcClient = new GrpcClient(clientConfig) {
|
||||
|
||||
@Override
|
||||
protected AbilityMode abilityMode() {
|
||||
return AbilityMode.SDK_CLIENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 1000;
|
||||
@ -72,6 +85,11 @@ public class GrpcClientTlsTest extends GrpcClientTest {
|
||||
when(clientConfig.name()).thenReturn("testClient");
|
||||
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
|
||||
grpcClient = new GrpcClient(clientConfig) {
|
||||
@Override
|
||||
protected AbilityMode abilityMode() {
|
||||
return AbilityMode.SDK_CLIENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rpcPortOffset() {
|
||||
return 1000;
|
||||
|
@ -18,6 +18,8 @@ package com.alibaba.nacos.core.ability.control;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.ability.register.impl.ClusterClientAbilities;
|
||||
import com.alibaba.nacos.api.ability.register.impl.SdkClientAbilities;
|
||||
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
|
||||
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
|
||||
import com.alibaba.nacos.core.ability.config.AbilityConfigs;
|
||||
@ -39,7 +41,13 @@ public class ServerAbilityControlManager extends AbstractAbilityControlManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<AbilityKey, Boolean> initCurrentNodeAbilities() {
|
||||
protected Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities() {
|
||||
// init client abilities
|
||||
Map<AbilityMode, Map<AbilityKey, Boolean>> res = new HashMap<>();
|
||||
res.put(AbilityMode.CLUSTER_CLIENT, initClusterClientAbilities());
|
||||
res.put(AbilityMode.SDK_CLIENT, initSdkClientAbilities());
|
||||
|
||||
// init server abilities
|
||||
// static abilities
|
||||
Map<AbilityKey, Boolean> staticAbilities = ServerAbilities.getStaticAbilities();
|
||||
// all function server can support
|
||||
@ -64,12 +72,25 @@ public class ServerAbilityControlManager extends AbstractAbilityControlManager {
|
||||
});
|
||||
// load from ServerAbilities
|
||||
unIncludedInConfig.forEach(abilityKey -> abilityTable.put(abilityKey, staticAbilities.get(abilityKey)));
|
||||
return abilityTable;
|
||||
|
||||
res.put(AbilityMode.SERVER, abilityTable);
|
||||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbilityMode initializeMode() {
|
||||
return AbilityMode.SERVER;
|
||||
/**
|
||||
* init cluster client abilities.
|
||||
*/
|
||||
private Map<AbilityKey, Boolean> initClusterClientAbilities() {
|
||||
// static abilities
|
||||
return ClusterClientAbilities.getStaticAbilities();
|
||||
}
|
||||
|
||||
/**
|
||||
* init sdk client abilities.
|
||||
*/
|
||||
private Map<AbilityKey, Boolean> initSdkClientAbilities() {
|
||||
// static abilities
|
||||
return SdkClientAbilities.getStaticAbilities();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -142,7 +142,6 @@ public class StartingApplicationListener implements NacosApplicationListener {
|
||||
ThreadPoolManager.shutdown();
|
||||
WatchFileCenter.shutdown();
|
||||
NotifyCenter.shutdown();
|
||||
NacosAbilityManagerHolder.getInstance().destroy();
|
||||
|
||||
closeExecutor();
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.core.remote.grpc;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.common.Constants;
|
||||
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
|
||||
import com.alibaba.nacos.api.grpc.auto.Payload;
|
||||
@ -138,7 +139,7 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
|
||||
} else {
|
||||
try {
|
||||
// finish register, tell client has set up successfully
|
||||
connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities()), 3000L);
|
||||
connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(AbilityMode.SERVER)), 3000L);
|
||||
} catch (Exception e) {
|
||||
// nothing to do
|
||||
|
||||
|
@ -18,6 +18,7 @@ package com.alibaba.nacos.core.ability;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -41,18 +42,18 @@ public class AbilityControlManagerTest {
|
||||
|
||||
@Test
|
||||
public void testCurrentNodeAbility() {
|
||||
Set<String> keySet = serverAbilityControlManager.getCurrentNodeAbilities().keySet();
|
||||
Set<String> keySet = serverAbilityControlManager.getCurrentNodeAbilities(AbilityMode.SERVER).keySet();
|
||||
// diable all
|
||||
keySet.forEach(key -> serverAbilityControlManager.disableCurrentNodeAbility(AbilityKey.getEnum(AbilityMode.SERVER, key)));
|
||||
// get all
|
||||
keySet.forEach(key -> {
|
||||
Assert.assertFalse(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)));
|
||||
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)), AbilityStatus.SUPPORTED);
|
||||
});
|
||||
// enable all
|
||||
keySet.forEach(key -> serverAbilityControlManager.enableCurrentNodeAbility(AbilityKey.getEnum(AbilityMode.SERVER, key)));
|
||||
// get all
|
||||
keySet.forEach(key -> {
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)));
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)), AbilityStatus.SUPPORTED);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package com.alibaba.nacos.core.ability;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.common.JustForTest;
|
||||
import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager;
|
||||
|
||||
@ -25,8 +26,8 @@ public class TestServerAbilityControlManager extends ServerAbilityControlManager
|
||||
|
||||
@JustForTest
|
||||
public void setCurrentSupportingAbility(Map<String, Boolean> ability) {
|
||||
currentRunningAbility.clear();
|
||||
currentRunningAbility.putAll(ability);
|
||||
currentNodeAbilities.get(AbilityMode.SERVER).clear();
|
||||
currentNodeAbilities.get(AbilityMode.SERVER).putAll(ability);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.core.ability.config;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
|
||||
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
|
||||
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
|
||||
import com.alibaba.nacos.common.event.ServerConfigChangeEvent;
|
||||
@ -92,16 +93,16 @@ public class AbilityConfigsTest {
|
||||
fill();
|
||||
ServerAbilityControlManager manager = new ServerAbilityControlManager();
|
||||
// config has higher priority
|
||||
Assert.assertTrue(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1));
|
||||
Assert.assertFalse(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2));
|
||||
Assert.assertEquals(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
|
||||
Assert.assertNotEquals(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
|
||||
// clear
|
||||
currentAbilities.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInit() {
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1));
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2));
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -110,24 +111,24 @@ public class AbilityConfigsTest {
|
||||
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString());
|
||||
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.TRUE.toString());
|
||||
abilityConfigs.onEvent(new ServerConfigChangeEvent());
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1));
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2));
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
|
||||
|
||||
// test change
|
||||
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.FALSE.toString());
|
||||
abilityConfigs.onEvent(new ServerConfigChangeEvent());
|
||||
Assert.assertFalse(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1));
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2));
|
||||
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
|
||||
|
||||
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString());
|
||||
abilityConfigs.onEvent(new ServerConfigChangeEvent());
|
||||
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1));
|
||||
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
|
||||
|
||||
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.FALSE.toString());
|
||||
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.FALSE.toString());
|
||||
abilityConfigs.onEvent(new ServerConfigChangeEvent());
|
||||
Assert.assertFalse(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1));
|
||||
Assert.assertFalse(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2));
|
||||
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
|
||||
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -64,6 +64,8 @@ public class AbilityDiscovery {
|
||||
|
||||
private RpcClient client;
|
||||
|
||||
private RpcClient clusterClient;
|
||||
|
||||
private ConfigService configService;
|
||||
|
||||
private AbstractAbilityControlManager oldInstance;
|
||||
@ -75,10 +77,14 @@ public class AbilityDiscovery {
|
||||
|
||||
private volatile boolean clientSuccess = false;
|
||||
|
||||
private volatile boolean clusterSuccess = false;
|
||||
|
||||
private Field abstractAbilityControlManager;
|
||||
|
||||
private Field registryHandlerFields;
|
||||
|
||||
private Field serverReuqestHandlersField;
|
||||
|
||||
private Field currentConnField;
|
||||
|
||||
@Before
|
||||
@ -105,6 +111,10 @@ public class AbilityDiscovery {
|
||||
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + port);
|
||||
configService = NacosFactory.createConfigService(properties);
|
||||
|
||||
// server request handler
|
||||
serverReuqestHandlersField = RpcClient.class.getDeclaredField("serverRequestHandlers");
|
||||
serverReuqestHandlersField.setAccessible(true);
|
||||
|
||||
// init client
|
||||
client = RpcClientFactory.createClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>());
|
||||
client.serverListFactory(new ServerListFactory() {
|
||||
@ -125,6 +135,26 @@ public class AbilityDiscovery {
|
||||
});
|
||||
// connect to server
|
||||
client.start();
|
||||
|
||||
clusterClient = RpcClientFactory.createClusterClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>());
|
||||
clusterClient.serverListFactory(new ServerListFactory() {
|
||||
@Override
|
||||
public String genNextServer() {
|
||||
return "127.0.0.1:" + port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCurrentServer() {
|
||||
return "127.0.0.1:" + port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getServerList() {
|
||||
return Collections.singletonList("127.0.0.1:" + port);
|
||||
}
|
||||
});
|
||||
// connect to server
|
||||
clusterClient.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -153,6 +183,8 @@ public class AbilityDiscovery {
|
||||
|
||||
@Test
|
||||
public void testClientJudge() throws Exception {
|
||||
List<ServerRequestHandler> handlers = (List<ServerRequestHandler>) serverReuqestHandlersField.get(client);
|
||||
handlers.clear();
|
||||
// register
|
||||
client.registerServerRequestHandler(new ServerRequestHandler() {
|
||||
@Override
|
||||
@ -180,6 +212,23 @@ public class AbilityDiscovery {
|
||||
Assert.assertTrue(clientSuccess);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterClient() throws IllegalAccessException, NacosException, InterruptedException, NoSuchFieldException {
|
||||
Map<String, RequestHandler> handlers = (Map<String, RequestHandler>) registryHandlerFields
|
||||
.get(requestHandlerRegistry);
|
||||
|
||||
// set handler
|
||||
RequestHandler oldRequestHandler = handlers.remove(ConfigQueryRequest.class.getSimpleName());
|
||||
handlers.put(ConfigQueryRequest.class.getSimpleName(), new ClusterClientRequestHandler(filters));
|
||||
configService.getConfig("test", "DEFAULT_GROUP", 2000);
|
||||
// wait server invoke
|
||||
Thread.sleep(3000);
|
||||
Assert.assertTrue(clusterSuccess);
|
||||
// recover
|
||||
handlers.remove(ConfigQueryRequest.class.getSimpleName());
|
||||
handlers.put(ConfigQueryRequest.class.getSimpleName(), oldRequestHandler);
|
||||
}
|
||||
|
||||
@After
|
||||
public void recover() throws IllegalAccessException, NacosException {
|
||||
abstractAbilityControlManager.set(NacosAbilityManagerHolder.class, oldInstance);
|
||||
@ -199,11 +248,30 @@ public class AbilityDiscovery {
|
||||
|
||||
@Override
|
||||
public ConfigQueryResponse handle(ConfigQueryRequest request, RequestMeta meta) throws NacosException {
|
||||
if (meta.getConnectionAbility(AbilityKey.SERVER_TEST_1).equals(AbilityStatus.SUPPORTED) && meta
|
||||
.getConnectionAbility(AbilityKey.SERVER_TEST_2).equals(AbilityStatus.NOT_SUPPORTED)) {
|
||||
if (meta.getConnectionAbility(AbilityKey.SDK_CLIENT_TEST_1).equals(AbilityStatus.SUPPORTED)) {
|
||||
serverSuccess = true;
|
||||
}
|
||||
return new ConfigQueryResponse();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* just to test ability.
|
||||
*/
|
||||
class ClusterClientRequestHandler extends RequestHandler<ConfigQueryRequest, ConfigQueryResponse> {
|
||||
|
||||
public ClusterClientRequestHandler(RequestFilters requestFilters) throws NoSuchFieldException, IllegalAccessException {
|
||||
Field declaredField = RequestHandler.class.getDeclaredField("requestFilters");
|
||||
declaredField.setAccessible(true);
|
||||
declaredField.set(this, requestFilters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigQueryResponse handle(ConfigQueryRequest request, RequestMeta meta) throws NacosException {
|
||||
if (meta.getConnectionAbility(AbilityKey.CLUSTER_CLIENT_TEST_1).equals(AbilityStatus.SUPPORTED)) {
|
||||
clusterSuccess = true;
|
||||
}
|
||||
return new ConfigQueryResponse();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.alibaba.nacos.test.ability.component;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityMode;
|
||||
import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager;
|
||||
|
||||
import java.util.HashMap;
|
||||
@ -9,10 +10,20 @@ import java.util.Map;
|
||||
public class TestServerAbilityControlManager extends ServerAbilityControlManager {
|
||||
|
||||
@Override
|
||||
protected Map<AbilityKey, Boolean> initCurrentNodeAbilities() {
|
||||
protected Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities() {
|
||||
Map<AbilityKey, Boolean> map = new HashMap<>();
|
||||
map.put(AbilityKey.SERVER_TEST_1, true);
|
||||
map.put(AbilityKey.SERVER_TEST_2, false);
|
||||
return map;
|
||||
HashMap res = new HashMap<>();
|
||||
res.put(AbilityMode.SERVER, map);
|
||||
|
||||
Map<AbilityKey, Boolean> map1 = new HashMap<>();
|
||||
map1.put(AbilityKey.SDK_CLIENT_TEST_1, true);
|
||||
res.put(AbilityMode.SDK_CLIENT, map1);
|
||||
|
||||
Map<AbilityKey, Boolean> map2 = new HashMap<>();
|
||||
map2.put(AbilityKey.CLUSTER_CLIENT_TEST_1, true);
|
||||
res.put(AbilityMode.CLUSTER_CLIENT, map2);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user