Remove the AbilityHandlePreProcessor and AbilityStatus. Change the way to get or remove AbilityTable for RpcClient.

This commit is contained in:
Daydreamer-ia 2022-08-30 21:36:24 +08:00
parent de9e113e24
commit a47f052c90
18 changed files with 116 additions and 504 deletions

View File

@ -1,46 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.constant;
/**.
* @author Daydreamer
* @description This enum is used to track the status of the ability table.
* @date 2022/7/13 14:11
**/
public enum AbilityStatus {
/**
* It means that the ability table does not exist in the current node.
*/
NOT_EXIST,
/**
* It means that current node has received the ability table and the table is initializing by AbilityPostProcessor.
*/
INITIALIZING,
/**
* It means that the ability table is ready.
*/
READY,
/**
* It means that the ability table will be removed soon.
*/
EXPIRED
}

View File

@ -16,7 +16,6 @@
package com.alibaba.nacos.client.ability;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.DefaultAbilityControlManager;
@ -41,17 +40,6 @@ public class ClientAbilityControlManager extends DefaultAbilityControlManager {
return false;
}
AbilityTable abilityTable = nodeAbilityTable.get(connectionId);
// it is null, check if initialing
if (abilityTable == null && AbilityStatus.INITIALIZING.equals(trace(connectionId))) {
// wait for ready
boolean finish = traceReadySyn(connectionId);
// if expired
if (!finish) {
return false;
} else {
abilityTable = nodeAbilityTable.get(connectionId);
}
}
// false if null
return abilityTable != null
&& Optional.ofNullable(abilityTable.getAbility())

View File

