Add the base implements for ability control.
This commit is contained in:
parent
a15ef8eb3a
commit
4098297e51
@ -0,0 +1,401 @@
|
||||
/*
|
||||
* Copyright 1999-2022 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.ability;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
|
||||
import com.alibaba.nacos.api.ability.entity.AbilityTable;
|
||||
import com.alibaba.nacos.common.ability.handler.AbilityHandlePreProcessor;
|
||||
import com.alibaba.nacos.common.ability.inter.TraceableAbilityControlManager;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.utils.CollectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**.
|
||||
* @author Daydreamer
|
||||
* @description Base class for ability control. It can only be used internally by Nacos.It showld be sington.
|
||||
* @date 2022/7/12 19:18
|
||||
**/
|
||||
@SuppressWarnings("all")
|
||||
public abstract class AbstractAbilityControlManager implements TraceableAbilityControlManager {
|
||||
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAbilityControlManager.class);
|
||||
|
||||
/**
|
||||
* Abilities current supporting
|
||||
* <p>
|
||||
* key: ability key from {@link AbilityKey}
|
||||
* value: whether to turn on
|
||||
*/
|
||||
protected final Map<String, Boolean> currentRunningAbility = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Ability table collections
|
||||
* <p>
|
||||
* key: connectionId
|
||||
* value: AbilityTable
|
||||
*/
|
||||
protected final Map<String, AbilityTable> nodeAbilityTable = new ConcurrentHashMap<>();
|
||||
|
||||
/**.
|
||||
* These handlers will be invoke before combine the ability table
|
||||
*/
|
||||
private final List<AbilityHandlePreProcessor> abilityHandlePreProcessors = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* This map is used to trace the status of ability table.
|
||||
* Its status should be update after {@link #addNewTable(AbilityTable)} and {@link #removeTable(String)}
|
||||
*/
|
||||
protected final Map<String, AtomicReference<AbilityStatus>> abilityStatus = new ConcurrentHashMap<>();
|
||||
|
||||
private final ReentrantLock lockForProcessors = new ReentrantLock();
|
||||
|
||||
private final ReentrantLock lockForAbilityTable = new ReentrantLock();
|
||||
|
||||
protected AbstractAbilityControlManager() {
|
||||
// register events
|
||||
registerAbilityEvent();
|
||||
// put abilities
|
||||
currentRunningAbility.putAll(AbilityKey.getCurrentNodeSupportAbility());
|
||||
// initialize
|
||||
init();
|
||||
}
|
||||
|
||||
|
||||
private void registerAbilityEvent(){
|
||||
// register events
|
||||
NotifyCenter.registerToPublisher(AbilityComeEvent.class, 16384);
|
||||
NotifyCenter.registerToPublisher(AbilityExpiredEvent.class, 16384);
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the ability current node supporting is running. Return false if current node doesn't support.
|
||||
*
|
||||
* @param abilityKey ability key
|
||||
* @return is running
|
||||
*/
|
||||
@Override
|
||||
public boolean isCurrentNodeAbilityRunning(String abilityKey) {
|
||||
return currentRunningAbility.getOrDefault(abilityKey, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a new ability table.
|
||||
*
|
||||
* @param table the ability table.
|
||||
*/
|
||||
@Override
|
||||
public final void addNewTable(AbilityTable table) {
|
||||
// id should not be null
|
||||
String connectionId = table.getConnectionId();
|
||||
// if exists
|
||||
if (contains(connectionId) || connectionId == null) {
|
||||
return;
|
||||
}
|
||||
lockForAbilityTable.lock();
|
||||
try {
|
||||
// check
|
||||
if (contains(connectionId)) {
|
||||
return;
|
||||
}
|
||||
// update status
|
||||
abilityStatus.put(connectionId, new AtomicReference<>(AbilityStatus.INITIALIZING));
|
||||
// handle ability table before joining current node
|
||||
AbilityTable processed = process(table);
|
||||
// hook method
|
||||
add(processed);
|
||||
// add to node
|
||||
nodeAbilityTable.put(connectionId, table);
|
||||
} finally {
|
||||
lockForAbilityTable.unlock();
|
||||
}
|
||||
// update status
|
||||
AtomicReference<AbilityStatus> abilityStatusAtomicReference = abilityStatus.get(table.getConnectionId());
|
||||
if (abilityStatusAtomicReference != null) {
|
||||
// try one time
|
||||
// do nothing if AbilityStatus == Expired
|
||||
// if ready
|
||||
if(abilityStatusAtomicReference.compareAndSet(AbilityStatus.INITIALIZING, AbilityStatus.READY)) {
|
||||
// publish event to subscriber
|
||||
AbilityComeEvent updateEvent = new AbilityComeEvent();
|
||||
updateEvent.setConnectionId(table.getConnectionId());
|
||||
updateEvent.setTable(table);
|
||||
NotifyCenter.publishEvent(updateEvent);
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("[AbiityControlManager] Cannot get connection status after processing ability table, possible reason is that the network is unstable");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a ability table
|
||||
*
|
||||
* @param connectionId the ability table which is removing.
|
||||
*/
|
||||
@Override
|
||||
public final void removeTable(String connectionId) {
|
||||
// if not exists
|
||||
if(connectionId == null || !nodeAbilityTable.containsKey(connectionId)){
|
||||
return;
|
||||
}
|
||||
AbilityTable removingTable = null;
|
||||
lockForAbilityTable.lock();
|
||||
try {
|
||||
// check
|
||||
if (!nodeAbilityTable.containsKey(connectionId)) {
|
||||
return;
|
||||
}
|
||||
nodeAbilityTable.get(connectionId);
|
||||
// update status
|
||||
abilityStatus.computeIfPresent(connectionId, (k, v) -> {
|
||||
v.set(AbilityStatus.EXPIRED);
|
||||
return v;
|
||||
});
|
||||
// hook method
|
||||
remove(connectionId);
|
||||
// remove
|
||||
nodeAbilityTable.remove(connectionId);
|
||||
} finally {
|
||||
lockForAbilityTable.unlock();
|
||||
}
|
||||
// remove status
|
||||
abilityStatus.remove(connectionId);
|
||||
// publish event
|
||||
if (removingTable != null) {
|
||||
AbilityExpiredEvent expiredEvent = new AbilityExpiredEvent();
|
||||
expiredEvent.setTable(removingTable);
|
||||
expiredEvent.setConnectionId(connectionId);
|
||||
NotifyCenter.publishEvent(expiredEvent);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Register a new ability table. This is a ThreadSafe method for {@link AbstractAbilityControlManager#remove(String)}.
|
||||
*
|
||||
* @param table the ability table.
|
||||
*/
|
||||
protected abstract void add(AbilityTable table);
|
||||
|
||||
|
||||
/**
|
||||
* Remove a ability table. This is a ThreadSafe method for {@link AbstractAbilityControlManager#add(AbilityTable)}.
|
||||
*
|
||||
* @param connectionId the ability table which is removing.
|
||||
*/
|
||||
protected abstract void remove(String connectionId);
|
||||
|
||||
|
||||
/**
|
||||
* wthether contains this ability table
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean contains(String connectionId) {
|
||||
return nodeAbilityTable.containsKey(connectionId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the status of the ability table.
|
||||
*
|
||||
* @param connectionId connection id
|
||||
* @return status of ability table {@link AbilityStatus}
|
||||
*/
|
||||
@Override
|
||||
public AbilityStatus trace(String connectionId) {
|
||||
if (connectionId == null) {
|
||||
return AbilityStatus.NOT_EXIST;
|
||||
}
|
||||
return abilityStatus.getOrDefault(connectionId, new AtomicReference<>(AbilityStatus.NOT_EXIST)).get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Trace the status of connection if <p>{@link AbilityStatus#INITIALIZING}<p/>, wake up if <p>{@link AbilityStatus#READY}<p/>
|
||||
* It will return if status is <p>{@link AbilityStatus#EXPIRED}<p/> or <p>{@link AbilityStatus#NOT_EXIST}<p/>
|
||||
*
|
||||
* @param connectionId connection id
|
||||
* @param source source status
|
||||
* @param target target status
|
||||
* @return if success
|
||||
*/
|
||||
@Override
|
||||
public boolean traceReadySyn(String connectionId) {
|
||||
AbilityStatus source = AbilityStatus.INITIALIZING;
|
||||
AbilityStatus target = AbilityStatus.READY;
|
||||
AtomicReference<AbilityStatus> atomicReference = abilityStatus.get(connectionId);
|
||||
// return if null
|
||||
if (atomicReference == null || atomicReference.get().equals(AbilityStatus.EXPIRED)) {
|
||||
return false;
|
||||
} else if (target == atomicReference.get()) {
|
||||
return true;
|
||||
}
|
||||
// try if status legal
|
||||
while (!atomicReference.get().equals(target) && atomicReference.get().equals(source)) {
|
||||
LockSupport.parkNanos(100L);
|
||||
// if expired
|
||||
if (atomicReference.get().equals(AbilityStatus.EXPIRED)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return atomicReference.get().equals(target);
|
||||
}
|
||||
|
||||
/**.
|
||||
* Invoking {@link AbilityHandlePreProcessor}
|
||||
*
|
||||
* @param source source ability table
|
||||
* @return result
|
||||
*/
|
||||
protected AbilityTable process(AbilityTable source) {
|
||||
// do nothing if no processor
|
||||
if (CollectionUtils.isEmpty(abilityHandlePreProcessors)) {
|
||||
return source;
|
||||
}
|
||||
// copy to advoid error process
|
||||
AbilityTable abilityTable = source;
|
||||
AbilityTable copy = new AbilityTable(source.getConnectionId(), new HashMap<>(source.getAbility()), source.isServer(), source.getVersion());
|
||||
for (AbilityHandlePreProcessor handler : abilityHandlePreProcessors) {
|
||||
try {
|
||||
abilityTable = handler.handle(abilityTable);
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("[AbilityHandlePostProcessor] Failed to invoke {} :{}",
|
||||
handler.getClass().getSimpleName(), t.getLocalizedMessage());
|
||||
// ensure normal operation
|
||||
abilityTable = copy;
|
||||
}
|
||||
}
|
||||
return abilityTable;
|
||||
}
|
||||
|
||||
/**.
|
||||
* They will be invoked before updating ability table, but the order in which
|
||||
* they are called cannot be guaranteed
|
||||
*
|
||||
* @param postProcessor PostProcessor instance
|
||||
*/
|
||||
@Override
|
||||
public void addPostProcessor(AbilityHandlePreProcessor postProcessor) {
|
||||
lockForProcessors.lock();
|
||||
try {
|
||||
abilityHandlePreProcessors.add(postProcessor);
|
||||
} finally {
|
||||
lockForProcessors.unlock();
|
||||
LOGGER.info("[AbilityHandlePostProcessor] registry handler: {}",
|
||||
postProcessor.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the manager
|
||||
*/
|
||||
@Override
|
||||
public void init() {
|
||||
// default init
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
/**
|
||||
* It should be invoked before destroy
|
||||
*/
|
||||
@Override
|
||||
public void destroy() {
|
||||
// default destroy
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
/**
|
||||
* Return ability table of current node
|
||||
*
|
||||
* @return ability table
|
||||
*/
|
||||
@Override
|
||||
public Map<String, Boolean> getCurrentRunningAbility() {
|
||||
return new HashMap<>(this.currentRunningAbility);
|
||||
}
|
||||
|
||||
/**
|
||||
* base class for ability
|
||||
*/
|
||||
public abstract class AbilityEvent extends Event {
|
||||
|
||||
private static final long serialVersionUID = -123241121302761L;
|
||||
|
||||
protected AbilityEvent(){}
|
||||
|
||||
/**
|
||||
* connection id.
|
||||
*/
|
||||
private String connectionId;
|
||||
|
||||
/**
|
||||
* ability table
|
||||
*/
|
||||
private AbilityTable table;
|
||||
|
||||
|
||||
public String getConnectionId() {
|
||||
return connectionId;
|
||||
}
|
||||
|
||||
public void setConnectionId(String connectionId) {
|
||||
this.connectionId = connectionId;
|
||||
}
|
||||
|
||||
public AbilityTable getTable() {
|
||||
return table;
|
||||
}
|
||||
|
||||
public void setTable(AbilityTable table) {
|
||||
this.table = table;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* when a connection connected.
|
||||
*/
|
||||
public class AbilityComeEvent extends AbilityEvent {
|
||||
|
||||
private static final long serialVersionUID = -123241121302761L;
|
||||
|
||||
private AbilityComeEvent(){}
|
||||
}
|
||||
|
||||
/**
|
||||
* when a connection disconnected.
|
||||
*/
|
||||
public class AbilityExpiredEvent extends AbilityEvent {
|
||||
|
||||
private static final long serialVersionUID = -123241121212127619L;
|
||||
|
||||
private AbilityExpiredEvent(){}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,343 @@
|
||||
/*
|
||||
* Copyright 1999-2022 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.ability;
|
||||
|
||||
import com.alibaba.nacos.api.ability.constant.AbilityKey;
|
||||
import com.alibaba.nacos.api.ability.entity.AbilityTable;
|
||||
import com.alibaba.nacos.common.JustForTest;
|
||||
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
|
||||
import com.alibaba.nacos.common.ability.inter.AbilityHandlerRegistry;
|
||||
import com.alibaba.nacos.common.executor.ExecutorFactory;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.common.utils.MapUtil;
|
||||
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**.
|
||||
* @author Daydreamer
|
||||
* @description It is a relatively complete capability control center implementation.
|
||||
* @date 2022/7/12 19:18
|
||||
**/
|
||||
public abstract class DefaultAbilityControlManager extends AbstractAbilityControlManager
|
||||
implements AbilityHandlerRegistry {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAbilityControlManager.class);
|
||||
|
||||
/**
|
||||
* . These handlers will be invoked when the flag of ability change key: ability key from {@link AbilityKey} value:
|
||||
* componments who want to be invoked if its interested ability turn on/off
|
||||
*/
|
||||
private final Map<String, List<HandlerWithPriority>> handlerMappings = new ConcurrentHashMap<>();
|
||||
|
||||
/**.
|
||||
* run for HandlerMapping
|
||||
*/
|
||||
private final Executor simpleThreadPool = ExecutorFactory.newSingleExecutorService();
|
||||
|
||||
private final ReentrantLock lockForHandlerMappings = new ReentrantLock();
|
||||
|
||||
protected DefaultAbilityControlManager() {
|
||||
ThreadUtils.addShutdownHook(this::destroy);
|
||||
NotifyCenter.registerToPublisher(AbilityUpdateEvent.class, 16384);
|
||||
}
|
||||
|
||||
/**
|
||||
* . Turn on the ability whose key is <p>abilityKey</p>
|
||||
*
|
||||
* @param abilityKey ability key
|
||||
* @return if turn success
|
||||
*/
|
||||
@Override
|
||||
public boolean enableCurrentNodeAbility(String abilityKey) {
|
||||
return doTurn(true, abilityKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* . Turn off the ability whose key is <p>abilityKey</p>
|
||||
*
|
||||
* @param abilityKey ability key
|
||||
* @return if turn success
|
||||
*/
|
||||
@Override
|
||||
public boolean disableCurrentNodeAbility(String abilityKey) {
|
||||
return doTurn(false, abilityKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* . Turn on/off the ability of current node
|
||||
*
|
||||
* @param isOn is on
|
||||
* @param abilityKey ability key from {@link AbilityKey}
|
||||
* @return if turn success
|
||||
*/
|
||||
private boolean doTurn(boolean isOn, String abilityKey) {
|
||||
Boolean isEnabled = currentRunningAbility.get(abilityKey);
|
||||
// if not supporting this key
|
||||
if (isEnabled == null) {
|
||||
LOGGER.warn("[AbilityControlManager] Attempt to turn on/off a not existed ability!");
|
||||
return false;
|
||||
} else if (isOn == isEnabled) {
|
||||
// if already turn on/off
|
||||
return true;
|
||||
}
|
||||
// turn on/off
|
||||
currentRunningAbility.put(abilityKey, isOn);
|
||||
// handler mappings
|
||||
triggerHandlerMappingAsyn(abilityKey, isOn, this.handlerMappings);
|
||||
// notify event
|
||||
AbilityUpdateEvent abilityUpdateEvent = new AbilityUpdateEvent();
|
||||
abilityUpdateEvent.setTable(new AbilityTable().setAbility(Collections.unmodifiableMap(currentRunningAbility)));
|
||||
abilityUpdateEvent.isOn = isOn;
|
||||
abilityUpdateEvent.abilityKey = abilityKey;
|
||||
NotifyCenter.publishEvent(abilityUpdateEvent);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the component which is managed by {@link AbstractAbilityControlManager}. if you are hoping that a
|
||||
* component will be invoked when turn on/off the ability whose key is <p>abilityKey</p>.
|
||||
*
|
||||
* @param abilityKey component key.
|
||||
* @param priority the higher the priority is, the faster it will be called.
|
||||
* @param handlerMapping component instance.
|
||||
*/
|
||||
@Override
|
||||
public void registerComponent(String abilityKey, HandlerMapping handlerMapping, int priority) {
|
||||
doRegisterComponent(abilityKey, handlerMapping, this.handlerMappings, lockForHandlerMappings, priority, currentRunningAbility);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int removeComponent(String abilityKey, Class<? extends HandlerMapping> handlerMappingClazz) {
|
||||
return doRemove(abilityKey, handlerMappingClazz, lockForHandlerMappings, handlerMappings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void destroy() {
|
||||
LOGGER.warn("[DefaultAbilityControlManager] - Start destroying...");
|
||||
((ThreadPoolExecutor) simpleThreadPool).shutdown();
|
||||
if (MapUtil.isNotEmpty(handlerMappings)) {
|
||||
handlerMappings.keySet().forEach(key -> doTriggerSyn(key, false, handlerMappings));
|
||||
}
|
||||
// hook
|
||||
doDestroy();
|
||||
LOGGER.warn("[DefaultAbilityControlManager] - Destruction of the end");
|
||||
}
|
||||
|
||||
/**.
|
||||
* hook for subclass
|
||||
*/
|
||||
protected void doDestroy() {
|
||||
// for server ability manager
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the component instance of <p>handlerMappingClazz</p>.
|
||||
*
|
||||
* @param abilityKey ability key from {@link com.alibaba.nacos.api.ability.constant.AbilityKey}
|
||||
* @param handlerMappingClazz implement of {@link HandlerMapping}
|
||||
* @param lock lock for operation
|
||||
* @param handlerMappingsMap handler collection map
|
||||
* @return the count of components have removed
|
||||
*/
|
||||
protected int doRemove(String abilityKey, Class<? extends HandlerMapping> handlerMappingClazz, Lock lock,
|
||||
Map<String, List<HandlerWithPriority>> handlerMappingsMap) {
|
||||
List<HandlerWithPriority> handlerMappings = handlerMappingsMap.get(abilityKey);
|
||||
if (CollectionUtils.isEmpty(handlerMappings)) {
|
||||
return 0;
|
||||
}
|
||||
lock.lock();
|
||||
try {
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
handlerMappings.removeIf(item -> {
|
||||
if (item.handlerMapping.getClass().equals(handlerMappingClazz)) {
|
||||
count.getAndIncrement();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
return count.get();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int removeAll(String abilityKey) {
|
||||
List<HandlerWithPriority> remove = this.handlerMappings.remove(abilityKey);
|
||||
return Optional.ofNullable(remove).orElse(Collections.emptyList()).size();
|
||||
}
|
||||
|
||||
/**.
|
||||
* Register the component into handlerMappings locking by lockForHandlerMappings to ensure concurrency security.
|
||||
*
|
||||
* @param abilityKey ability key
|
||||
* @param handlerMapping component instance.
|
||||
* @param handlerMappings container
|
||||
* @param lockForHandlerMappings lock to ensure concurrency
|
||||
* @param abilityTable behavioral basis of handler
|
||||
*/
|
||||
protected void doRegisterComponent(String abilityKey, HandlerMapping handlerMapping,
|
||||
Map<String, List<HandlerWithPriority>> handlerMappings, Lock lockForHandlerMappings,
|
||||
int priority, Map<String, Boolean> abilityTable) {
|
||||
if (!currentRunningAbility.containsKey(abilityKey)) {
|
||||
LOGGER.warn("[AbilityHandlePostProcessor] Failed to register processor: {}, because illegal key!",
|
||||
handlerMapping.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
// legal key
|
||||
lockForHandlerMappings.lock();
|
||||
try {
|
||||
List<HandlerWithPriority> handlers = handlerMappings.getOrDefault(abilityKey, new CopyOnWriteArrayList<>());
|
||||
HandlerWithPriority handlerWithPriority = new HandlerWithPriority(handlerMapping, priority);
|
||||
handlers.add(handlerWithPriority);
|
||||
handlerMappings.put(abilityKey, handlers);
|
||||
// choose behavior
|
||||
// enable default
|
||||
if (abilityTable.getOrDefault(abilityKey, false)) {
|
||||
handlerMapping.enable();
|
||||
} else {
|
||||
handlerMapping.disable();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error("[DefaultAbilityControlManager] Fail to register handler: {}", handlerMapping.getClass().getSimpleName());
|
||||
} finally {
|
||||
lockForHandlerMappings.unlock();
|
||||
LOGGER.info("[DefaultAbilityControlManager] Successfully registered processor: {}",
|
||||
handlerMapping.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke componments which linked to ability key asyn.
|
||||
*
|
||||
* @param key ability key from {@link AbilityKey}
|
||||
* @param isEnabled turn on/off
|
||||
* @param handlerMappingsMap handler collection
|
||||
*/
|
||||
protected void triggerHandlerMappingAsyn(String key, boolean isEnabled,
|
||||
Map<String, List<HandlerWithPriority>> handlerMappingsMap) {
|
||||
simpleThreadPool.execute(() -> doTriggerSyn(key, isEnabled, handlerMappingsMap));
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke componments which linked to ability key syn.
|
||||
*
|
||||
* @param key ability key from {@link AbilityKey}
|
||||
* @param isEnabled turn on/off
|
||||
* @param handlerMappingsMap handler collection
|
||||
*/
|
||||
protected void doTriggerSyn(String key, boolean isEnabled,
|
||||
Map<String, List<HandlerWithPriority>> handlerMappingsMap) {
|
||||
List<HandlerWithPriority> handlerWithPriorities = handlerMappingsMap.get(key);
|
||||
// return if empty
|
||||
if (CollectionUtils.isEmpty(handlerWithPriorities)) {
|
||||
return;
|
||||
}
|
||||
Collections.sort(handlerWithPriorities);
|
||||
// invoked all
|
||||
handlerWithPriorities.forEach(handlerMappingWithPriorities -> {
|
||||
// any error from current handler does not affect other handler
|
||||
HandlerMapping handlerMapping = handlerMappingWithPriorities.handlerMapping;
|
||||
try {
|
||||
if (isEnabled) {
|
||||
handlerMapping.enable();
|
||||
} else {
|
||||
handlerMapping.disable();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("[HandlerMapping] Failed to invoke {} :{}", handlerMapping.getClass().getSimpleName(),
|
||||
t.getLocalizedMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@JustForTest
|
||||
protected Map<String, List<HandlerWithPriority>> handlerMapping() {
|
||||
return this.handlerMappings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Support priority handler.
|
||||
*/
|
||||
protected class HandlerWithPriority implements Comparable<HandlerWithPriority> {
|
||||
|
||||
/**.
|
||||
* Decorated
|
||||
*/
|
||||
public HandlerMapping handlerMapping;
|
||||
|
||||
/**.
|
||||
* the higher the priority, the faster it will be called
|
||||
*/
|
||||
public int priority;
|
||||
|
||||
public HandlerWithPriority(HandlerMapping handlerMapping, int priority) {
|
||||
this.handlerMapping = handlerMapping;
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(HandlerWithPriority o) {
|
||||
return o.priority - this.priority;
|
||||
}
|
||||
}
|
||||
|
||||
/**.
|
||||
* notify when current node ability changing
|
||||
*/
|
||||
public class AbilityUpdateEvent extends AbilityEvent {
|
||||
|
||||
private static final long serialVersionUID = -1232411212311111L;
|
||||
|
||||
private String abilityKey;
|
||||
|
||||
private boolean isOn;
|
||||
|
||||
private AbilityUpdateEvent(){}
|
||||
|
||||
public String getAbilityKey() {
|
||||
return abilityKey;
|
||||
}
|
||||
|
||||
public void setAbilityKey(String abilityKey) {
|
||||
this.abilityKey = abilityKey;
|
||||
}
|
||||
|
||||
public boolean isOn() {
|
||||
return isOn;
|
||||
}
|
||||
|
||||
public void setOn(boolean on) {
|
||||
isOn = on;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright 1999-2022 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.ability.discover;
|
||||
|
||||
import com.alibaba.nacos.common.ability.handler.AbilityHandlePreProcessor;
|
||||
import com.alibaba.nacos.common.spi.NacosServiceLoader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
|
||||
/**.
|
||||
* @author Daydreamer
|
||||
* @description It is spi loader to load {@link AbilityHandlePreProcessor}
|
||||
* @date 2022/8/25 18:24
|
||||
**/
|
||||
public class AbilityHandleLoader {
|
||||
|
||||
private final Collection<AbilityHandlePreProcessor> initializers;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbilityHandleLoader.class);
|
||||
|
||||
public AbilityHandleLoader() {
|
||||
initializers = new HashSet<>();
|
||||
for (AbilityHandlePreProcessor preProcessor : NacosServiceLoader.load(AbilityHandlePreProcessor.class)) {
|
||||
initializers.add(preProcessor);
|
||||
LOGGER.info("Load {} for AbilityHandlePreProcessor", preProcessor.getClass().getCanonicalName());
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<AbilityHandlePreProcessor> getInitializers() {
|
||||
return initializers;
|
||||
}
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Copyright 1999-2022 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.ability.discover;
|
||||
|
||||
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
|
||||
import com.alibaba.nacos.common.ability.DefaultAbilityControlManager;
|
||||
import com.alibaba.nacos.common.ability.inter.AbilityControlManager;
|
||||
import com.alibaba.nacos.common.spi.NacosServiceLoader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.ServiceConfigurationError;
|
||||
|
||||
/**
|
||||
* This class is used to discover {@link AbstractAbilityControlManager} implements. All the
|
||||
* ability operation will be finish in this singleton.
|
||||
*
|
||||
* @author Daydreamer
|
||||
* @date 2022/7/14 19:58
|
||||
**/
|
||||
public class NacosAbilityManagerHolder {
|
||||
|
||||
/**.
|
||||
* private constructor
|
||||
*/
|
||||
private NacosAbilityManagerHolder() {
|
||||
}
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(NacosAbilityManagerHolder.class);
|
||||
|
||||
/**.
|
||||
* singleton
|
||||
*/
|
||||
private static DefaultAbilityControlManager abstractAbilityControlManager;
|
||||
|
||||
static {
|
||||
// spi discover implement
|
||||
Collection<DefaultAbilityControlManager> load = null;
|
||||
try {
|
||||
// if server
|
||||
load = NacosServiceLoader.load(DefaultAbilityControlManager.class);
|
||||
} catch (ServiceConfigurationError e) {
|
||||
// if client or not ability control manager
|
||||
load = NacosServiceLoader.load(DefaultAbilityControlManager.class);
|
||||
}
|
||||
// the priority of the server is higher
|
||||
if (load.size() > 0) {
|
||||
load.forEach(clazz -> {
|
||||
abstractAbilityControlManager = clazz;
|
||||
});
|
||||
LOGGER.info("[AbilityControlManager] Successfully initialize AbilityControlManager");
|
||||
// init pre processor
|
||||
AbilityHandleLoader loader = new AbilityHandleLoader();
|
||||
loader.getInitializers().forEach(processor -> abstractAbilityControlManager.addPostProcessor(processor));
|
||||
}
|
||||
}
|
||||
|
||||
/**.
|
||||
* get nacos ability control manager
|
||||
*
|
||||
* @return BaseAbilityControlManager
|
||||
*/
|
||||
public static DefaultAbilityControlManager getInstance() {
|
||||
return abstractAbilityControlManager;
|
||||
}
|
||||
|
||||
/**.
|
||||
* Return the target type of ability manager
|
||||
*
|
||||
* @param clazz clazz
|
||||
* @param <T> target type
|
||||
* @return AbilityControlManager
|
||||
*/
|
||||
public static <T extends AbilityControlManager> T getInstance(Class<T> clazz) {
|
||||
return clazz.cast(abstractAbilityControlManager);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user