Save the ability table to Connection.

This commit is contained in:
Daydreamer-ia 2022-09-10 13:53:48 +08:00
parent 02bd4872ca
commit e3a3139c33
22 changed files with 376 additions and 1293 deletions

View File

@ -17,53 +17,26 @@
package com.alibaba.nacos.client.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.api.ability.register.impl.ClientAbilities;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.DefaultAbilityControlManager;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
/**.
* @author Daydreamer
* @description {@link AbstractAbilityControlManager} for nacos-client.
* @date 2022/7/13 13:38
**/
public class ClientAbilityControlManager extends DefaultAbilityControlManager {
public class ClientAbilityControlManager extends AbstractAbilityControlManager {
public ClientAbilityControlManager() {
}
@Override
protected Map<AbilityKey, Boolean> getCurrentNodeSupportAbility() {
protected Map<AbilityKey, Boolean> initCurrentNodeAbilities() {
return ClientAbilities.getStaticAbilities();
}
@Override
public AbilityStatus isSupport(String connectionId, AbilityKey abilityKey) {
AbilityTable abilityTable = nodeAbilityTable.get(connectionId);
if (abilityTable == null) {
return AbilityStatus.UNKNOWN;
}
Boolean isSupport = Optional.ofNullable(abilityTable.getAbility())
.orElse(Collections.emptyMap())
.getOrDefault(abilityKey, false);
return isSupport ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
@Override
protected void add(AbilityTable table) {
// nothing to do
}
@Override
protected void remove(String connectionId) {
// nothing to do
}
@Override
public int getPriority() {
// if server ability manager exist, you should choose the server one

View File

@ -17,8 +17,6 @@
package com.alibaba.nacos.client.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
import org.junit.Assert;
import org.junit.Before;
@ -43,35 +41,6 @@ public class AbilityControlManagerTest {
clientAbilityControlManager.setCurrentSupportingAbility(newTable);
}
@Test
public void testClientAdd() {
Map<AbilityKey, Boolean> newTable = new HashMap<>();
newTable.put(AbilityKey.TEST_2, true);
newTable.put(AbilityKey.TEST_1, true);
AbilityTable table = new AbilityTable();
table.setConnectionId("test-00001");
table.setAbility(newTable);
table.setServer(true);
clientAbilityControlManager.addNewTable(table);
Assert.assertEquals(AbilityStatus.NOT_SUPPORTED, clientAbilityControlManager.isSupport("test-00001", AbilityKey.TEST_2));
Assert.assertEquals(AbilityStatus.SUPPORTED, clientAbilityControlManager.isSupport("test-00001", AbilityKey.TEST_1));
}
@Test
public void testClientRemove() {
Map<AbilityKey, Boolean> clientTa = new HashMap<>();
clientTa.put(AbilityKey.TEST_2, true);
clientTa.put(AbilityKey.TEST_1, false);
AbilityTable clientTable = new AbilityTable();
clientTable.setConnectionId("test-01111");
clientTable.setAbility(clientTa);
clientTable.setServer(true);
clientAbilityControlManager.addNewTable(clientTable);
Assert.assertTrue(clientAbilityControlManager.contains(clientTable.getConnectionId()));
clientAbilityControlManager.removeTable("test-01111");
Assert.assertFalse(clientAbilityControlManager.contains(clientTable.getConnectionId()));
}
@Test
public void testComponent() throws InterruptedException {
enabled = 0;
@ -118,16 +87,16 @@ public class AbilityControlManagerTest {
public void testPriority() throws InterruptedException {
TestClientAbilityControlManager testClientAbilityControlManager = new TestClientAbilityControlManager();
AbilityKey key = AbilityKey.TEST_1;
TestPriority clusterHandlerMapping1 = new TestPriority("1");
TestPriority clusterHandlerMapping2 = new TestPriority("2");
TestPriority clusterHandlerMapping3 = new TestPriority("3");
TestPriority handlerMapping1 = new TestPriority("1");
TestPriority handlerMapping2 = new TestPriority("2");
TestPriority handlerMapping3 = new TestPriority("3");
// first one, invoke enable()
testClientAbilityControlManager.registerComponent(key, clusterHandlerMapping2, 128);
testClientAbilityControlManager.registerComponent(key, handlerMapping2, 128);
// last one, invoke enable()
testClientAbilityControlManager.registerComponent(key, clusterHandlerMapping3);
testClientAbilityControlManager.registerComponent(key, handlerMapping3);
// second one, invoke enable()
testClientAbilityControlManager.registerComponent(key, clusterHandlerMapping1, 12);
// trigger cluster
testClientAbilityControlManager.registerComponent(key, handlerMapping1, 12);
// trigger
testClientAbilityControlManager.trigger(key);
Assert.assertEquals(3, testClientAbilityControlManager.getHandlerMapping(key).size());
// wait for invoking

View File

@ -18,268 +18,381 @@ package com.alibaba.nacos.common.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.inter.AbilityControlManager;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.MapUtil;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**.
* @author Daydreamer
* @description Base class for ability control. It can only be used internally by Nacos.It showld be sington.
* @description It is a capability control center, manager current node abilities or other control.
* @date 2022/7/12 19:18
**/
@SuppressWarnings("all")
public abstract class AbstractAbilityControlManager implements AbilityControlManager {
public abstract class AbstractAbilityControlManager {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAbilityControlManager.class);
/**
* Abilities current supporting
* <p>
* key: ability key from {@link AbstractAbilityRegistry}
* value: whether to turn on
/**.
* These handlers will be invoked when the flag of ability change key:
* ability key from {@link com.alibaba.nacos.api.ability.constant.AbilityKey} value:
* components who want to be invoked if its interested ability turn on/off
*/
private final Map<AbilityKey, List<HandlerWithPriority>> handlerMappings = new ConcurrentHashMap<>();
/**.
* run for HandlerMapping
*/
private final Executor simpleThreadPool = ExecutorFactory.newSingleExecutorService();
/**.
* ability current node running
*/
protected final Map<AbilityKey, Boolean> currentRunningAbility = new ConcurrentHashMap<>();
/**
* Ability table collections
* <p>
* key: connectionId
* value: AbilityTable
*/
protected final Map<String, AbilityTable> nodeAbilityTable = new ConcurrentHashMap<>();
private final ReentrantLock lockForAbilityTable = new ReentrantLock();
private final ReentrantLock lockForHandlerMappings = new ReentrantLock();
protected AbstractAbilityControlManager() {
// register events
registerAbilityEvent();
// put abilities
currentRunningAbility.putAll(getCurrentNodeSupportAbility());
// initialize
init();
ThreadUtils.addShutdownHook(this::destroy);
NotifyCenter.registerToPublisher(AbilityUpdateEvent.class, 16384);
currentRunningAbility.putAll(initCurrentNodeAbilities());
}
/**
* This is a hook for subclass to init current node ability
*
* @return current node ability
*/
protected abstract Map<AbilityKey, Boolean> getCurrentNodeSupportAbility();
private void registerAbilityEvent(){
// register events
NotifyCenter.registerToPublisher(AbilityComeEvent.class, 16384);
NotifyCenter.registerToPublisher(AbilityExpiredEvent.class, 16384);
}
/**
* Whether the ability current node supporting is running. Return false if current node doesn't support.
* . Turn on the ability whose key is <p>abilityKey</p>
*
* @param abilityKey ability key
* @return is running
* @return if turn success
*/
@Override
public boolean enableCurrentNodeAbility(AbilityKey abilityKey) {
return doTurn(true, abilityKey);
}
/**
* . Turn off the ability whose key is <p>abilityKey</p>
*
* @param abilityKey ability key
* @return if turn success
*/
public boolean disableCurrentNodeAbility(AbilityKey abilityKey) {
return doTurn(false, abilityKey);
}
public boolean isCurrentNodeAbilityRunning(AbilityKey abilityKey) {
return currentRunningAbility.getOrDefault(abilityKey, false);
}
/**
* Register a new ability table.
* Init current node abilities
*
* @param table the ability table.
* @return current node abilities
*/
@Override
public final void addNewTable(AbilityTable table) {
// id should not be null
String connectionId = table.getConnectionId();
// if exists
if (contains(connectionId) || connectionId == null) {
protected abstract Map<AbilityKey, Boolean> initCurrentNodeAbilities();
/**
* Return the abilities current node
*
* @return current abilities
*/
public Map<AbilityKey, Boolean> getCurrentNodeAbilities() {
return Collections.unmodifiableMap(currentRunningAbility);
}
/**
* . Turn on/off the ability of current node
*
* @param isOn is on
* @param abilityKey ability key from {@link AbilityKey}
* @return if turn success
*/
private boolean doTurn(boolean isOn, AbilityKey abilityKey) {
Boolean isEnabled = currentRunningAbility.get(abilityKey);
// if not supporting this key
if (isEnabled == null) {
LOGGER.warn("[AbilityControlManager] Attempt to turn on/off a not existed ability!");
return false;
} else if (isOn == isEnabled) {
// if already turn on/off
return true;
}
// turn on/off
currentRunningAbility.put(abilityKey, isOn);
// handler mappings
triggerHandlerMappingAsyn(abilityKey, isOn, this.handlerMappings);
// notify event
AbilityUpdateEvent abilityUpdateEvent = new AbilityUpdateEvent();
abilityUpdateEvent.setTable(Collections.unmodifiableMap(currentRunningAbility));
abilityUpdateEvent.isOn = isOn;
abilityUpdateEvent.abilityKey = abilityKey;
NotifyCenter.publishEvent(abilityUpdateEvent);
return true;
}
/**
* Register the component which is managed by {@link AbstractAbilityControlManager}. if you are hoping that a
* component will be invoked when turn on/off the ability whose key is <p>abilityKey</p>.
*
* @param abilityKey component key.
* @param priority the higher the priority is, the faster it will be called.
* @param handlerMapping component instance.
*/
public void registerComponent(AbilityKey abilityKey, HandlerMapping handlerMapping, int priority) {
doRegisterComponent(abilityKey, handlerMapping, this.handlerMappings, lockForHandlerMappings, priority, currentRunningAbility);
}
/**
* Register component with the lowest priority
*
* @param abilityKey ability key
* @param handlerMapping handler
*/
public void registerComponent(AbilityKey abilityKey, HandlerMapping handlerMapping) {
registerComponent(abilityKey, handlerMapping, -1);
}
/**
* Remove the specific type handler for a certain ability
*
* @param abilityKey ability key
* @param handlerMappingClazz type
* @return the count of handlers are removed
*/
public int removeComponent(AbilityKey abilityKey, Class<? extends HandlerMapping> handlerMappingClazz) {
return doRemove(abilityKey, handlerMappingClazz, lockForHandlerMappings, handlerMappings);
}
public final void destroy() {
LOGGER.warn("[DefaultAbilityControlManager] - Start destroying...");
((ThreadPoolExecutor) simpleThreadPool).shutdown();
if (MapUtil.isNotEmpty(handlerMappings)) {
handlerMappings.keySet().forEach(key -> doTriggerSyn(key, false, handlerMappings));
}
// hook
doDestroy();
LOGGER.warn("[DefaultAbilityControlManager] - Destruction of the end");
}
/**.
* hook for subclass
*/
protected void doDestroy() {
// for server ability manager
}
/**
* Remove the component instance of <p>handlerMappingClazz</p>.
*
* @param abilityKey ability key from {@link AbstractAbilityRegistry}
* @param handlerMappingClazz implement of {@link HandlerMapping}
* @param lock lock for operation
* @param handlerMappingsMap handler collection map
* @return the count of components have removed
*/
protected int doRemove(AbilityKey abilityKey, Class<? extends HandlerMapping> handlerMappingClazz, Lock lock,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappingsMap) {
List<HandlerWithPriority> handlerMappings = handlerMappingsMap.get(abilityKey);
if (CollectionUtils.isEmpty(handlerMappings)) {
return 0;
}
lock.lock();
try {
AtomicInteger count = new AtomicInteger();
handlerMappings.removeIf(item -> {
if (item.handlerMapping.getClass().equals(handlerMappingClazz)) {
count.getAndIncrement();
return true;
}
return false;
});
return count.get();
} finally {
lock.unlock();
}
}
public int removeAll(AbilityKey abilityKey) {
List<HandlerWithPriority> remove = this.handlerMappings.remove(abilityKey);
return Optional.ofNullable(remove).orElse(Collections.emptyList()).size();
}
/**.
* Register the component into handlerMappings locking by lockForHandlerMappings to ensure concurrency security.
*
* @param abilityKey ability key
* @param handlerMapping component instance.
* @param handlerMappings container
* @param lockForHandlerMappings lock to ensure concurrency
* @param abilityTable behavioral basis of handler
*/
protected void doRegisterComponent(AbilityKey abilityKey, HandlerMapping handlerMapping,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappings, Lock lockForHandlerMappings,
int priority, Map<AbilityKey, Boolean> abilityTable) {
if (!currentRunningAbility.containsKey(abilityKey)) {
LOGGER.warn("[AbilityHandlePostProcessor] Failed to register processor: {}, because illegal key!",
handlerMapping.getClass().getSimpleName());
}
// legal key
lockForHandlerMappings.lock();
try {
List<HandlerWithPriority> handlers = handlerMappings.getOrDefault(abilityKey, new CopyOnWriteArrayList<>());
HandlerWithPriority handlerWithPriority = new HandlerWithPriority(handlerMapping, priority);
handlers.add(handlerWithPriority);
handlerMappings.put(abilityKey, handlers);
// choose behavior
// enable default
if (abilityTable.getOrDefault(abilityKey, false)) {
handlerMapping.enable();
} else {
handlerMapping.disable();
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("[DefaultAbilityControlManager] Fail to register handler: {}", handlerMapping.getClass().getSimpleName());
} finally {
lockForHandlerMappings.unlock();
LOGGER.info("[DefaultAbilityControlManager] Successfully registered processor: {}",
handlerMapping.getClass().getSimpleName());
}
}
/**
* Invoke componments which linked to ability key asyn.
*
* @param key ability key from {@link AbstractAbilityRegistry}
* @param isEnabled turn on/off
* @param handlerMappingsMap handler collection
*/
protected void triggerHandlerMappingAsyn(AbilityKey key, boolean isEnabled,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappingsMap) {
simpleThreadPool.execute(() -> doTriggerSyn(key, isEnabled, handlerMappingsMap));
}
/**
* Invoke componments which linked to ability key syn.
*
* @param key ability key from {@link AbstractAbilityRegistry}
* @param isEnabled turn on/off
* @param handlerMappingsMap handler collection
*/
protected void doTriggerSyn(AbilityKey key, boolean isEnabled,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappingsMap) {
List<HandlerWithPriority> handlerWithPriorities = handlerMappingsMap.get(key);
// return if empty
if (CollectionUtils.isEmpty(handlerWithPriorities)) {
return;
}
lockForAbilityTable.lock();
try {
// check
if (contains(connectionId)) {
return;
Collections.sort(handlerWithPriorities);
// invoked all
handlerWithPriorities.forEach(handlerMappingWithPriorities -> {
// any error from current handler does not affect other handler
HandlerMapping handlerMapping = handlerMappingWithPriorities.handlerMapping;
try {
if (isEnabled) {
handlerMapping.enable();
} else {
handlerMapping.disable();
}
} catch (Throwable t) {
LOGGER.warn("[HandlerMapping] Failed to invoke {} :{}", handlerMapping.getClass().getSimpleName(),
t.getLocalizedMessage());
}
// hook method
add(table);
// null if not support ability table
Map<AbilityKey, Boolean> clientAbilities = table.getAbility();
if (clientAbilities != null) {
// add to nod
Set<AbilityKey> abilityKeys = table.getAbility().keySet();
abilityKeys.forEach(abilityKey -> {
Boolean res = currentRunningAbility.getOrDefault(abilityKey, false);
Boolean coming = clientAbilities.getOrDefault(abilityKey, false);
clientAbilities.put(abilityKey, res && coming);
});
nodeAbilityTable.put(connectionId, table);
}
} finally {
lockForAbilityTable.unlock();
}
// publish event to subscriber
AbilityComeEvent updateEvent = new AbilityComeEvent();
updateEvent.setConnectionId(table.getConnectionId());
updateEvent.setTable(table);
NotifyCenter.publishEvent(updateEvent);
}
/**
* Remove a ability table
*
* @param connectionId the ability table which is removing.
*/
@Override
public final void removeTable(String connectionId) {
AbilityTable removingTable = null;
lockForAbilityTable.lock();
try {
// hook method
remove(connectionId);
// remove
removingTable = nodeAbilityTable.remove(connectionId);
} finally {
lockForAbilityTable.unlock();
}
// publish event
if (removingTable != null) {
AbilityExpiredEvent expiredEvent = new AbilityExpiredEvent();
expiredEvent.setTable(removingTable);
expiredEvent.setConnectionId(connectionId);
NotifyCenter.publishEvent(expiredEvent);
}
}
/**
* Register a new ability table. This is a ThreadSafe method for {@link AbstractAbilityControlManager#remove(String)}.
*
* @param table the ability table.
*/
protected abstract void add(AbilityTable table);
/**
* Remove a ability table. This is a ThreadSafe method for {@link AbstractAbilityControlManager#add(AbilityTable)}.
*
* @param connectionId the ability table which is removing.
*/
protected abstract void remove(String connectionId);
/**
* wthether contains this ability table
*
* @return
*/
@Override
public boolean contains(String connectionId) {
return nodeAbilityTable.containsKey(connectionId);
});
}
/**
* Initialize the manager
*/
@Override
public void init() {
// default init
// nothing to do
}
/**
* It should be invoked before destroy
*/
@Override
public void destroy() {
// default destroy
// nothing to do
}
/**
* A Nacos application can only have one {@link AbilityControlManager}.
* When multiple control centers exist, it is used to determine which one is preferred
* A legal nacos application has a ability control manager.
* If there are more than one, the one with higher priority is preferred
*
* @return priority
*/
public abstract int getPriority();
/**
* Return ability table of current node
*
* @return ability table
*/
@Override
public Map<AbilityKey, Boolean> getCurrentRunningAbility() {
return new HashMap<>(this.currentRunningAbility);
}
/**
* base class for ability
*/
public abstract class AbilityEvent extends Event {
private static final long serialVersionUID = -123241121302761L;
protected AbilityEvent(){}
/**
* connection id.
*/
private String connectionId;
/**
* ability table
*/
private AbilityTable table;
public String getConnectionId() {
return connectionId;
}
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
public AbilityTable getTable() {
return table;
}
public void setTable(AbilityTable table) {
this.table = table;
}
@JustForTest
protected Map<AbilityKey, List<HandlerWithPriority>> handlerMapping() {
return this.handlerMappings;
}
/**
* when a connection connected.
* Support priority handler.
*/
public class AbilityComeEvent extends AbilityEvent {
protected class HandlerWithPriority implements Comparable<HandlerWithPriority> {
private static final long serialVersionUID = -123241121302761L;
private AbilityComeEvent(){}
/**.
* Decorated
*/
public HandlerMapping handlerMapping;
/**.
* the higher the priority, the faster it will be called
*/
public int priority;
public HandlerWithPriority(HandlerMapping handlerMapping, int priority) {
this.handlerMapping = handlerMapping;
this.priority = priority;
}
@Override
public int compareTo(HandlerWithPriority o) {
return o.priority - this.priority;
}
}
/**
* when a connection disconnected.
/**.
* notify when current node ability changing
*/
public class AbilityExpiredEvent extends AbilityEvent {
private static final long serialVersionUID = -123241121212127619L;
private AbilityExpiredEvent(){}
public class AbilityUpdateEvent extends Event {
private static final long serialVersionUID = -1232411212311111L;
private AbilityKey abilityKey;
private boolean isOn;
private Map<AbilityKey, Boolean> table;
private AbilityUpdateEvent(){}
public Map<AbilityKey, Boolean> getAbilityTable() {
return table;
}
public void setTable(Map<AbilityKey, Boolean> abilityTable) {
this.table = abilityTable;
}
public AbilityKey getAbilityKey() {
return abilityKey;
}
public void setAbilityKey(AbilityKey abilityKey) {
this.abilityKey = abilityKey;
}
public boolean isOn() {
return isOn;
}
public void setOn(boolean on) {
isOn = on;
}
}
}

View File

@ -1,346 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
import com.alibaba.nacos.common.ability.inter.AbilityHandlerRegistry;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.MapUtil;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**.
* @author Daydreamer
* @description It is a relatively complete capability control center implementation.
* @date 2022/7/12 19:18
**/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class DefaultAbilityControlManager extends AbstractAbilityControlManager
implements AbilityHandlerRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAbilityControlManager.class);
/**.
* These handlers will be invoked when the flag of ability change key:
* ability key from {@link com.alibaba.nacos.api.ability.constant.AbilityKey} value:
* components who want to be invoked if its interested ability turn on/off
*/
private final Map<AbilityKey, List<HandlerWithPriority>> handlerMappings = new ConcurrentHashMap<>();
/**.
* run for HandlerMapping
*/
private final Executor simpleThreadPool = ExecutorFactory.newSingleExecutorService();
private final ReentrantLock lockForHandlerMappings = new ReentrantLock();
protected DefaultAbilityControlManager() {
ThreadUtils.addShutdownHook(this::destroy);
NotifyCenter.registerToPublisher(AbilityUpdateEvent.class, 16384);
}
/**
* . Turn on the ability whose key is <p>abilityKey</p>
*
* @param abilityKey ability key
* @return if turn success
*/
@Override
public boolean enableCurrentNodeAbility(AbilityKey abilityKey) {
return doTurn(true, abilityKey);
}
/**
* . Turn off the ability whose key is <p>abilityKey</p>
*
* @param abilityKey ability key
* @return if turn success
*/
@Override
public boolean disableCurrentNodeAbility(AbilityKey abilityKey) {
return doTurn(false, abilityKey);
}
/**
* . Turn on/off the ability of current node
*
* @param isOn is on
* @param abilityKey ability key from {@link AbilityKey}
* @return if turn success
*/
private boolean doTurn(boolean isOn, AbilityKey abilityKey) {
Boolean isEnabled = currentRunningAbility.get(abilityKey);
// if not supporting this key
if (isEnabled == null) {
LOGGER.warn("[AbilityControlManager] Attempt to turn on/off a not existed ability!");
return false;
} else if (isOn == isEnabled) {
// if already turn on/off
return true;
}
// turn on/off
currentRunningAbility.put(abilityKey, isOn);
// handler mappings
triggerHandlerMappingAsyn(abilityKey, isOn, this.handlerMappings);
// notify event
AbilityUpdateEvent abilityUpdateEvent = new AbilityUpdateEvent();
abilityUpdateEvent.setTable(new AbilityTable().setAbility(Collections.unmodifiableMap(currentRunningAbility)));
abilityUpdateEvent.isOn = isOn;
abilityUpdateEvent.abilityKey = abilityKey;
NotifyCenter.publishEvent(abilityUpdateEvent);
return true;
}
/**
* Register the component which is managed by {@link AbstractAbilityControlManager}. if you are hoping that a
* component will be invoked when turn on/off the ability whose key is <p>abilityKey</p>.
*
* @param abilityKey component key.
* @param priority the higher the priority is, the faster it will be called.
* @param handlerMapping component instance.
*/
@Override
public void registerComponent(AbilityKey abilityKey, HandlerMapping handlerMapping, int priority) {
doRegisterComponent(abilityKey, handlerMapping, this.handlerMappings, lockForHandlerMappings, priority, currentRunningAbility);
}
@Override
public int removeComponent(AbilityKey abilityKey, Class<? extends HandlerMapping> handlerMappingClazz) {
return doRemove(abilityKey, handlerMappingClazz, lockForHandlerMappings, handlerMappings);
}
@Override
public final void destroy() {
LOGGER.warn("[DefaultAbilityControlManager] - Start destroying...");
((ThreadPoolExecutor) simpleThreadPool).shutdown();
if (MapUtil.isNotEmpty(handlerMappings)) {
handlerMappings.keySet().forEach(key -> doTriggerSyn(key, false, handlerMappings));
}
// hook
doDestroy();
LOGGER.warn("[DefaultAbilityControlManager] - Destruction of the end");
}
/**.
* hook for subclass
*/
protected void doDestroy() {
// for server ability manager
}
/**
* Remove the component instance of <p>handlerMappingClazz</p>.
*
* @param abilityKey ability key from {@link AbstractAbilityRegistry}
* @param handlerMappingClazz implement of {@link HandlerMapping}
* @param lock lock for operation
* @param handlerMappingsMap handler collection map
* @return the count of components have removed
*/
protected int doRemove(AbilityKey abilityKey, Class<? extends HandlerMapping> handlerMappingClazz, Lock lock,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappingsMap) {
List<HandlerWithPriority> handlerMappings = handlerMappingsMap.get(abilityKey);
if (CollectionUtils.isEmpty(handlerMappings)) {
return 0;
}
lock.lock();
try {
AtomicInteger count = new AtomicInteger();
handlerMappings.removeIf(item -> {
if (item.handlerMapping.getClass().equals(handlerMappingClazz)) {
count.getAndIncrement();
return true;
}
return false;
});
return count.get();
} finally {
lock.unlock();
}
}
@Override
public int removeAll(AbilityKey abilityKey) {
List<HandlerWithPriority> remove = this.handlerMappings.remove(abilityKey);
return Optional.ofNullable(remove).orElse(Collections.emptyList()).size();
}
/**.
* Register the component into handlerMappings locking by lockForHandlerMappings to ensure concurrency security.
*
* @param abilityKey ability key
* @param handlerMapping component instance.
* @param handlerMappings container
* @param lockForHandlerMappings lock to ensure concurrency
* @param abilityTable behavioral basis of handler
*/
protected void doRegisterComponent(AbilityKey abilityKey, HandlerMapping handlerMapping,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappings, Lock lockForHandlerMappings,
int priority, Map<AbilityKey, Boolean> abilityTable) {
if (!currentRunningAbility.containsKey(abilityKey)) {
LOGGER.warn("[AbilityHandlePostProcessor] Failed to register processor: {}, because illegal key!",
handlerMapping.getClass().getSimpleName());
}
// legal key
lockForHandlerMappings.lock();
try {
List<HandlerWithPriority> handlers = handlerMappings.getOrDefault(abilityKey, new CopyOnWriteArrayList<>());
HandlerWithPriority handlerWithPriority = new HandlerWithPriority(handlerMapping, priority);
handlers.add(handlerWithPriority);
handlerMappings.put(abilityKey, handlers);
// choose behavior
// enable default
if (abilityTable.getOrDefault(abilityKey, false)) {
handlerMapping.enable();
} else {
handlerMapping.disable();
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("[DefaultAbilityControlManager] Fail to register handler: {}", handlerMapping.getClass().getSimpleName());
} finally {
lockForHandlerMappings.unlock();
LOGGER.info("[DefaultAbilityControlManager] Successfully registered processor: {}",
handlerMapping.getClass().getSimpleName());
}
}
/**
* Invoke componments which linked to ability key asyn.
*
* @param key ability key from {@link AbstractAbilityRegistry}
* @param isEnabled turn on/off
* @param handlerMappingsMap handler collection
*/
protected void triggerHandlerMappingAsyn(AbilityKey key, boolean isEnabled,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappingsMap) {
simpleThreadPool.execute(() -> doTriggerSyn(key, isEnabled, handlerMappingsMap));
}
/**
* Invoke componments which linked to ability key syn.
*
* @param key ability key from {@link AbstractAbilityRegistry}
* @param isEnabled turn on/off
* @param handlerMappingsMap handler collection
*/
protected void doTriggerSyn(AbilityKey key, boolean isEnabled,
Map<AbilityKey, List<HandlerWithPriority>> handlerMappingsMap) {
List<HandlerWithPriority> handlerWithPriorities = handlerMappingsMap.get(key);
// return if empty
if (CollectionUtils.isEmpty(handlerWithPriorities)) {
return;
}
Collections.sort(handlerWithPriorities);
// invoked all
handlerWithPriorities.forEach(handlerMappingWithPriorities -> {
// any error from current handler does not affect other handler
HandlerMapping handlerMapping = handlerMappingWithPriorities.handlerMapping;
try {
if (isEnabled) {
handlerMapping.enable();
} else {
handlerMapping.disable();
}
} catch (Throwable t) {
LOGGER.warn("[HandlerMapping] Failed to invoke {} :{}", handlerMapping.getClass().getSimpleName(),
t.getLocalizedMessage());
}
});
}
@JustForTest
protected Map<AbilityKey, List<HandlerWithPriority>> handlerMapping() {
return this.handlerMappings;
}
/**
* Support priority handler.
*/
protected class HandlerWithPriority implements Comparable<HandlerWithPriority> {
/**.
* Decorated
*/
public HandlerMapping handlerMapping;
/**.
* the higher the priority, the faster it will be called
*/
public int priority;
public HandlerWithPriority(HandlerMapping handlerMapping, int priority) {
this.handlerMapping = handlerMapping;
this.priority = priority;
}
@Override
public int compareTo(HandlerWithPriority o) {
return o.priority - this.priority;
}
}
/**.
* notify when current node ability changing
*/
public class AbilityUpdateEvent extends AbilityEvent {
private static final long serialVersionUID = -1232411212311111L;
private AbilityKey abilityKey;
private boolean isOn;
private AbilityUpdateEvent(){}
public AbilityKey getAbilityKey() {
return abilityKey;
}
public void setAbilityKey(AbilityKey abilityKey) {
this.abilityKey = abilityKey;
}
public boolean isOn() {
return isOn;
}
public void setOn(boolean on) {
isOn = on;
}
}
}

View File

@ -17,8 +17,6 @@
package com.alibaba.nacos.common.ability.discover;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.DefaultAbilityControlManager;
import com.alibaba.nacos.common.ability.inter.AbilityControlManager;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,19 +47,19 @@ public class NacosAbilityManagerHolder {
/**.
* singleton
*/
private static DefaultAbilityControlManager abstractAbilityControlManager;
private static AbstractAbilityControlManager abstractAbilityControlManager;
static {
// spi discover implement
Collection<DefaultAbilityControlManager> load = null;
Collection<AbstractAbilityControlManager> load = null;
try {
// if server
load = NacosServiceLoader.load(DefaultAbilityControlManager.class);
load = NacosServiceLoader.load(AbstractAbilityControlManager.class);
} catch (ServiceConfigurationError e) {
throw new RuntimeException("[AbilityControlManager] Cannot find AbilityControlManger");
}
// the priority of the server is higher
List<DefaultAbilityControlManager> collect = load.stream()
List<AbstractAbilityControlManager> collect = load.stream()
.sorted(Comparator.comparingInt(AbstractAbilityControlManager::getPriority))
.collect(Collectors.toList());
// get the highest priority one
@ -76,7 +74,7 @@ public class NacosAbilityManagerHolder {
*
* @return BaseAbilityControlManager
*/
public static DefaultAbilityControlManager getInstance() {
public static AbstractAbilityControlManager getInstance() {
return abstractAbilityControlManager;
}
@ -87,7 +85,7 @@ public class NacosAbilityManagerHolder {
* @param <T> target type
* @return AbilityControlManager
*/
public static <T extends AbilityControlManager> T getInstance(Class<T> clazz) {
public static <T extends AbstractAbilityControlManager> T getInstance(Class<T> clazz) {
return clazz.cast(abstractAbilityControlManager);
}
}

View File

@ -1,88 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.inter;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import java.util.Map;
/**.
* @author Daydreamer
* @description This is a base interface to manage ability table
* @date 2022/8/10 23:18
**/
public interface AbilityControlManager {
/**
* Whether the ability is supported for Connection. If the ability of current node is closed, it will return false.
*
* @param connectionId the connection range of ability table.
* @param abilityKey key name which comes from {@link AbstractAbilityRegistry}.
* @return whether the ability is supported in certain connection.
*/
AbilityStatus isSupport(String connectionId, AbilityKey abilityKey);
/**
* Whether the ability current node supporting is running. Return false if current node doesn't support.
*
* @param abilityKey ability key
* @return is running
*/
boolean isCurrentNodeAbilityRunning(AbilityKey abilityKey);
/**
* Register a new ability table.
*
* @param table the ability table.
*/
void addNewTable(AbilityTable table);
/**.
* Remove a ability table
*
* @param connectionId the ability table which is removing.
*/
void removeTable(String connectionId);
/**.
* whether contains this ability table
*
* @param connectionId connection id
* @return whether contains
*/
boolean contains(String connectionId);
/**.
* Return ability table of current node
*
* @return ability table
*/
Map<AbilityKey, Boolean> getCurrentRunningAbility();
/**.
* Initialize the manager
*/
void init();
/**.
* It should be invoked before destroy
*/
void destroy();
}

View File

@ -1,83 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.inter;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
/**.
* @author Daydreamer
* @description It provides the capability to notify components which interested in one ability for the {@link AbilityControlManager}
* @date 2022/8/10 23:43
**/
public interface AbilityHandlerRegistry {
/**.
* Turn on the ability whose key is <p>abilityKey</p>
*
* @param abilityKey ability key
* @return if turn success
*/
boolean enableCurrentNodeAbility(AbilityKey abilityKey);
/**.
* Turn off the ability whose key is <p>abilityKey</p>
*
* @param abilityKey ability key
* @return if turn success
*/
boolean disableCurrentNodeAbility(AbilityKey abilityKey);
/**.
* Register the component which is managed by {@link AbstractAbilityControlManager}.
* if you are hoping that a component will be invoked when turn on/off the ability whose key is <p>abilityKey</p>.
*
* @param abilityKey component key from {@link AbstractAbilityRegistry}
* @param priority a positive number, the higher the priority is, the faster it will be called. `1` is the lowest priority.
* @param handlerMapping component instance.
*/
void registerComponent(AbilityKey abilityKey, HandlerMapping handlerMapping, int priority);
/**.
* Default method to register component with the lowest priority.
*
* @param abilityKey component key from {@link AbstractAbilityRegistry}
* @param handlerMapping component instance.
*/
default void registerComponent(AbilityKey abilityKey, HandlerMapping handlerMapping) {
registerComponent(abilityKey, handlerMapping, 1);
}
/**
* Remove the component instance of <p>handlerMappingClazz</p>.
*
* @param abilityKey ability key from {@link AbstractAbilityRegistry}
* @param handlerMappingClazz implement of {@link HandlerMapping}
* @return the count of components have removed
*/
int removeComponent(AbilityKey abilityKey, Class<? extends HandlerMapping> handlerMappingClazz);
/**
* Remove all {@link HandlerMapping} interested in the special ability.
* @param abilityKey abnility key from {@link AbstractAbilityRegistry}
* @return the count of components have removed
*/
int removeAll(AbilityKey abilityKey);
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.listener;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
/**.
* @author Daydreamer
* @description This listener is used for remove ability table if disconnected.
* @date 2022/8/30 22:00
**/
public class ClientAbilityEventListener implements ConnectionEventListener {
@Override
public void onConnected(Connection connection) {
// nothing to do
}
@Override
public void onDisConnect(Connection connection) {
NacosAbilityManagerHolder.getInstance().removeTable(connection.getConnectionId());
}
}

View File

@ -16,8 +16,11 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.remote.Requester;
import java.util.Map;
/**
* connection on client side.
*
@ -33,6 +36,8 @@ public abstract class Connection implements Requester {
protected RpcClient.ServerInfo serverInfo;
protected Map<AbilityKey, Boolean> abilityTable;
public Connection(RpcClient.ServerInfo serverInfo) {
this.serverInfo = serverInfo;
}
@ -45,6 +50,14 @@ public abstract class Connection implements Requester {
this.connectionId = connectionId;
}
public Map<AbilityKey, Boolean> getAbilityTable() {
return abilityTable;
}
public void setAbilityTable(Map<AbilityKey, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
/**
* Getter method for property <tt>abandon</tt>.
*

View File

@ -28,8 +28,6 @@ import com.alibaba.nacos.api.remote.response.ClientDetectionResponse;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.ability.listener.ClientAbilityEventListener;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.PayloadRegistry;
@ -272,9 +270,6 @@ public abstract class RpcClient implements Closeable {
return;
}
// add listener to remove expired ability table
registerConnectionListener(new ClientAbilityEventListener());
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
@ -459,9 +454,6 @@ public abstract class RpcClient implements Closeable {
LOGGER.info("Shutdown rpc client, set status to shutdown");
rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
LOGGER.info("Shutdown client event executor " + clientEventExecutor);
if (currentConnection != null) {
NacosAbilityManagerHolder.getInstance().removeTable(currentConnection.getConnectionId());
}
if (clientEventExecutor != null) {
clientEventExecutor.shutdownNow();
}

View File

@ -17,7 +17,6 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
@ -347,23 +346,19 @@ public abstract class GrpcClient extends RpcClient {
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();
AbilityTable table = new AbilityTable();
table.setServer(true)
.setConnectionId(connectionId);
// if not supported, it will be null
if (serverCheckResponse.getAbilities() != null) {
Map<AbilityKey, Boolean> abilityTable = AbilityKey.mapEnum(serverCheckResponse.getAbilities());
table.setAbility(abilityTable);
// mark
markForSetup.put(serverCheckResponse.getConnectionId(), new CountDownLatch(1));
}
NacosAbilityManagerHolder.getInstance().addNewTable(table);
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be null
if (serverCheckResponse.getAbilities() != null) {
Map<AbilityKey, Boolean> abilityTable = AbilityKey.mapEnum(serverCheckResponse.getAbilities());
// mark
markForSetup.put(serverCheckResponse.getConnectionId(), new CountDownLatch(1));
// set server abilities to connection
grpcConn.setAbilityTable(abilityTable);
}
//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
@ -377,7 +372,7 @@ public abstract class GrpcClient extends RpcClient {
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(AbilityKey.mapStr(NacosAbilityManagerHolder.getInstance().getCurrentRunningAbility()));
conSetupRequest.setAbilityTable(AbilityKey.mapStr(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response

View File

@ -19,8 +19,8 @@ package com.alibaba.nacos.core.ability.config;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.ability.inter.AbilityHandlerRegistry;
import com.alibaba.nacos.common.event.ServerConfigChangeEvent;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
@ -49,7 +49,7 @@ public class AbilityConfigs extends Subscriber<ServerConfigChangeEvent> {
private final Set<AbilityKey> serverAbilityKeys = new ConcurrentHashSet<>();
private AbilityHandlerRegistry abilityHandlerRegistry = NacosAbilityManagerHolder.getInstance();
private AbstractAbilityControlManager abilityHandlerRegistry = NacosAbilityManagerHolder.getInstance();
public AbilityConfigs() {
// load ability
@ -102,7 +102,7 @@ public class AbilityConfigs extends Subscriber<ServerConfigChangeEvent> {
}
@JustForTest
protected void setAbilityHandlerRegistry(AbilityHandlerRegistry abilityHandlerRegistry) {
protected void setAbilityHandlerRegistry(AbstractAbilityControlManager abilityHandlerRegistry) {
this.abilityHandlerRegistry = abilityHandlerRegistry;
}

View File

@ -17,61 +17,28 @@
package com.alibaba.nacos.core.ability.control;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.DefaultAbilityControlManager;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.MapUtil;
import com.alibaba.nacos.core.ability.config.AbilityConfigs;
import com.alibaba.nacos.core.ability.inte.ClusterAbilityControlSupport;
import com.alibaba.nacos.sys.env.EnvUtil;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**.
* @author Daydreamer
* @description {@link AbstractAbilityControlManager} for nacos-server.
* @date 2022/7/13 21:14
**/
public class ServerAbilityControlManager extends DefaultAbilityControlManager implements ClusterAbilityControlSupport {
/**.
* ability for cluster
*/
private final Map<AbilityKey, Boolean> clusterAbilityTable = new ConcurrentHashMap<>();
/**.
* ability for server
*/
private final Map<String, AbilityTable> serversAbilityTable = new ConcurrentHashMap<>();
/**.
* Number of servers that do not support capability negotiation
*/
private final ConcurrentHashSet<String> serverNoAbilityNegotiation = new ConcurrentHashSet<>();
public class ServerAbilityControlManager extends AbstractAbilityControlManager {
public ServerAbilityControlManager() {
// add current node into
AbilityTable currentNodeAbility = new AbilityTable();
currentNodeAbility.setAbility(super.currentRunningAbility);
currentNodeAbility.setConnectionId("current-node");
serversAbilityTable.put(currentNodeAbility.getConnectionId(), currentNodeAbility);
clusterAbilityTable.putAll(currentNodeAbility.getAbility());
NotifyCenter.registerToPublisher(ClusterAbilityUpdateEvent.class, 16384);
}
@Override
protected Map<AbilityKey, Boolean> getCurrentNodeSupportAbility() {
protected Map<AbilityKey, Boolean> initCurrentNodeAbilities() {
// static abilities
Map<AbilityKey, Boolean> staticAbilities = ServerAbilities.getStaticAbilities();
// all function server can support
@ -99,160 +66,9 @@ public class ServerAbilityControlManager extends DefaultAbilityControlManager im
return abilityTable;
}
@Override
public AbilityStatus isSupport(String connectionId, AbilityKey abilityKey) {
AbilityTable abilityTable = nodeAbilityTable.get(connectionId);
if (abilityTable == null) {
return AbilityStatus.UNKNOWN;
}
Boolean isSupport = Optional.ofNullable(abilityTable.getAbility()).orElse(Collections.emptyMap())
.getOrDefault(abilityKey, false);
return isSupport ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
/**.
* Whether all the servers currently connected support a certain capability
*
* @param abilityKey ability key
* @return whether it is turn on
*/
@Override
public AbilityStatus isClusterEnableAbilityNow(AbilityKey abilityKey) {
if (serverNoAbilityNegotiation.size() > 0) {
return AbilityStatus.UNKNOWN;
}
return clusterAbilityTable.getOrDefault(abilityKey, Boolean.FALSE) ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
@Override
public Map<AbilityKey, Boolean> getClusterAbility() {
return serverNoAbilityNegotiation.size() > 0 ? null : Collections.unmodifiableMap(clusterAbilityTable);
}
@Override
protected void add(AbilityTable table) {
// from which env
boolean isServer = table.isServer();
// if not null
if (table.getConnectionId() != null && table.getAbility() != null) {
if (isServer) {
serversAbilityTable.put(table.getConnectionId(), table);
// enter cluster
Map<AbilityKey, Boolean> nodeAbility = table.getAbility();
Set<AbilityKey> keySet = clusterAbilityTable.keySet();
keySet.forEach(abilityKey -> {
Boolean isEnabled = clusterAbilityTable.get(abilityKey);
Boolean val = nodeAbility.getOrDefault(abilityKey, Boolean.FALSE);
// new res
Boolean newRes = val && isEnabled;
// if ability changes
if (!newRes.equals(isEnabled)) {
clusterAbilityTable.replace(abilityKey, false);
// notify
NotifyCenter.publishEvent(buildClusterEvent(abilityKey, false));
}
});
}
} else if (isServer && table.getAbility() == null) {
// add mark if server doesn't support ability table
serverNoAbilityNegotiation.add(table.getConnectionId());
}
}
private ClusterAbilityUpdateEvent buildClusterEvent(AbilityKey abilityKey, boolean isOn) {
// notify
ClusterAbilityUpdateEvent event = new ClusterAbilityUpdateEvent();
event.setAbilityKey(abilityKey);
event.setOn(isOn);
event.setTable(new AbilityTable().setAbility(Collections.unmodifiableMap(clusterAbilityTable)));
return event;
}
@Override
protected void remove(String connectionId) {
// from which
AbilityTable abilityTable = nodeAbilityTable.get(connectionId);
// if not support
serverNoAbilityNegotiation.remove(connectionId);
// return if null
if (abilityTable == null) {
return;
}
// from which env
if (abilityTable.isServer()) {
// remove from server ability collection if support
serversAbilityTable.remove(connectionId);
// remove from cluster
if (MapUtil.isNotEmpty(serversAbilityTable)) {
Set<AbilityKey> keySet = clusterAbilityTable.keySet();
keySet.forEach(abilityKey -> {
Boolean isEnabled = clusterAbilityTable.getOrDefault(abilityKey, Boolean.FALSE);
// nothing to do if enabled
if (isEnabled) {
return;
}
// recalculate
Boolean newVal = serversAbilityTable.values()
.stream()
.map(AbilityTable::getAbility)
.map((map) -> map.getOrDefault(abilityKey, Boolean.FALSE))
.reduce((a, b) -> a && b)
.orElse(Boolean.FALSE);
clusterAbilityTable.replace(abilityKey, newVal);
// if change
if (!isEnabled.equals(newVal)) {
// notify
NotifyCenter.publishEvent(buildClusterEvent(abilityKey, newVal));
}
});
}
}
}
@Override
public int getPriority() {
return 1;
}
/**.
* notify when current node ability changing
*/
public class ClusterAbilityUpdateEvent extends AbilityEvent {
private static final long serialVersionUID = -122222411212200111L;
private AbilityKey abilityKey;
private boolean isOn;
private ClusterAbilityUpdateEvent(){}
public AbilityKey getAbilityKey() {
return abilityKey;
}
public void setAbilityKey(AbilityKey abilityKey) {
this.abilityKey = abilityKey;
}
public boolean isOn() {
return isOn;
}
public void setOn(boolean on) {
isOn = on;
}
}
@JustForTest
protected void setClusterAbilityTable(Map<AbilityKey, Boolean> map) {
clusterAbilityTable.putAll(map);
}
@JustForTest
protected Set<String> serverNotSupport() {
return serverNoAbilityNegotiation;
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability.inte;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.common.ability.inter.AbilityControlManager;
import java.util.Map;
/**.
* @author Daydreamer
* @description It provides the capability to manage the AbilityTable in cluster for the {@link AbilityControlManager}
* @date 2022/8/10 23:18
**/
public interface ClusterAbilityControlSupport {
/**.
* Return the cluster abilities.
*
* @return the cluster abilities.
*/
Map<AbilityKey, Boolean> getClusterAbility();
/**.
* Whether all the servers currently connected support a certain capability
*
* @param abilityKey ability key
* @return whether it is turn on
*/
AbilityStatus isClusterEnableAbilityNow(AbilityKey abilityKey);
}

View File

@ -66,8 +66,10 @@ public abstract class Connection implements Requester {
/**
* get abilities.
*
* @deprecated it is replaced by abilityTable field
* @return
*/
@Deprecated
public ClientAbilities getAbilities() {
return abilities;
}
@ -75,8 +77,10 @@ public abstract class Connection implements Requester {
/**
* set abilities.
*
* @deprecated it is replaced by abilityTable field
* @param abilities abilities.
*/
@Deprecated
public void setAbilities(ClientAbilities abilities) {
this.abilities = abilities;
}

View File

@ -92,7 +92,7 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get(),
// to str map
AbilityKey.mapStr(NacosAbilityManagerHolder.getInstance().getCurrentRunningAbility())));
AbilityKey.mapStr(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities())));
traceIfNecessary(serverCheckResponseP, false);
responseObserver.onNext(serverCheckResponseP);
responseObserver.onCompleted();

View File

@ -1,45 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.listener;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.core.remote.Connection;
import org.springframework.stereotype.Component;
/**.
* @author Daydreamer
* @description This listener is used to register or remove ability table.
* @date 2022/7/17 19:18
**/
@Component
public class ServerAbilityConnectionListener extends ClientConnectionEventListener {
@Override
public void clientConnected(Connection connect) {
// it will be thought from client all
AbilityTable abilityTable = new AbilityTable(connect.getMetaInfo().getConnectionId(), connect.getAbilityTable(),
false, connect.getMetaInfo().getVersion());
NacosAbilityManagerHolder.getInstance().addNewTable(abilityTable);
}
@Override
public void clientDisConnected(Connection connect) {
NacosAbilityManagerHolder.getInstance().removeTable(connect.getMetaInfo().getConnectionId());
}
}

View File

@ -17,8 +17,6 @@
package com.alibaba.nacos.core.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
import org.junit.Assert;
import org.junit.Before;
@ -40,99 +38,9 @@ public class AbilityControlManagerTest {
public void inject() {
Map<AbilityKey, Boolean> newTable = new HashMap<>();
newTable.put(AbilityKey.TEST_1, true);
Map<AbilityKey, Boolean> cluster = new HashMap<>();
cluster.put(AbilityKey.TEST_1, true);
serverAbilityControlManager.setClusterAbility(cluster);
serverAbilityControlManager.setCurrentSupportingAbility(newTable);
}
@Test
public void testClientAdd() {
Map<AbilityKey, Boolean> newTable = new HashMap<>();
newTable.put(AbilityKey.TEST_2, true);
newTable.put(AbilityKey.TEST_1, true);
AbilityTable table = new AbilityTable();
table.setConnectionId("test-00001");
table.setAbility(newTable);
table.setServer(true);
serverAbilityControlManager.addNewTable(table);
Assert.assertEquals(AbilityStatus.NOT_SUPPORTED, serverAbilityControlManager.isSupport("test-00001", AbilityKey.TEST_2));
Assert.assertEquals(AbilityStatus.SUPPORTED, serverAbilityControlManager.isSupport("test-00001", AbilityKey.TEST_1));
}
@Test
public void testServerAdd() {
Map<AbilityKey, Boolean> newTable = new HashMap<>();
newTable.put(AbilityKey.TEST_2, true);
newTable.put(AbilityKey.TEST_1, true);
AbilityTable table = new AbilityTable();
table.setConnectionId("test-00001");
table.setAbility(newTable);
table.setServer(true);
serverAbilityControlManager.addNewTable(table);
Assert.assertEquals(AbilityStatus.NOT_SUPPORTED, serverAbilityControlManager.isSupport("test-00001", AbilityKey.TEST_2));
Assert.assertEquals(AbilityStatus.SUPPORTED, serverAbilityControlManager.isSupport("test-00001", AbilityKey.TEST_1));
Map<AbilityKey, Boolean> otherServer = new HashMap<>();
otherServer.put(AbilityKey.TEST_2, true);
otherServer.put(AbilityKey.TEST_1, false);
AbilityTable otherServerTable = new AbilityTable();
otherServerTable.setConnectionId("test-00000");
otherServerTable.setAbility(otherServer);
otherServerTable.setServer(true);
serverAbilityControlManager.addNewTable(otherServerTable);
Map<AbilityKey, Boolean> clientTa = new HashMap<>();
clientTa.put(AbilityKey.TEST_2, true);
clientTa.put(AbilityKey.TEST_1, false);
AbilityTable clientTable = new AbilityTable();
clientTable.setConnectionId("test-00002");
clientTable.setAbility(clientTa);
clientTable.setServer(false);
serverAbilityControlManager.addNewTable(clientTable);
// if not support
AbilityTable serverTable = new AbilityTable();
serverTable.setConnectionId("test-001231");
serverTable.setServer(true);
serverAbilityControlManager.addNewTable(serverTable);
// unknown because not support
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.UNKNOWN);
Assert.assertEquals(serverAbilityControlManager.getServerNotSupportAbility().size(), 1);
Assert.assertTrue(serverAbilityControlManager.getServerNotSupportAbility().contains("test-001231"));
AbilityTable serverTable1 = new AbilityTable();
serverTable1.setConnectionId("test-001231231");
serverTable1.setServer(true);
serverAbilityControlManager.addNewTable(serverTable1);
// unknown because not support
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.UNKNOWN);
Assert.assertEquals(serverAbilityControlManager.getServerNotSupportAbility().size(), 2);
Assert.assertTrue(serverAbilityControlManager.getServerNotSupportAbility().contains("test-001231231"));
// remove then support
serverAbilityControlManager.removeTable("test-001231");
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.UNKNOWN);
serverAbilityControlManager.removeTable("test-001231231");
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.NOT_SUPPORTED);
}
@Test
public void testClientRemove() {
Map<AbilityKey, Boolean> clientTa = new HashMap<>();
clientTa.put(AbilityKey.TEST_2, true);
clientTa.put(AbilityKey.TEST_1, false);
AbilityTable clientTable = new AbilityTable();
clientTable.setConnectionId("test-01111");
clientTable.setAbility(clientTa);
clientTable.setServer(true);
serverAbilityControlManager.addNewTable(clientTable);
Assert.assertTrue(serverAbilityControlManager.contains(clientTable.getConnectionId()));
serverAbilityControlManager.removeTable("test-01111");
Assert.assertFalse(serverAbilityControlManager.contains(clientTable.getConnectionId()));
}
@Test
public void testComponent() throws InterruptedException {
enabled = 0;
@ -177,7 +85,7 @@ public class AbilityControlManagerTest {
@Test
public void testCurrentNodeAbility() {
Set<AbilityKey> keySet = serverAbilityControlManager.getCurrentRunningAbility().keySet();
Set<AbilityKey> keySet = serverAbilityControlManager.getCurrentNodeAbilities().keySet();
// diable all
keySet.forEach(key -> serverAbilityControlManager.disableCurrentNodeAbility(key));
// get all
@ -190,42 +98,6 @@ public class AbilityControlManagerTest {
keySet.forEach(key -> {
Assert.assertTrue(serverAbilityControlManager.isCurrentNodeAbilityRunning(key));
});
// add node doesn't support ability table
AbilityTable abilityTable = new AbilityTable();
abilityTable.setServer(true);
abilityTable.setConnectionId("adsadsa1");
serverAbilityControlManager.addNewTable(abilityTable);
// cluster abilities close
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.UNKNOWN);
AbilityTable abilityTable1 = new AbilityTable();
abilityTable1.setServer(true);
abilityTable1.setConnectionId("adsadsa2");
serverAbilityControlManager.addNewTable(abilityTable1);
// cluster abilities still close
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.UNKNOWN);
Assert.assertNull(serverAbilityControlManager.getClusterAbility());
AbilityTable abilityTable2 = new AbilityTable();
abilityTable2.setServer(true);
abilityTable2.setConnectionId("adsadsa3");
Map<AbilityKey, Boolean> clientTa = new HashMap<>();
clientTa.put(AbilityKey.TEST_2, true);
clientTa.put(AbilityKey.TEST_1, false);
abilityTable2.setAbility(clientTa);
serverAbilityControlManager.addNewTable(abilityTable2);
// cluster abilities still close
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.UNKNOWN);
Assert.assertNull(serverAbilityControlManager.getClusterAbility());
// remove
serverAbilityControlManager.removeTable("adsadsa3");
serverAbilityControlManager.removeTable("adsadsa2");
serverAbilityControlManager.removeTable("adsadsa1");
// cluster abilities open
Assert.assertEquals(serverAbilityControlManager.isClusterEnableAbilityNow(AbilityKey.TEST_1), AbilityStatus.SUPPORTED);
Assert.assertNotNull(serverAbilityControlManager.getClusterAbility());
}
class TestHandlerMapping implements HandlerMapping {

View File

@ -31,19 +31,9 @@ public class TestServerAbilityControlManager extends ServerAbilityControlManager
currentRunningAbility.putAll(ability);
}
@JustForTest
public void setClusterAbility(Map<AbilityKey, Boolean> ability) {
super.setClusterAbilityTable(ability);
}
@JustForTest
public int handlerMappingCount() {
return super.handlerMapping().size();
}
@JustForTest
public Set<String> getServerNotSupportAbility() {
return super.serverNotSupport();
}
}

View File

@ -17,7 +17,6 @@
package com.alibaba.nacos.core.ability.config;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.common.ability.inter.AbilityHandlerRegistry;
import java.util.Set;
@ -33,8 +32,4 @@ public class TestAbilityConfig extends AbilityConfigs {
serverAbilityKeys.add(AbilityKey.TEST_1);
serverAbilityKeys.add(AbilityKey.TEST_2);
}
public void setAbilityControlManager(AbilityHandlerRegistry abilityHandlerRegistry) {
super.setAbilityHandlerRegistry(abilityHandlerRegistry);
}
}