@ -37,6 +37,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.plugin.auth.api.RequestResource;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
@ -629,13 +630,13 @@ public class ClientWorker implements Closeable {
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
@Override
public void onConnected() {
public void onConnected(Connection connection) {
LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName());
notifyListenConfig();
}
@Override
public void onDisConnect() {
public void onDisConnect(Connection connection) {
String taskId = rpcClientInner.getLabels().get("taskId");
LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName());
Collection<CacheData> values = cacheMap.get().values();

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import java.util.HashSet;
@ -73,13 +74,13 @@ public class NamingGrpcRedoService implements ConnectionEventListener {
}
@Override
public void onConnected() {
public void onConnected(Connection connection) {
connected = true;
LogUtils.NAMING_LOGGER.info("Grpc connection connect");
}
@Override
public void onDisConnect() {
public void onDisConnect(Connection connection) {
connected = false;
LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
synchronized (registeredInstances) {

View File

@ -0,0 +1,36 @@
package com.alibaba.nacos.client.naming.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
public class TestConnection extends Connection {
public TestConnection(RpcClient.ServerInfo serverInfo) {
super(serverInfo);
}
@Override
public Response request(Request request, long timeoutMills) throws NacosException {
return null;
}
@Override
public RequestFuture requestFuture(Request request) throws NacosException {
return null;
}
@Override
public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException {
}
@Override
public void close() {
}
}

View File

@ -17,10 +17,12 @@
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.remote.TestConnection;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.BatchInstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.utils.ReflectUtils;
import org.junit.After;
import org.junit.Before;
@ -68,13 +70,13 @@ public class NamingGrpcRedoServiceTest {
@Test
public void testOnConnected() {
assertFalse(redoService.isConnected());
redoService.onConnected();
redoService.onConnected(new TestConnection(new RpcClient.ServerInfo()));
assertTrue(redoService.isConnected());
}
@Test
public void testOnDisConnect() {
redoService.onConnected();
redoService.onConnected(new TestConnection(new RpcClient.ServerInfo()));
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
redoService.instanceRegistered(SERVICE, GROUP);
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
@ -82,7 +84,7 @@ public class NamingGrpcRedoServiceTest {
assertTrue(redoService.isConnected());
assertTrue(redoService.findInstanceRedoData().isEmpty());
assertTrue(redoService.findSubscriberRedoData().isEmpty());
redoService.onDisConnect();
redoService.onDisConnect(new TestConnection(new RpcClient.ServerInfo()));
assertFalse(redoService.isConnected());
assertFalse(redoService.findInstanceRedoData().isEmpty());
assertFalse(redoService.findSubscriberRedoData().isEmpty());

View File

@ -17,23 +17,16 @@
package com.alibaba.nacos.common.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.handler.AbilityHandlePreProcessor;
import com.alibaba.nacos.common.ability.inter.TraceableAbilityControlManager;
import com.alibaba.nacos.common.ability.inter.AbilityControlManager;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**.
@ -42,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
* @date 2022/7/12 19:18
**/
@SuppressWarnings("all")
public abstract class AbstractAbilityControlManager implements TraceableAbilityControlManager {
public abstract class AbstractAbilityControlManager implements AbilityControlManager {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAbilityControlManager.class);
@ -63,17 +56,6 @@ public abstract class AbstractAbilityControlManager implements TraceableAbilityC
*/
protected final Map<String, AbilityTable> nodeAbilityTable = new ConcurrentHashMap<>();
/**.
* These handlers will be invoke before combine the ability table
*/
private final List<AbilityHandlePreProcessor> abilityHandlePreProcessors = new ArrayList<>();
/**
* This map is used to trace the status of ability table.
* Its status should be update after {@link #addNewTable(AbilityTable)} and {@link #removeTable(String)}
*/
protected final Map<String, AtomicReference<AbilityStatus>> abilityStatus = new ConcurrentHashMap<>();
private final ReentrantLock lockForProcessors = new ReentrantLock();
private final ReentrantLock lockForAbilityTable = new ReentrantLock();
@ -124,33 +106,18 @@ public abstract class AbstractAbilityControlManager implements TraceableAbilityC
if (contains(connectionId)) {
return;
}
// update status
abilityStatus.put(connectionId, new AtomicReference<>(AbilityStatus.INITIALIZING));
// handle ability table before joining current node
AbilityTable processed = process(table);
// hook method
add(processed);
add(table);
// add to node
nodeAbilityTable.put(connectionId, table);
} finally {
lockForAbilityTable.unlock();
}
// update status
AtomicReference<AbilityStatus> abilityStatusAtomicReference = abilityStatus.get(table.getConnectionId());
if (abilityStatusAtomicReference != null) {
// try one time
// do nothing if AbilityStatus == Expired
// if ready
if(abilityStatusAtomicReference.compareAndSet(AbilityStatus.INITIALIZING, AbilityStatus.READY)) {
// publish event to subscriber
AbilityComeEvent updateEvent = new AbilityComeEvent();
updateEvent.setConnectionId(table.getConnectionId());
updateEvent.setTable(table);
NotifyCenter.publishEvent(updateEvent);
}
} else {
LOGGER.warn("[AbiityControlManager] Cannot get connection status after processing ability table, possible reason is that the network is unstable");
}
// publish event to subscriber
AbilityComeEvent updateEvent = new AbilityComeEvent();
updateEvent.setConnectionId(table.getConnectionId());
updateEvent.setTable(table);
NotifyCenter.publishEvent(updateEvent);
}
/**
@ -171,12 +138,6 @@ public abstract class AbstractAbilityControlManager implements TraceableAbilityC
if (!nodeAbilityTable.containsKey(connectionId)) {
return;
}
nodeAbilityTable.get(connectionId);
// update status
abilityStatus.computeIfPresent(connectionId, (k, v) -> {
v.set(AbilityStatus.EXPIRED);
return v;
});
// hook method
remove(connectionId);
// remove
@ -184,8 +145,6 @@ public abstract class AbstractAbilityControlManager implements TraceableAbilityC
} finally {
lockForAbilityTable.unlock();
}
// remove status
abilityStatus.remove(connectionId);
// publish event
if (removingTable != null) {
AbilityExpiredEvent expiredEvent = new AbilityExpiredEvent();
@ -195,7 +154,6 @@ public abstract class AbstractAbilityControlManager implements TraceableAbilityC
}
}
/**
* Register a new ability table. This is a ThreadSafe method for {@link AbstractAbilityControlManager#remove(String)}.
*
@ -222,97 +180,6 @@ public abstract class AbstractAbilityControlManager implements TraceableAbilityC
return nodeAbilityTable.containsKey(connectionId);
}
/**
* Get the status of the ability table.
*
* @param connectionId connection id
* @return status of ability table {@link AbilityStatus}
*/
@Override
public AbilityStatus trace(String connectionId) {
if (connectionId == null) {
return AbilityStatus.NOT_EXIST;
}
return abilityStatus.getOrDefault(connectionId, new AtomicReference<>(AbilityStatus.NOT_EXIST)).get();
}
/**
* Trace the status of connection if <p>{@link AbilityStatus#INITIALIZING}<p/>, wake up if <p>{@link AbilityStatus#READY}<p/>
* It will return if status is <p>{@link AbilityStatus#EXPIRED}<p/> or <p>{@link AbilityStatus#NOT_EXIST}<p/>
*
* @param connectionId connection id
* @param source source status
* @param target target status
* @return if success
*/
@Override
public boolean traceReadySyn(String connectionId) {
AbilityStatus source = AbilityStatus.INITIALIZING;
AbilityStatus target = AbilityStatus.READY;
AtomicReference<AbilityStatus> atomicReference = abilityStatus.get(connectionId);
// return if null
if (atomicReference == null || atomicReference.get().equals(AbilityStatus.EXPIRED)) {
return false;
} else if (target == atomicReference.get()) {
return true;
}
// try if status legal
while (!atomicReference.get().equals(target) && atomicReference.get().equals(source)) {
LockSupport.parkNanos(100L);
// if expired
if (atomicReference.get().equals(AbilityStatus.EXPIRED)) {
return false;
}
}
return atomicReference.get().equals(target);
}
/**.
* Invoking {@link AbilityHandlePreProcessor}
*
* @param source source ability table
* @return result
*/
protected AbilityTable process(AbilityTable source) {
// do nothing if no processor
if (CollectionUtils.isEmpty(abilityHandlePreProcessors)) {
return source;
}
// copy to advoid error process
AbilityTable abilityTable = source;
AbilityTable copy = new AbilityTable(source.getConnectionId(), new HashMap<>(source.getAbility()), source.isServer(), source.getVersion());
for (AbilityHandlePreProcessor handler : abilityHandlePreProcessors) {
try {
abilityTable = handler.handle(abilityTable);
} catch (Throwable t) {
LOGGER.warn("[AbilityHandlePostProcessor] Failed to invoke {} :{}",
handler.getClass().getSimpleName(), t.getLocalizedMessage());
// ensure normal operation
abilityTable = copy;
}
}
return abilityTable;
}
/**.
* They will be invoked before updating ability table, but the order in which
* they are called cannot be guaranteed
*
* @param postProcessor PostProcessor instance
*/
@Override
public void addPostProcessor(AbilityHandlePreProcessor postProcessor) {
lockForProcessors.lock();
try {
abilityHandlePreProcessors.add(postProcessor);
} finally {
lockForProcessors.unlock();
LOGGER.info("[AbilityHandlePostProcessor] registry handler: {}",
postProcessor.getClass().getSimpleName());
}
}
/**
* Initialize the manager
*/

View File

@ -1,49 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.discover;
import com.alibaba.nacos.common.ability.handler.AbilityHandlePreProcessor;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashSet;
/**.
* @author Daydreamer
* @description It is spi loader to load {@link AbilityHandlePreProcessor}
* @date 2022/8/25 18:24
**/
public class AbilityHandleLoader {
private final Collection<AbilityHandlePreProcessor> initializers;
private static final Logger LOGGER = LoggerFactory.getLogger(AbilityHandleLoader.class);
public AbilityHandleLoader() {
initializers = new HashSet<>();
for (AbilityHandlePreProcessor preProcessor : NacosServiceLoader.load(AbilityHandlePreProcessor.class)) {
initializers.add(preProcessor);
LOGGER.info("Load {} for AbilityHandlePreProcessor", preProcessor.getClass().getCanonicalName());
}
}
public Collection<AbilityHandlePreProcessor> getInitializers() {
return initializers;
}
}

View File

@ -64,9 +64,6 @@ public class NacosAbilityManagerHolder {
abstractAbilityControlManager = clazz;
});
LOGGER.info("[AbilityControlManager] Successfully initialize AbilityControlManager");
// init pre processor
AbilityHandleLoader loader = new AbilityHandleLoader();
loader.getInitializers().forEach(processor -> abstractAbilityControlManager.addPostProcessor(processor));
}
}

View File

@ -1,36 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.handler;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
/**.
* @author Daydreamer
* @description This handler will should be invoked before ability table joining current node.
* @date 2022/7/12 19:24
**/
public interface AbilityHandlePreProcessor {
/**
* Handling before joining current node.
*
* @param source source ability handler
* @return result table
*/
AbilityTable handle(AbilityTable source);
}

View File

@ -18,7 +18,6 @@ package com.alibaba.nacos.common.ability.inter;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.handler.AbilityHandlePreProcessor;
import java.util.Map;
@ -75,14 +74,6 @@ public interface AbilityControlManager {
*/
Map<String, Boolean> getCurrentRunningAbility();
/**.
* They will be invoked before updating ability table, but the order in which
* they are called cannot be guaranteed
*
* @param postProcessor PostProcessor instance
*/
void addPostProcessor(AbilityHandlePreProcessor postProcessor);
/**.
* Initialize the manager
*/

View File

@ -1,44 +0,0 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.inter;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
/**.
* @author Daydreamer
* @description It provides the capability to trace the state of AbilityTable for the {@link AbilityControlManager}
* @date 2022/8/10 23:30
**/
public interface TraceableAbilityControlManager extends AbilityControlManager {
/**
* Get the status of the ability table.
*
* @param connectionId connection id
* @return status of ability table {@link AbilityStatus}
*/
AbilityStatus trace(String connectionId);
/**.
* Trace the status of connection if {@link AbilityStatus#INITIALIZING}, wake up if {@link AbilityStatus#READY}
* It will return if status is {@link AbilityStatus#EXPIRED} or {@link AbilityStatus#NOT_EXIST}
*
* @param connectionId connection id
* @return if success to {@link AbilityStatus#READY}
*/
boolean traceReadySyn(String connectionId);
}

View File

@ -0,0 +1,24 @@
package com.alibaba.nacos.common.ability.listener;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
/**.
* @author Daydreamer
* @description This listener is used for remove ability table if disconnected.
* @date 2022/8/30 22:00
**/
public class ClientAbilityEventListener implements ConnectionEventListener {
@Override
public void onConnected(Connection connection) {
// nothing to do
}
@Override
public void onDisConnect(Connection connection) {
NacosAbilityManagerHolder.getInstance().removeTable(connection.getConnectionId());
}
}

View File

@ -25,11 +25,15 @@ public interface ConnectionEventListener {
/**
* notify when connected to server.
*
* @param connection connection has connected
*/
public void onConnected();
public void onConnected(Connection connection);
/**
* notify when disconnected to server.
*
* @param connection connection has disconnected
*/
public void onDisConnect();
public void onDisConnect(Connection connection);
}

View File

@ -16,8 +16,6 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
@ -30,8 +28,8 @@ import com.alibaba.nacos.api.remote.response.ClientDetectionResponse;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.ability.DefaultAbilityControlManager;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.ability.listener.ClientAbilityEventListener;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.PayloadRegistry;
@ -53,7 +51,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -81,8 +78,6 @@ public abstract class RpcClient implements Closeable {
private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
protected final BlockingQueue<RecServerAbilityContext> recServerAbilitySignal = new LinkedBlockingQueue<>();
protected volatile Connection currentConnection;
protected Map<String, String> labels = new HashMap<>();
@ -191,15 +186,17 @@ public abstract class RpcClient implements Closeable {
/**
* Notify when client disconnected.
*
* @param connection connection has disconnected
*/
protected void notifyDisConnected() {
protected void notifyDisConnected(Connection connection) {
if (connectionEventListeners.isEmpty()) {
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
try {
connectionEventListener.onDisConnect();
connectionEventListener.onDisConnect(connection);
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name,
connectionEventListener.getClass().getName());
@ -209,15 +206,17 @@ public abstract class RpcClient implements Closeable {
/**
* Notify when client new connected.
*
* @param connection connection has connected
*/
protected void notifyConnected() {
protected void notifyConnected(Connection connection) {
if (connectionEventListeners.isEmpty()) {
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
try {
connectionEventListener.onConnected();
connectionEventListener.onConnected(connection);
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name,
connectionEventListener.getClass().getName());
@ -285,7 +284,10 @@ public abstract class RpcClient implements Closeable {
return;
}
clientEventExecutor = new ScheduledThreadPoolExecutor(3, r -> {
// add listener to remove expired ability table
registerConnectionListener(new ClientAbilityEventListener());
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
@ -299,9 +301,9 @@ public abstract class RpcClient implements Closeable {
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
notifyConnected(take.connection);
} else if (take.isDisConnected()) {
notifyDisConnected();
notifyDisConnected(take.connection);
}
} catch (Throwable e) {
// Do nothing
@ -309,32 +311,6 @@ public abstract class RpcClient implements Closeable {
}
});
// receive ability table
clientEventExecutor.submit(() -> {
// save server ability table or remove
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
try {
RecServerAbilityContext take = recServerAbilitySignal.take();
// avoid interrupted should not null
if (take != null) {
DefaultAbilityControlManager manager = NacosAbilityManagerHolder.getInstance();
// remove
manager.removeTable(take.oldConnectionId);
// and add
manager.addNewTable(
new AbilityTable()
.setAbility(take.abilityTable)
.setConnectionId(take.connectionId)
.setVersion(take.version)
.setServer(true)
);
}
} catch (InterruptedException e) {
// do nothing
}
}
});
clientEventExecutor.submit(() -> {
while (true) {
try {
@ -427,30 +403,12 @@ public abstract class RpcClient implements Closeable {
}
// try to wait for the ability table to be added, but it will check three time at most
AbilityStatus status = AbilityStatus.NOT_EXIST;
int reCheckTimes = RETRY_TIMES;
while (!isShutdown() && connectToServer != null && status.equals(AbilityStatus.NOT_EXIST) && reCheckTimes > 0) {
LockSupport.parkNanos(100L);
status = NacosAbilityManagerHolder.getInstance().trace(connectToServer.getConnectionId());
reCheckTimes--;
}
// judge whether support ability table
// wait to get ability table if initializing, it will pass if server doesn't support ability table or table ready
boolean connected = true;
while (!isShutdown() && connectToServer != null && AbilityStatus.INITIALIZING.equals(status) && connected) {
// wait for complete
// return false if disconnect
connected = NacosAbilityManagerHolder.getInstance().traceReadySyn(connectToServer.getConnectionId());
}
if (connectToServer != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
} else {
switchServerAsync();
}
@ -576,28 +534,6 @@ public abstract class RpcClient implements Closeable {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
name, serverInfo.getAddress(), connectionNew.getConnectionId());
// try to wait for the ability table to be added, but it will check three time at most
AbilityStatus status = AbilityStatus.NOT_EXIST;
int reCheckTimes = RETRY_TIMES;
while (status.equals(AbilityStatus.NOT_EXIST) && reCheckTimes > 0) {
LockSupport.parkNanos(100L);
status = NacosAbilityManagerHolder.getInstance().trace(connectionNew.getConnectionId());
reCheckTimes--;
}
// judge whether support ability table
// wait to get ability table if initializing, it will pass if server doesn't support ability table or table ready
boolean connected = true;
while (!isShutdown() && AbilityStatus.INITIALIZING.equals(status) && connected) {
// wait for complete
// return false if disconnect
connected = NacosAbilityManagerHolder.getInstance()
.traceReadySyn(connectionNew.getConnectionId());
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to get server ability table, connectionId = {}",
name, connectionNew.getConnectionId());
// successfully create a new connect
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
@ -610,7 +546,7 @@ public abstract class RpcClient implements Closeable {
currentConnection = connectionNew;
rpcClientStatus.set(RpcClientStatus.RUNNING);
switchSuccess = true;
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
return;
}
@ -671,7 +607,7 @@ public abstract class RpcClient implements Closeable {
if (connection != null) {
LOGGER.info("Close current connection " + connection.getConnectionId());
connection.close();
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED, connection));
}
}
@ -1076,8 +1012,11 @@ public abstract class RpcClient implements Closeable {
int eventType;
public ConnectionEvent(int eventType) {
Connection connection;
public ConnectionEvent(int eventType, Connection connection) {
this.eventType = eventType;
this.connection = connection;
}
public boolean isConnected() {

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
@ -28,6 +29,7 @@ import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.api.utils.AbilityTableUtils;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
@ -317,10 +319,11 @@ public abstract class GrpcClient extends RpcClient {
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
Map<String, Boolean> abilityTable = AbilityTableUtils
.getAbilityTableBy(serverCheckResponse.getAbilities(), AbilityKey.offset());
String oldConnId = currentConnection == null ? null : currentConnection.getConnectionId();
RecServerAbilityContext recServerAbilityContext = new RecServerAbilityContext(serverCheckResponse.getConnectionId(),
abilityTable, null, oldConnId);
recServerAbilitySignal.offer(recServerAbilityContext);
AbilityTable table = new AbilityTable();
table.setServer(true)
.setConnectionId(serverCheckResponse.getConnectionId())
.setAbility(abilityTable);
NacosAbilityManagerHolder.getInstance().addNewTable(table);
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());

View File

@ -16,7 +16,6 @@
package com.alibaba.nacos.core.ability.control;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
@ -80,17 +79,6 @@ public class ServerAbilityControlManager extends DefaultAbilityControlManager im
return false;
}
AbilityTable abilityTable = nodeAbilityTable.get(connectionId);
// it is null, check if initialing
if (abilityTable == null && AbilityStatus.INITIALIZING.equals(trace(connectionId))) {
// wait for ready
boolean finish = traceReadySyn(connectionId);
// if expired
if (!finish) {
return false;
} else {
abilityTable = nodeAbilityTable.get(connectionId);
}
}
// false if null
return abilityTable != null
&& Optional.ofNullable(abilityTable.getAbility())

View File

@ -16,9 +16,7 @@
package com.alibaba.nacos.test.ability;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.entity.AbilityTable;
import com.alibaba.nacos.common.ability.handler.AbilityHandlePreProcessor;
import com.alibaba.nacos.common.ability.handler.HandlerMapping;
import org.junit.Assert;
import org.junit.Before;
@ -118,9 +116,9 @@ public class AbilityControlManagerTest {
clientTable.setAbility(clientTa);
clientTable.setServer(true);
clientAbilityControlManager.addNewTable(clientTable);
Assert.assertEquals(AbilityStatus.READY, clientAbilityControlManager.trace("test-01111"));
Assert.assertTrue(clientAbilityControlManager.contains(clientTable.getConnectionId()));
clientAbilityControlManager.removeTable("test-01111");
Assert.assertEquals(AbilityStatus.NOT_EXIST, clientAbilityControlManager.trace("test-01111"));
Assert.assertFalse(clientAbilityControlManager.contains(clientTable.getConnectionId()));
}
@Test
@ -214,36 +212,6 @@ public class AbilityControlManagerTest {
});
}
@Test
public void testPre() {
clientAbilityControlManager.addPostProcessor(new TestPreHandler());
clientAbilityControlManager.enableCurrentNodeAbility("stop-raft");
AbilityTable abilityTable = new AbilityTable();
abilityTable.setConnectionId("1111");
HashMap<String, Boolean> table = new HashMap<>();
table.put("stop-raft", false);
abilityTable.setAbility(table);
clientAbilityControlManager.addNewTable(abilityTable);
Assert.assertFalse(clientAbilityControlManager.isSupport("1111", "stop-raft"));
}
@Test
public void testStateSyn() {
// register a processing long time handler
AbilityTable abilityTable = new AbilityTable();
abilityTable.setConnectionId("1111");
HashMap<String, Boolean> table = new HashMap<>();
table.put("stop-raft", false);
abilityTable.setAbility(table);
clientAbilityControlManager.addPostProcessor(new TestStatusPreHandler());
// 追踪状态
long begin = System.currentTimeMillis();
clientAbilityControlManager.addNewTable(abilityTable);
Assert.assertTrue(clientAbilityControlManager.traceReadySyn("1111"));
long end = System.currentTimeMillis();
Assert.assertTrue(end - begin > 5000);
}
@Test
public void testPriority() throws InterruptedException {
TestServerAbilityControlManager testServerAbilityControlManager = new TestServerAbilityControlManager();
@ -353,28 +321,6 @@ public class AbilityControlManagerTest {
}
class TestPreHandler implements AbilityHandlePreProcessor {
@Override
public AbilityTable handle(AbilityTable source) {
source.setConnectionId("pre-handle");
return source;
}
}
class TestStatusPreHandler implements AbilityHandlePreProcessor {
@Override
public AbilityTable handle(AbilityTable source) {
try {
// block
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
return source;
}
}
}