From fca259fd388e9c7b28595a443ee380d63a3c3256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E7=BF=8A=20SionYang?= <263976490@qq.com> Date: Wed, 21 Jul 2021 10:08:10 +0800 Subject: [PATCH] [ISSUE#6384] Add redo feature for nacos client naming (#6395) * Add RedoData * Add NamingGrpcRedoService * Add RedoScheduledTask * Refactor NamingGrpcClientProxy to use new redo service. * For PMD * Remove NamingGrpcConnectionEventListener.java --- .../remote/NamingClientProxyDelegate.java | 2 +- .../remote/gprc/NamingGrpcClientProxy.java | 74 ++- .../NamingGrpcConnectionEventListener.java | 171 ------- .../gprc/redo/NamingGrpcRedoService.java | 257 ++++++++++ .../remote/gprc/redo/RedoScheduledTask.java | 134 +++++ .../gprc/redo/data/InstanceRedoData.java | 45 ++ .../remote/gprc/redo/data/RedoData.java | 130 +++++ .../gprc/redo/data/SubscriberRedoData.java | 43 ++ .../gprc/NamingGrpcClientProxyTest.java | 481 ++++-------------- ...NamingGrpcConnectionEventListenerTest.java | 248 --------- .../gprc/redo/NamingGrpcRedoServiceTest.java | 202 ++++++++ .../gprc/redo/RedoScheduledTaskTest.java | 154 ++++++ 12 files changed, 1139 insertions(+), 802 deletions(-) delete mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java create mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java create mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTask.java create mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/InstanceRedoData.java create mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/RedoData.java create mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/SubscriberRedoData.java delete mode 100644 client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListenerTest.java create mode 100644 client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java create mode 100644 client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTaskTest.java diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java index 28911beca..bdff6c871 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java @@ -141,11 +141,11 @@ public class NamingClientProxyDelegate implements NamingClientProxy { public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); + serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (null == result) { result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } - serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); serviceInfoHolder.processServiceInfo(result); return result; } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java index 210b22b96..bbd5d9a25 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java @@ -40,6 +40,7 @@ import com.alibaba.nacos.api.selector.SelectorType; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.client.naming.event.ServerListChangedEvent; import com.alibaba.nacos.client.naming.remote.AbstractNamingClientProxy; +import com.alibaba.nacos.client.naming.remote.gprc.redo.NamingGrpcRedoService; import com.alibaba.nacos.client.security.SecurityProxy; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; @@ -72,7 +73,7 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy { private final RpcClient rpcClient; - private final NamingGrpcConnectionEventListener namingGrpcConnectionEventListener; + private final NamingGrpcRedoService redoService; public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException { @@ -84,15 +85,15 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy { labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING); this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels); - this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this); + this.redoService = new NamingGrpcRedoService(this); start(serverListFactory, serviceInfoHolder); } private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { rpcClient.serverListFactory(serverListFactory); - rpcClient.start(); + rpcClient.registerConnectionListener(redoService); rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder)); - rpcClient.registerConnectionListener(namingGrpcConnectionEventListener); + rpcClient.start(); NotifyCenter.registerSubscriber(this); } @@ -110,10 +111,23 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy { public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance); - namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance); + redoService.cacheInstanceForRedo(serviceName, groupName, instance); + doRegisterService(serviceName, groupName, instance); + } + + /** + * Execute register operation. + * + * @param serviceName name of service + * @param groupName group of service + * @param instance instance to register + * @throws NacosException nacos exception + */ + public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException { InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance); requestToServer(request, Response.class); + redoService.instanceRegistered(serviceName, groupName); } @Override @@ -121,10 +135,23 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy { NAMING_LOGGER .info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, instance); - namingGrpcConnectionEventListener.removeInstanceForRedo(serviceName, groupName, instance); + redoService.instanceDeregister(serviceName, groupName); + doDeregisterService(serviceName, groupName, instance); + } + + /** + * Execute deregister operation. + * + * @param serviceName service name + * @param groupName group name + * @param instance instance + * @throws NacosException nacos exception + */ + public void doDeregisterService(String serviceName, String groupName, Instance instance) throws NacosException { InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.DE_REGISTER_INSTANCE, instance); requestToServer(request, Response.class); + redoService.removeInstanceForRedo(serviceName, groupName); } @Override @@ -181,21 +208,46 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy { @Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { + redoService.cacheSubscriberForRedo(serviceName, groupName, clusters); + return doSubscribe(serviceName, groupName, clusters); + } + + /** + * Execute subscribe operation. + * + * @param serviceName service name + * @param groupName group name + * @param clusters clusters, current only support subscribe all clusters, maybe deprecated + * @return current service info of subscribe service + * @throws NacosException nacos exception + */ + public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException { SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true); SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class); - namingGrpcConnectionEventListener - .cacheSubscriberForRedo(NamingUtils.getGroupedName(serviceName, groupName), clusters); + redoService.subscriberRegistered(serviceName, groupName, clusters); return response.getServiceInfo(); } @Override public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException { + redoService.subscriberDeregister(serviceName, groupName, clusters); + doUnsubscribe(serviceName, groupName, clusters); + } + + /** + * Execute unsubscribe operation. + * + * @param serviceName service name + * @param groupName group name + * @param clusters clusters, current only support subscribe all clusters, maybe deprecated + * @throws NacosException nacos exception + */ + public void doUnsubscribe(String serviceName, String groupName, String clusters) throws NacosException { SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, groupName, clusters, false); requestToServer(request, SubscribeServiceResponse.class); - namingGrpcConnectionEventListener - .removeSubscriberForRedo(NamingUtils.getGroupedName(serviceName, groupName), clusters); + redoService.removeSubscriberForRedo(serviceName, groupName, clusters); } @Override @@ -232,7 +284,7 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy { @Override public void shutdown() throws NacosException { rpcClient.shutdown(); - namingGrpcConnectionEventListener.shutdown(); + redoService.shutdown(); } public boolean isEnable() { diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java deleted file mode 100644 index 09c6413e1..000000000 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListener.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.client.naming.remote.gprc; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.pojo.Instance; -import com.alibaba.nacos.api.naming.pojo.ServiceInfo; -import com.alibaba.nacos.api.naming.utils.NamingUtils; -import com.alibaba.nacos.client.utils.LogUtils; -import com.alibaba.nacos.common.remote.client.ConnectionEventListener; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; - -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * Naming client gprc connection event listener. - * - *

- * When connection reconnect to server, redo the register and subscribe. - *

- * - * @author xiweng.yy - */ -public class NamingGrpcConnectionEventListener implements ConnectionEventListener { - - private final NamingGrpcClientProxy clientProxy; - - private final ConcurrentMap registeredInstanceCached = new ConcurrentHashMap<>(); - - private final Set subscribes = new ConcurrentHashSet(); - - private volatile boolean connected = false; - - private static final long DEFAULT_REDO_DELAY = 3000L; - - private static final int DEFAULT_REDO_THREAD = 2; - - private ScheduledExecutorService redoExecutorService; - - public NamingGrpcConnectionEventListener(NamingGrpcClientProxy clientProxy) { - this.clientProxy = clientProxy; - this.redoExecutorService = new ScheduledThreadPoolExecutor(DEFAULT_REDO_THREAD, r -> { - Thread t = new Thread(r); - t.setName("com.alibaba.nacos.client.naming.grpc.event.listener"); - t.setDaemon(true); - return t; - }); - } - - @Override - public void onConnected() { - connected = true; - LogUtils.NAMING_LOGGER.info("Grpc re-connect, redo subscribe services"); - redoSubscribe(subscribes); - LogUtils.NAMING_LOGGER.info("Grpc re-connect, redo register services"); - redoRegisterEachService(registeredInstanceCached.keySet()); - } - - private void redoSubscribe(Set subscribes) { - Set failedSubscribes = new ConcurrentHashSet<>(); - for (String each : subscribes) { - if (!connected) { - failedSubscribes.clear(); - break; - } - ServiceInfo serviceInfo = ServiceInfo.fromKey(each); - try { - clientProxy.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters()); - } catch (NacosException e) { - failedSubscribes.add(each); - LogUtils.NAMING_LOGGER.warn(String.format("re subscribe service %s failed, try again later.", serviceInfo.getName()), e); - } - } - if (!failedSubscribes.isEmpty()) { - redoExecutorService.schedule(() -> redoSubscribe(failedSubscribes), DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS); - } - } - - private void redoRegisterEachService(Set services) { - Set failedServices = new ConcurrentHashSet<>(); - for (String each : services) { - if (!connected) { - failedServices.clear(); - break; - } - String serviceName = NamingUtils.getServiceName(each); - String groupName = NamingUtils.getGroupName(each); - Instance instance = registeredInstanceCached.get(each); - if (!redoRegisterEachInstance(serviceName, groupName, instance)) { - failedServices.add(each); - } - } - if (!failedServices.isEmpty()) { - redoExecutorService.schedule(() -> redoRegisterEachService(failedServices), DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS); - } - } - - private boolean redoRegisterEachInstance(String serviceName, String groupName, Instance instance) { - try { - clientProxy.registerService(serviceName, groupName, instance); - } catch (NacosException e) { - LogUtils.NAMING_LOGGER.warn(String - .format("redo register for service %s@@%s, %s failed, try again later.", groupName, serviceName, instance.toString()), e); - return false; - } - return true; - } - - @Override - public void onDisConnect() { - connected = false; - LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect"); - } - - /** - * Cache registered instance for redo. - * - * @param serviceName service name - * @param groupName group name - * @param instance registered instance - */ - public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) { - String key = NamingUtils.getGroupedName(serviceName, groupName); - registeredInstanceCached.put(key, instance); - } - - /** - * Remove registered instance for redo. - * - * @param serviceName service name - * @param groupName group name - * @param instance registered instance - */ - public void removeInstanceForRedo(String serviceName, String groupName, Instance instance) { - String key = NamingUtils.getGroupedName(serviceName, groupName); - registeredInstanceCached.remove(key); - } - - public void cacheSubscriberForRedo(String fullServiceName, String cluster) { - subscribes.add(ServiceInfo.getKey(fullServiceName, cluster)); - } - - public void removeSubscriberForRedo(String fullServiceName, String cluster) { - subscribes.remove(ServiceInfo.getKey(fullServiceName, cluster)); - } - - public void shutdown() { - LogUtils.NAMING_LOGGER.info("Shutdown grpc event listener executor " + redoExecutorService); - redoExecutorService.shutdownNow(); - } - -} diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java new file mode 100644 index 000000000..a686b7600 --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java @@ -0,0 +1,257 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo; + +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData; +import com.alibaba.nacos.client.utils.LogUtils; +import com.alibaba.nacos.common.executor.NameThreadFactory; +import com.alibaba.nacos.common.remote.client.ConnectionEventListener; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Naming client gprc redo service. + * + *

When connection reconnect to server, redo the register and subscribe. + * + * @author xiweng.yy + */ +public class NamingGrpcRedoService implements ConnectionEventListener { + + private static final String REDO_THREAD_NAME = "com.alibaba.nacos.client.naming.grpc.redo"; + + private static final int REDO_THREAD = 1; + + /** + * TODO get redo delay from config. + */ + private static final long DEFAULT_REDO_DELAY = 3000L; + + private final ConcurrentMap registeredInstances = new ConcurrentHashMap<>(); + + private final ConcurrentMap subscribes = new ConcurrentHashMap<>(); + + private final ScheduledExecutorService redoExecutor; + + private volatile boolean connected = false; + + public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy) { + this.redoExecutor = new ScheduledThreadPoolExecutor(REDO_THREAD, new NameThreadFactory(REDO_THREAD_NAME)); + this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), DEFAULT_REDO_DELAY, + DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS); + } + + public boolean isConnected() { + return connected; + } + + @Override + public void onConnected() { + connected = true; + LogUtils.NAMING_LOGGER.info("Grpc connection connect"); + } + + @Override + public void onDisConnect() { + connected = false; + LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo"); + synchronized (registeredInstances) { + registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false)); + } + synchronized (subscribes) { + subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false)); + } + LogUtils.NAMING_LOGGER.warn("mark to redo completed"); + } + + /** + * Cache registered instance for redo. + * + * @param serviceName service name + * @param groupName group name + * @param instance registered instance + */ + public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) { + String key = NamingUtils.getGroupedName(serviceName, groupName); + InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance); + synchronized (registeredInstances) { + registeredInstances.put(key, redoData); + } + } + + /** + * Instance register successfully, mark registered status as {@code true}. + * + * @param serviceName service name + * @param groupName group name + */ + public void instanceRegistered(String serviceName, String groupName) { + String key = NamingUtils.getGroupedName(serviceName, groupName); + synchronized (registeredInstances) { + InstanceRedoData redoData = registeredInstances.get(key); + if (null != redoData) { + redoData.setRegistered(true); + } + } + } + + /** + * Instance deregister, mark unregistering status as {@code true}. + * + * @param serviceName service name + * @param groupName group name + */ + public void instanceDeregister(String serviceName, String groupName) { + String key = NamingUtils.getGroupedName(serviceName, groupName); + synchronized (registeredInstances) { + InstanceRedoData redoData = registeredInstances.get(key); + if (null != redoData) { + redoData.setUnregistering(true); + } + } + } + + /** + * Remove registered instance for redo. + * + * @param serviceName service name + * @param groupName group name + */ + public void removeInstanceForRedo(String serviceName, String groupName) { + synchronized (registeredInstances) { + registeredInstances.remove(NamingUtils.getGroupedName(serviceName, groupName)); + } + } + + /** + * Find all instance redo data which need do redo. + * + * @return set of {@code InstanceRedoData} need to do redo. + */ + public Set findInstanceRedoData() { + Set result = new HashSet<>(); + synchronized (registeredInstances) { + for (InstanceRedoData each : registeredInstances.values()) { + if (each.isNeedRedo()) { + result.add(each); + } + } + } + return result; + } + + /** + * Cache subscriber for redo. + * + * @param serviceName service name + * @param groupName group name + * @param cluster cluster + */ + public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) { + String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster); + SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster); + synchronized (subscribes) { + subscribes.put(key, redoData); + } + } + + /** + * Subscriber register successfully, mark registered status as {@code true}. + * + * @param serviceName service name + * @param groupName group name + * @param cluster cluster + */ + public void subscriberRegistered(String serviceName, String groupName, String cluster) { + String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster); + synchronized (subscribes) { + SubscriberRedoData redoData = subscribes.get(key); + if (null != redoData) { + redoData.setRegistered(true); + } + } + } + + /** + * Subscriber deregister, mark unregistering status as {@code true}. + * + * @param serviceName service name + * @param groupName group name + * @param cluster cluster + */ + public void subscriberDeregister(String serviceName, String groupName, String cluster) { + String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster); + synchronized (subscribes) { + SubscriberRedoData redoData = subscribes.get(key); + if (null != redoData) { + redoData.setUnregistering(true); + } + } + } + + /** + * Remove subscribe for redo. + * + * @param serviceName service name + * @param groupName group name + * @param cluster cluster + */ + public void removeSubscriberForRedo(String serviceName, String groupName, String cluster) { + synchronized (subscribes) { + subscribes.remove(ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster)); + } + } + + /** + * Find all subscriber redo data which need do redo. + * + * @return set of {@code InstanceRedoData} need to do redo. + */ + public Set findSubscriberRedoData() { + Set result = new HashSet<>(); + synchronized (subscribes) { + for (SubscriberRedoData each : subscribes.values()) { + if (each.isNeedRedo()) { + result.add(each); + } + } + } + return result; + } + + /** + * Shutdown redo service. + */ + public void shutdown() { + LogUtils.NAMING_LOGGER.info("Shutdown grpc event listener executor " + redoExecutor); + registeredInstances.clear(); + subscribes.clear(); + redoExecutor.shutdownNow(); + } + +} diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTask.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTask.java new file mode 100644 index 000000000..f7607b396 --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTask.java @@ -0,0 +1,134 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.RedoData; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData; +import com.alibaba.nacos.client.utils.LogUtils; +import com.alibaba.nacos.common.task.AbstractExecuteTask; + +/** + * Redo task. + * + * @author xiweng.yy + */ +public class RedoScheduledTask extends AbstractExecuteTask { + + private final NamingGrpcClientProxy clientProxy; + + private final NamingGrpcRedoService redoService; + + public RedoScheduledTask(NamingGrpcClientProxy clientProxy, NamingGrpcRedoService redoService) { + this.clientProxy = clientProxy; + this.redoService = redoService; + } + + @Override + public void run() { + if (!redoService.isConnected()) { + LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task"); + return; + } + try { + redoForInstances(); + redoForSubscribes(); + } catch (Exception e) { + LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e); + } + } + + private void redoForInstances() { + for (InstanceRedoData each : redoService.findInstanceRedoData()) { + try { + redoForInstance(each); + } catch (NacosException e) { + LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(), + each.getGroupName(), each.getServiceName(), e); + } + } + } + + private void redoForInstance(InstanceRedoData redoData) throws NacosException { + RedoData.RedoType redoType = redoData.getRedoType(); + String serviceName = redoData.getServiceName(); + String groupName = redoData.getGroupName(); + LogUtils.NAMING_LOGGER.info("Redo instance operation {} for {}@@{}", redoType, groupName, serviceName); + switch (redoType) { + case REGISTER: + if (isClientDisabled()) { + return; + } + clientProxy.doRegisterService(serviceName, groupName, redoData.get()); + break; + case UNREGISTER: + if (isClientDisabled()) { + return; + } + clientProxy.doDeregisterService(serviceName, groupName, redoData.get()); + break; + case REMOVE: + redoService.removeInstanceForRedo(serviceName, groupName); + break; + default: + } + + } + + private void redoForSubscribes() { + for (SubscriberRedoData each : redoService.findSubscriberRedoData()) { + try { + redoForSubscribe(each); + } catch (NacosException e) { + LogUtils.NAMING_LOGGER.error("Redo subscriber operation {} for {}@@{}#{} failed. ", each.getRedoType(), + each.getGroupName(), each.getServiceName(), each.get(), e); + } + } + } + + private void redoForSubscribe(SubscriberRedoData redoData) throws NacosException { + RedoData.RedoType redoType = redoData.getRedoType(); + String serviceName = redoData.getServiceName(); + String groupName = redoData.getGroupName(); + String cluster = redoData.get(); + LogUtils.NAMING_LOGGER.info("Redo subscriber operation {} for {}@@{}#{}", redoType, groupName, serviceName, cluster); + switch (redoData.getRedoType()) { + case REGISTER: + if (isClientDisabled()) { + return; + } + clientProxy.doSubscribe(serviceName, groupName, cluster); + break; + case UNREGISTER: + if (isClientDisabled()) { + return; + } + clientProxy.doUnsubscribe(serviceName, groupName, cluster); + break; + case REMOVE: + redoService.removeSubscriberForRedo(redoData.getServiceName(), redoData.getGroupName(), redoData.get()); + break; + default: + } + } + + private boolean isClientDisabled() { + return !clientProxy.isEnable(); + } +} diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/InstanceRedoData.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/InstanceRedoData.java new file mode 100644 index 000000000..f9a10c8ec --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/InstanceRedoData.java @@ -0,0 +1,45 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo.data; + +import com.alibaba.nacos.api.naming.pojo.Instance; + +/** + * Redo data for register service instance. + * + * @author xiweng.yy + */ +public class InstanceRedoData extends RedoData { + + private InstanceRedoData(String serviceName, String groupName) { + super(serviceName, groupName); + } + + /** + * Build a new {@code RedoData} for register service instance. + * + * @param serviceName service name for redo data + * @param groupName group name for redo data + * @param instance instance for redo data + * @return new {@code RedoData} for register service instance + */ + public static InstanceRedoData build(String serviceName, String groupName, Instance instance) { + InstanceRedoData result = new InstanceRedoData(serviceName, groupName); + result.set(instance); + return result; + } +} diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/RedoData.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/RedoData.java new file mode 100644 index 000000000..a6c693742 --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/RedoData.java @@ -0,0 +1,130 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo.data; + +/** + * Nacos naming redo data. + * + * @author xiweng.yy + */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class RedoData { + + private final String serviceName; + + private final String groupName; + + /** + * If {@code true} means cached data has been registered to server successfully. + */ + private volatile boolean registered; + + /** + * If {@code true} means cached data is unregistering from server. + */ + private volatile boolean unregistering; + + private T data; + + protected RedoData(String serviceName, String groupName) { + this.serviceName = serviceName; + this.groupName = groupName; + } + + public String getServiceName() { + return serviceName; + } + + public String getGroupName() { + return groupName; + } + + public boolean isRegistered() { + return registered; + } + + public void setRegistered(boolean registered) { + this.registered = registered; + } + + public boolean isUnregistering() { + return unregistering; + } + + public void setUnregistering(boolean unregistering) { + this.unregistering = unregistering; + } + + public T get() { + return data; + } + + public void set(T data) { + this.data = data; + } + + /** + * Get redo type for current redo data. + * + *

    + *
  • {@code registered=true} & {@code unregistering=false} means data has registered, so redo should not do anything.
  • + *
  • {@code registered=true} & {@code unregistering=true} means data has registered and now need unregister.
  • + *
  • {@code registered=false} & {@code unregistering=false} means not registered yet, need register again.
  • + *
  • {@code registered=false} & {@code unregistering=true} means not registered yet and not continue to register.
  • + *
+ * + * @return redo type + */ + public RedoType getRedoType() { + if (isRegistered() && !isUnregistering()) { + return RedoType.NONE; + } else if (isRegistered() && isUnregistering()) { + return RedoType.UNREGISTER; + } else if (!isRegistered() && !isUnregistering()) { + return RedoType.REGISTER; + } else { + return RedoType.REMOVE; + } + } + + public boolean isNeedRedo() { + return !RedoType.NONE.equals(getRedoType()); + } + + public enum RedoType { + + /** + * Redo register. + */ + REGISTER, + + /** + * Redo unregister. + */ + UNREGISTER, + + /** + * Redo nothing. + */ + NONE, + + /** + * Remove redo data. + */ + REMOVE; + } +} diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/SubscriberRedoData.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/SubscriberRedoData.java new file mode 100644 index 000000000..f01a95d03 --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/data/SubscriberRedoData.java @@ -0,0 +1,43 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo.data; + +/** + * Redo data for subscribers. + * + * @author xiweng.yy + */ +public class SubscriberRedoData extends RedoData { + + private SubscriberRedoData(String serviceName, String groupName) { + super(serviceName, groupName); + } + + /** + * Build a new {@code RedoData} for subscribers. + * + * @param serviceName service name for redo data + * @param groupName group name for redo data + * @param clusters clusters for redo data + * @return new {@code RedoData} for subscribers + */ + public static SubscriberRedoData build(String serviceName, String groupName, String clusters) { + SubscriberRedoData result = new SubscriberRedoData(serviceName, groupName); + result.set(clusters); + return result; + } +} diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxyTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxyTest.java index adb9a038c..af2a4f95a 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxyTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxyTest.java @@ -39,6 +39,7 @@ import com.alibaba.nacos.api.selector.AbstractSelector; import com.alibaba.nacos.api.selector.NoneSelector; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.client.naming.event.ServerListChangedEvent; +import com.alibaba.nacos.client.naming.remote.gprc.redo.NamingGrpcRedoService; import com.alibaba.nacos.client.security.SecurityProxy; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.remote.ConnectionType; @@ -46,10 +47,13 @@ import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.ServerListFactory; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.ArgumentMatcher; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import java.lang.reflect.Field; import java.util.Arrays; @@ -62,480 +66,215 @@ import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@RunWith(MockitoJUnitRunner.class) public class NamingGrpcClientProxyTest { + private static final String NAMESPACE_ID = "ns1"; + + private static final String SERVICE_NAME = "service1"; + + private static final String GROUP_NAME = "group1"; + + private static final String CLUSTERS = "cluster1"; + + @Mock + private SecurityProxy proxy; + + @Mock + private ServerListFactory factory; + + @Mock + private ServiceInfoHolder holder; + + @Mock + private RpcClient rpcClient; + + private Properties prop; + + private NamingGrpcClientProxy client; + + private Response response; + + private Instance instance; + @Rule public final ExpectedException thrown = ExpectedException.none(); - @Test - public void testRegisterService() throws NacosException, NoSuchFieldException, IllegalAccessException { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - Instance instance = new Instance(); - instance.setServiceName(serviceName); + @Before + public void setUp() throws NacosException, NoSuchFieldException, IllegalAccessException { + prop = new Properties(); + client = new NamingGrpcClientProxy(NAMESPACE_ID, proxy, factory, prop, holder); + Field rpcClientField = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); + rpcClientField.setAccessible(true); + rpcClientField.set(client, this.rpcClient); + response = new InstanceResponse(); + when(this.rpcClient.request(any())).thenReturn(response); + instance = new Instance(); + instance.setServiceName(SERVICE_NAME); instance.setIp("1.1.1.1"); instance.setPort(1111); - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - // inject rpcClient; - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - - Response res = new InstanceResponse(); - when(rpc.request(any())).thenReturn(res); - - String groupName = "group1"; - - // when - client.registerService(serviceName, groupName, instance); - - // then - verify(rpc, times(1)).request(argThat(new ArgumentMatcher() { - @Override - public boolean matches(Request request) { - if (request instanceof InstanceRequest) { - InstanceRequest request1 = (InstanceRequest) request; - return request1.getType().equals(NamingRemoteConstants.REGISTER_INSTANCE); - } - return false; - } - })); - } @Test - public void testDeregisterService() throws NacosException, NoSuchFieldException, IllegalAccessException { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - Instance instance = new Instance(); - instance.setServiceName(serviceName); - instance.setIp("1.1.1.1"); - instance.setPort(1111); - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - // inject rpcClient; - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - - Response res = new InstanceResponse(); - when(rpc.request(any())).thenReturn(res); - - String groupName = "group1"; - - // when - client.deregisterService(serviceName, groupName, instance); - - // then - verify(rpc, times(1)).request(argThat(new ArgumentMatcher() { - @Override - public boolean matches(Request request) { - if (request instanceof InstanceRequest) { - InstanceRequest request1 = (InstanceRequest) request; - return request1.getType().equals(NamingRemoteConstants.DE_REGISTER_INSTANCE); - } - return false; + public void testRegisterService() throws NacosException { + client.registerService(SERVICE_NAME, GROUP_NAME, instance); + verify(this.rpcClient, times(1)).request(argThat(request -> { + if (request instanceof InstanceRequest) { + InstanceRequest request1 = (InstanceRequest) request; + return request1.getType().equals(NamingRemoteConstants.REGISTER_INSTANCE); } + return false; + })); + } + + @Test + public void testDeregisterService() throws NacosException { + client.deregisterService(SERVICE_NAME, GROUP_NAME, instance); + verify(this.rpcClient, times(1)).request(argThat(request -> { + if (request instanceof InstanceRequest) { + InstanceRequest request1 = (InstanceRequest) request; + return request1.getType().equals(NamingRemoteConstants.DE_REGISTER_INSTANCE); + } + return false; })); } @Test public void testUpdateInstance() throws Exception { //TODO thrown.expect(UnsupportedOperationException.class); - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - - Instance instance = new Instance(); - instance.setServiceName(serviceName); - instance.setIp("1.1.1.1"); - instance.setPort(1111); - - String groupName = "group1"; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - client.updateInstance(serviceName, groupName, instance); + client.updateInstance(SERVICE_NAME, GROUP_NAME, instance); } @Test public void testQueryInstancesOfService() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - - Instance instance = new Instance(); - instance.setServiceName(serviceName); - instance.setIp("1.1.1.1"); - instance.setPort(1111); - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - QueryServiceResponse res = new QueryServiceResponse(); - String clusters = "cluster1"; - String groupName = "group1"; - - ServiceInfo info = new ServiceInfo(groupName + "@@" + serviceName + "@@" + clusters); + ServiceInfo info = new ServiceInfo(GROUP_NAME + "@@" + SERVICE_NAME + "@@" + CLUSTERS); res.setServiceInfo(info); - when(rpc.request(any())).thenReturn(res); - - // when - ServiceInfo actual = client.queryInstancesOfService(serviceName, groupName, clusters, 0, false); - + when(this.rpcClient.request(any())).thenReturn(res); + ServiceInfo actual = client.queryInstancesOfService(SERVICE_NAME, GROUP_NAME, CLUSTERS, 0, false); Assert.assertEquals(info, actual); } @Test public void testQueryService() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - String serviceName = "service1"; - String groupName = "group1"; - // when - Service service = client.queryService(serviceName, groupName); - // then + Service service = client.queryService(SERVICE_NAME, GROUP_NAME); Assert.assertNull(service); } @Test public void testCreateService() throws Exception { //TODO thrown.expect(UnsupportedOperationException.class); - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - Service service = new Service(); AbstractSelector selector = new NoneSelector(); - - // when client.createService(service, selector); - } @Test public void testDeleteService() throws Exception { //TODO thrown.expect(UnsupportedOperationException.class); - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - String serviceName = "service1"; - String groupName = "group1"; - // when - Assert.assertFalse(client.deleteService(serviceName, groupName)); + Assert.assertFalse(client.deleteService(SERVICE_NAME, GROUP_NAME)); } @Test public void testUpdateService() throws NacosException { //TODO thrown.expect(UnsupportedOperationException.class); - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); Service service = new Service(); AbstractSelector selector = new NoneSelector(); - // when client.updateService(service, selector); } @Test public void testGetServiceList() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - - Instance instance = new Instance(); - instance.setServiceName(serviceName); - instance.setIp("1.1.1.1"); - instance.setPort(1111); - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - ServiceListResponse res = new ServiceListResponse(); - List services = Arrays.asList("service1", "service2"); res.setServiceNames(services); res.setCount(5); - when(rpc.request(any())).thenReturn(res); - + when(this.rpcClient.request(any())).thenReturn(res); AbstractSelector selector = new NoneSelector(); - String groupName = "group1"; - - // when - ListView serviceList = client.getServiceList(1, 10, groupName, selector); - + ListView serviceList = client.getServiceList(1, 10, GROUP_NAME, selector); Assert.assertEquals(5, serviceList.getCount()); Assert.assertEquals(services, serviceList.getData()); } @Test public void testSubscribe() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - - Instance instance = new Instance(); - instance.setServiceName(serviceName); - instance.setIp("1.1.1.1"); - instance.setPort(1111); - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - SubscribeServiceResponse res = new SubscribeServiceResponse(); - String clusters = "cluster1"; - String groupName = "group1"; - - ServiceInfo info = new ServiceInfo(groupName + "@@" + serviceName + "@@" + clusters); + ServiceInfo info = new ServiceInfo(GROUP_NAME + "@@" + SERVICE_NAME + "@@" + CLUSTERS); res.setServiceInfo(info); - when(rpc.request(any())).thenReturn(res); - - // when - ServiceInfo actual = client.subscribe(serviceName, groupName, clusters); - + when(this.rpcClient.request(any())).thenReturn(res); + ServiceInfo actual = client.subscribe(SERVICE_NAME, GROUP_NAME, CLUSTERS); Assert.assertEquals(info, actual); } @Test public void testUnsubscribe() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - String serviceName = "service1"; - - Instance instance = new Instance(); - instance.setServiceName(serviceName); - instance.setIp("1.1.1.1"); - instance.setPort(1111); - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - SubscribeServiceResponse res = new SubscribeServiceResponse(); - String clusters = "cluster1"; - String groupName = "group1"; - - ServiceInfo info = new ServiceInfo(groupName + "@@" + serviceName + "@@" + clusters); + ServiceInfo info = new ServiceInfo(GROUP_NAME + "@@" + SERVICE_NAME + "@@" + CLUSTERS); res.setServiceInfo(info); - when(rpc.request(any())).thenReturn(res); - - // when - client.unsubscribe(serviceName, groupName, clusters); - - verify(rpc, times(1)).request(argThat(new ArgumentMatcher() { - @Override - public boolean matches(Request request) { - if (request instanceof SubscribeServiceRequest) { - SubscribeServiceRequest request1 = (SubscribeServiceRequest) request; - // not subscribe - return !request1.isSubscribe(); - } - return false; + when(this.rpcClient.request(any())).thenReturn(res); + client.unsubscribe(SERVICE_NAME, GROUP_NAME, CLUSTERS); + verify(this.rpcClient, times(1)).request(argThat(request -> { + if (request instanceof SubscribeServiceRequest) { + SubscribeServiceRequest request1 = (SubscribeServiceRequest) request; + // not subscribe + return !request1.isSubscribe(); } + return false; })); } @Test - public void testUpdateBeatInfo() throws NacosException { + public void testUpdateBeatInfo() { //TODO thrown.expect(UnsupportedOperationException.class); - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - // when client.updateBeatInfo(new HashSet<>()); } @Test - public void testServerHealthy() throws NacosException, IllegalAccessException, NoSuchFieldException { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - - when(rpc.isRunning()).thenReturn(true); - - // when + public void testServerHealthy() { + when(this.rpcClient.isRunning()).thenReturn(true); Assert.assertTrue(client.serverHealthy()); - - verify(rpc, times(1)).isRunning(); + verify(this.rpcClient, times(1)).isRunning(); } @Test public void testShutdown() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - - when(rpc.isRunning()).thenReturn(true); - - // when client.shutdown(); - - verify(rpc, times(1)).shutdown(); + verify(this.rpcClient, times(1)).shutdown(); } @Test - public void testIsEnable() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); - - RpcClient rpc = mock(RpcClient.class); - Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); - rpcClient.setAccessible(true); - rpcClient.set(client, rpc); - - when(rpc.isRunning()).thenReturn(true); - - // when + public void testIsEnable() { + when(this.rpcClient.isRunning()).thenReturn(true); Assert.assertTrue(client.isEnable()); - - verify(rpc, times(1)).isRunning(); + verify(this.rpcClient, times(1)).isRunning(); } @Test public void testServerListChanged() throws Exception { - // given - String namespaceId = "ns1"; - SecurityProxy proxy = mock(SecurityProxy.class); - ServerListFactory factory = mock(ServerListFactory.class); - ServiceInfoHolder holder = mock(ServiceInfoHolder.class); - Properties prop = new Properties(); - String originServer = "www.google.com"; - when(factory.getServerList()).thenReturn(Stream.of(originServer, "anotherServer").collect(Collectors.toList())); - // inject rpcClient; - NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder); + List serverList = Stream.of(originServer, "anotherServer").collect(Collectors.toList()); + when(factory.getServerList()).thenReturn(serverList); when(factory.genNextServer()).thenReturn(originServer); - + RpcClient rpc = new RpcClient("testServerListHasChanged", factory) { @Override public ConnectionType getConnectionType() { return ConnectionType.GRPC; } - + @Override public int rpcPortOffset() { return 0; } - + @Override public Connection connectToServer(ServerInfo serverInfo) throws Exception { return new Connection(serverInfo) { - + @Override public Response request(Request request, long timeoutMills) throws NacosException { Response response = new Response() { @@ -543,17 +282,17 @@ public class NamingGrpcClientProxyTest { response.setRequestId(request.getRequestId()); return response; } - + @Override public RequestFuture requestFuture(Request request) throws NacosException { return new DefaultRequestFuture("test", request.getRequestId()); } - + @Override public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException { - + } - + @Override public void close() { } @@ -563,12 +302,12 @@ public class NamingGrpcClientProxyTest { Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); rpcClient.setAccessible(true); rpcClient.set(client, rpc); - + rpc.serverListFactory(factory); rpc.registerServerRequestHandler(new NamingPushRequestHandler(holder)); - Field listenerField = NamingGrpcClientProxy.class.getDeclaredField("namingGrpcConnectionEventListener"); + Field listenerField = NamingGrpcClientProxy.class.getDeclaredField("redoService"); listenerField.setAccessible(true); - NamingGrpcConnectionEventListener listener = (NamingGrpcConnectionEventListener) listenerField.get(client); + NamingGrpcRedoService listener = (NamingGrpcRedoService) listenerField.get(client); rpc.registerConnectionListener(listener); rpc.start(); int retry = 10; @@ -578,14 +317,14 @@ public class NamingGrpcClientProxyTest { Assert.fail("rpc is not running"); } } - + Assert.assertEquals(originServer, rpc.getCurrentServer().getServerIp()); - + String newServer = "www.aliyun.com"; when(factory.genNextServer()).thenReturn(newServer); when(factory.getServerList()).thenReturn(Stream.of(newServer, "anotherServer").collect(Collectors.toList())); NotifyCenter.publishEvent(new ServerListChangedEvent()); - + retry = 10; while (originServer.equals(rpc.getCurrentServer().getServerIp())) { TimeUnit.SECONDS.sleep(1); @@ -593,7 +332,7 @@ public class NamingGrpcClientProxyTest { Assert.fail("failed to auth switch server"); } } - + Assert.assertEquals(newServer, rpc.getCurrentServer().getServerIp()); } -} \ No newline at end of file +} diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListenerTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListenerTest.java deleted file mode 100644 index 91c9805b2..000000000 --- a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcConnectionEventListenerTest.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.alibaba.nacos.client.naming.remote.gprc; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.pojo.Instance; -import com.alibaba.nacos.api.naming.pojo.ServiceInfo; -import com.alibaba.nacos.api.naming.utils.NamingUtils; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -public class NamingGrpcConnectionEventListenerTest { - - @Test - public void testCacheInstanceForRedo() throws NacosException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String serviceName = "service1"; - String groupName = "group1"; - Instance instance = new Instance(); - listener.cacheInstanceForRedo(serviceName, groupName, instance); - //when - listener.onConnected(); - //then - verify(proxy, times(1)).registerService(serviceName, groupName, instance); - } - - @Test - public void testRemoveInstanceForRedo() throws NacosException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String serviceName = "service1"; - String groupName = "group1"; - Instance instance = new Instance(); - listener.cacheInstanceForRedo(serviceName, groupName, instance); - listener.removeInstanceForRedo(serviceName, groupName, instance); - //when - listener.onConnected(); - //then - verify(proxy, times(0)).registerService(serviceName, groupName, instance); - } - - @Test - public void testCacheSubscriberForRedo() throws NacosException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String fullServiceName = "group1@@service1"; - String cluster = "cluster1"; - listener.cacheSubscriberForRedo(fullServiceName, cluster); - //when - listener.onConnected(); - //then - ServiceInfo info = ServiceInfo.fromKey(fullServiceName); - verify(proxy, times(1)).subscribe(info.getName(), info.getGroupName(), cluster); - } - - @Test - public void testRemoveSubscriberForRedo() throws NacosException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String fullServiceName = "group1@@service1"; - String cluster = "cluster1"; - listener.cacheSubscriberForRedo(fullServiceName, cluster); - listener.removeSubscriberForRedo(fullServiceName, cluster); - //when - listener.onConnected(); - //then - ServiceInfo info = ServiceInfo.fromKey(fullServiceName); - verify(proxy, times(0)).subscribe(info.getName(), info.getGroupName(), cluster); - } - - @Test - public void testRedoRegisterEachService1() - throws NacosException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String serviceName = "service1"; - String groupName = "group1"; - Instance instance = new Instance(); - listener.cacheInstanceForRedo(serviceName, groupName, instance); - Set failedServices = new ConcurrentHashSet<>(); - failedServices.add(NamingUtils.getGroupedName(serviceName, groupName)); - //when - Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected"); - connected.setAccessible(true); - connected.setBoolean(listener, true); - Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoRegisterEachService", Set.class); - method.setAccessible(true); - method.invoke(listener, failedServices); - //then - verify(proxy, times(1)).registerService(serviceName, groupName, instance); - } - - @Test - public void testRedoRegisterEachService2() - throws NacosException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String serviceName = "service1"; - String groupName = "group1"; - Instance instance = new Instance(); - listener.cacheInstanceForRedo(serviceName, groupName, instance); - Set failedServices = new ConcurrentHashSet<>(); - failedServices.add(NamingUtils.getGroupedName(serviceName, groupName)); - //when - Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected"); - connected.setAccessible(true); - connected.setBoolean(listener, false); - Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoRegisterEachService", Set.class); - method.setAccessible(true); - method.invoke(listener, failedServices); - //then - verify(proxy, times(0)).registerService(serviceName, groupName, instance); - } - - @Test - public void testRedoRegisterEachService3() - throws NacosException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException, InterruptedException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String serviceName = "service1"; - String groupName = "group1"; - Instance instance = new Instance(); - listener.cacheInstanceForRedo(serviceName, groupName, instance); - Set failedServices = new ConcurrentHashSet<>(); - failedServices.add(NamingUtils.getGroupedName(serviceName, groupName)); - //when - Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected"); - connected.setAccessible(true); - connected.setBoolean(listener, true); - Field executorFiled = NamingGrpcConnectionEventListener.class.getDeclaredField("redoExecutorService"); - executorFiled.setAccessible(true); - ScheduledExecutorService executorService = (ScheduledExecutorService) executorFiled.get(listener); - Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoRegisterEachService", Set.class); - method.setAccessible(true); - executorService.schedule(() -> method.invoke(listener, failedServices), 0, TimeUnit.MILLISECONDS); - //then - TimeUnit.MILLISECONDS.sleep(100L); - verify(proxy, times(1)).registerService(serviceName, groupName, instance); - } - - @Test - public void testRedoSubscribe1() - throws NacosException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String fullServiceName = "group1@@service1"; - String cluster = "cluster1"; - listener.cacheSubscriberForRedo(fullServiceName, cluster); - Set failedSubscribes = new ConcurrentHashSet<>(); - failedSubscribes.add(ServiceInfo.getKey(fullServiceName, cluster)); - //when - Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected"); - connected.setAccessible(true); - connected.setBoolean(listener, true); - Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoSubscribe", Set.class); - method.setAccessible(true); - method.invoke(listener, failedSubscribes); - //then - ServiceInfo info = ServiceInfo.fromKey(fullServiceName); - verify(proxy, times(1)).subscribe(info.getName(), info.getGroupName(), cluster); - } - - @Test - public void testRedoSubscribe2() - throws NacosException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String fullServiceName = "group1@@service1"; - String cluster = "cluster1"; - listener.cacheSubscriberForRedo(fullServiceName, cluster); - Set failedSubscribes = new ConcurrentHashSet<>(); - failedSubscribes.add(ServiceInfo.getKey(fullServiceName, cluster)); - //when - Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected"); - connected.setAccessible(true); - connected.setBoolean(listener, false); - Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoSubscribe", Set.class); - method.setAccessible(true); - method.invoke(listener, failedSubscribes); - //then - ServiceInfo info = ServiceInfo.fromKey(fullServiceName); - verify(proxy, times(0)).subscribe(info.getName(), info.getGroupName(), cluster); - } - - @Test - public void testRedoSubscribe3() - throws NacosException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException, InterruptedException { - //given - NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class); - NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy); - String fullServiceName = "group1@@service1"; - String cluster = "cluster1"; - listener.cacheSubscriberForRedo(fullServiceName, cluster); - Set failedSubscribes = new ConcurrentHashSet<>(); - failedSubscribes.add(ServiceInfo.getKey(fullServiceName, cluster)); - //when - Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected"); - connected.setAccessible(true); - connected.setBoolean(listener, true); - Field executorFiled = NamingGrpcConnectionEventListener.class.getDeclaredField("redoExecutorService"); - executorFiled.setAccessible(true); - ScheduledExecutorService executorService = (ScheduledExecutorService) executorFiled.get(listener); - Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoSubscribe", Set.class); - method.setAccessible(true); - executorService.schedule(() -> method.invoke(listener, failedSubscribes), 0, TimeUnit.MILLISECONDS); - //then - TimeUnit.MILLISECONDS.sleep(100L); - ServiceInfo info = ServiceInfo.fromKey(fullServiceName); - verify(proxy, times(1)).subscribe(info.getName(), info.getGroupName(), cluster); - } -} \ No newline at end of file diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java new file mode 100644 index 000000000..afaffea00 --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java @@ -0,0 +1,202 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo; + +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData; +import com.alibaba.nacos.common.utils.ReflectUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(MockitoJUnitRunner.class) +public class NamingGrpcRedoServiceTest { + + private static final String SERVICE = "service"; + + private static final String GROUP = "group"; + + private static final String CLUSTER = "cluster"; + + @Mock + private NamingGrpcClientProxy clientProxy; + + private NamingGrpcRedoService redoService; + + @Before + public void setUp() throws Exception { + redoService = new NamingGrpcRedoService(clientProxy); + ScheduledExecutorService redoExecutor = (ScheduledExecutorService) ReflectUtils + .getFieldValue(redoService, "redoExecutor"); + redoExecutor.shutdownNow(); + } + + @After + public void tearDown() throws Exception { + redoService.shutdown(); + } + + @Test + public void testOnConnected() { + assertFalse(redoService.isConnected()); + redoService.onConnected(); + assertTrue(redoService.isConnected()); + } + + @Test + public void testOnDisConnect() { + redoService.onConnected(); + redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance()); + redoService.instanceRegistered(SERVICE, GROUP); + redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); + redoService.subscriberRegistered(SERVICE, GROUP, CLUSTER); + assertTrue(redoService.isConnected()); + assertTrue(redoService.findInstanceRedoData().isEmpty()); + assertTrue(redoService.findSubscriberRedoData().isEmpty()); + redoService.onDisConnect(); + assertFalse(redoService.isConnected()); + assertFalse(redoService.findInstanceRedoData().isEmpty()); + assertFalse(redoService.findSubscriberRedoData().isEmpty()); + } + + @Test + public void testCacheInstanceForRedo() { + ConcurrentMap registeredInstances = getInstanceRedoDataMap(); + assertTrue(registeredInstances.isEmpty()); + Instance instance = new Instance(); + redoService.cacheInstanceForRedo(SERVICE, GROUP, instance); + assertFalse(registeredInstances.isEmpty()); + InstanceRedoData actual = registeredInstances.entrySet().iterator().next().getValue(); + assertEquals(SERVICE, actual.getServiceName()); + assertEquals(GROUP, actual.getGroupName()); + assertEquals(instance, actual.get()); + assertFalse(actual.isRegistered()); + assertFalse(actual.isUnregistering()); + } + + @Test + public void testInstanceRegistered() { + ConcurrentMap registeredInstances = getInstanceRedoDataMap(); + redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance()); + redoService.instanceRegistered(SERVICE, GROUP); + InstanceRedoData actual = registeredInstances.entrySet().iterator().next().getValue(); + assertTrue(actual.isRegistered()); + } + + @Test + public void testInstanceDeregister() { + ConcurrentMap registeredInstances = getInstanceRedoDataMap(); + redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance()); + redoService.instanceDeregister(SERVICE, GROUP); + InstanceRedoData actual = registeredInstances.entrySet().iterator().next().getValue(); + assertTrue(actual.isUnregistering()); + } + + @Test + public void testRemoveInstanceForRedo() { + ConcurrentMap registeredInstances = getInstanceRedoDataMap(); + assertTrue(registeredInstances.isEmpty()); + redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance()); + assertFalse(registeredInstances.isEmpty()); + redoService.removeInstanceForRedo(SERVICE, GROUP); + assertTrue(registeredInstances.isEmpty()); + } + + @Test + public void testFindInstanceRedoData() { + redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance()); + assertFalse(redoService.findInstanceRedoData().isEmpty()); + redoService.instanceRegistered(SERVICE, GROUP); + assertTrue(redoService.findInstanceRedoData().isEmpty()); + redoService.instanceDeregister(SERVICE, GROUP); + assertFalse(redoService.findInstanceRedoData().isEmpty()); + } + + @SuppressWarnings("all") + private ConcurrentMap getInstanceRedoDataMap() { + return (ConcurrentMap) ReflectUtils.getFieldValue(redoService, "registeredInstances"); + } + + @Test + public void testCacheSubscriberForRedo() { + ConcurrentMap subscribes = getSubscriberRedoDataMap(); + assertTrue(subscribes.isEmpty()); + redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); + assertFalse(subscribes.isEmpty()); + SubscriberRedoData actual = subscribes.entrySet().iterator().next().getValue(); + assertEquals(SERVICE, actual.getServiceName()); + assertEquals(GROUP, actual.getGroupName()); + assertEquals(CLUSTER, actual.get()); + assertFalse(actual.isRegistered()); + assertFalse(actual.isUnregistering()); + } + + @Test + public void testSubscriberRegistered() { + ConcurrentMap subscribes = getSubscriberRedoDataMap(); + redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); + redoService.subscriberRegistered(SERVICE, GROUP, CLUSTER); + SubscriberRedoData actual = subscribes.entrySet().iterator().next().getValue(); + assertTrue(actual.isRegistered()); + } + + @Test + public void testSubscriberDeregister() { + ConcurrentMap subscribes = getSubscriberRedoDataMap(); + redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); + redoService.subscriberDeregister(SERVICE, GROUP, CLUSTER); + SubscriberRedoData actual = subscribes.entrySet().iterator().next().getValue(); + assertTrue(actual.isUnregistering()); + } + + @Test + public void testRemoveSubscriberForRedo() { + ConcurrentMap subscribes = getSubscriberRedoDataMap(); + assertTrue(subscribes.isEmpty()); + redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); + assertFalse(subscribes.isEmpty()); + redoService.removeSubscriberForRedo(SERVICE, GROUP, CLUSTER); + assertTrue(subscribes.isEmpty()); + } + + @Test + public void testFindSubscriberRedoData() { + redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); + assertFalse(redoService.findSubscriberRedoData().isEmpty()); + redoService.subscriberRegistered(SERVICE, GROUP, CLUSTER); + assertTrue(redoService.findSubscriberRedoData().isEmpty()); + redoService.subscriberDeregister(SERVICE, GROUP, CLUSTER); + assertFalse(redoService.findSubscriberRedoData().isEmpty()); + } + + @SuppressWarnings("all") + private ConcurrentMap getSubscriberRedoDataMap() { + return (ConcurrentMap) ReflectUtils.getFieldValue(redoService, "subscribes"); + } +} diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTaskTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTaskTest.java new file mode 100644 index 000000000..51f5e1a24 --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/RedoScheduledTaskTest.java @@ -0,0 +1,154 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote.gprc.redo; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData; +import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashSet; +import java.util.Set; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RedoScheduledTaskTest { + + private static final String SERVICE = "service"; + + private static final String GROUP = "group"; + + private static final String CLUSTER = "cluster"; + + private static final Instance INSTANCE = new Instance(); + + @Mock + private NamingGrpcClientProxy clientProxy; + + @Mock + private NamingGrpcRedoService redoService; + + private RedoScheduledTask redoTask; + + @Before + public void setUp() throws Exception { + redoTask = new RedoScheduledTask(clientProxy, redoService); + when(clientProxy.isEnable()).thenReturn(true); + when(redoService.isConnected()).thenReturn(true); + } + + @Test + public void testRunRedoRegisterInstance() throws NacosException { + Set mockData = generateMockInstanceData(false, false); + when(redoService.findInstanceRedoData()).thenReturn(mockData); + redoTask.run(); + verify(clientProxy).doRegisterService(SERVICE, GROUP, INSTANCE); + } + + @Test + public void testRunRedoDeregisterInstance() throws NacosException { + Set mockData = generateMockInstanceData(true, true); + when(redoService.findInstanceRedoData()).thenReturn(mockData); + redoTask.run(); + verify(clientProxy).doDeregisterService(SERVICE, GROUP, INSTANCE); + } + + @Test + public void testRunRedoRemoveInstanceRedoData() throws NacosException { + Set mockData = generateMockInstanceData(false, true); + when(redoService.findInstanceRedoData()).thenReturn(mockData); + redoTask.run(); + verify(redoService).removeInstanceForRedo(SERVICE, GROUP); + } + + @Test + public void testRunRedoRegisterInstanceWithClientDisabled() throws NacosException { + when(clientProxy.isEnable()).thenReturn(false); + Set mockData = generateMockInstanceData(false, false); + when(redoService.findInstanceRedoData()).thenReturn(mockData); + redoTask.run(); + verify(clientProxy, never()).doRegisterService(SERVICE, GROUP, INSTANCE); + } + + private Set generateMockInstanceData(boolean registered, boolean unregistering) { + InstanceRedoData redoData = InstanceRedoData.build(SERVICE, GROUP, INSTANCE); + redoData.setRegistered(registered); + redoData.setUnregistering(unregistering); + Set result = new HashSet<>(); + result.add(redoData); + return result; + } + + @Test + public void testRunRedoRegisterSubscriber() throws NacosException { + Set mockData = generateMockSubscriberData(false, false); + when(redoService.findSubscriberRedoData()).thenReturn(mockData); + redoTask.run(); + verify(clientProxy).doSubscribe(SERVICE, GROUP, CLUSTER); + } + + @Test + public void testRunRedoDeregisterSubscriber() throws NacosException { + Set mockData = generateMockSubscriberData(true, true); + when(redoService.findSubscriberRedoData()).thenReturn(mockData); + redoTask.run(); + verify(clientProxy).doUnsubscribe(SERVICE, GROUP, CLUSTER); + } + + @Test + public void testRunRedoRemoveSubscriberRedoData() throws NacosException { + Set mockData = generateMockSubscriberData(false, true); + when(redoService.findSubscriberRedoData()).thenReturn(mockData); + redoTask.run(); + verify(redoService).removeSubscriberForRedo(SERVICE, GROUP, CLUSTER); + } + + @Test + public void testRunRedoRegisterSubscriberWithClientDisabled() throws NacosException { + when(clientProxy.isEnable()).thenReturn(false); + Set mockData = generateMockSubscriberData(false, false); + when(redoService.findSubscriberRedoData()).thenReturn(mockData); + redoTask.run(); + verify(clientProxy, never()).doSubscribe(SERVICE, GROUP, CLUSTER); + } + + private Set generateMockSubscriberData(boolean registered, boolean unregistering) { + SubscriberRedoData redoData = SubscriberRedoData.build(SERVICE, GROUP, CLUSTER); + redoData.setRegistered(registered); + redoData.setUnregistering(unregistering); + Set result = new HashSet<>(); + result.add(redoData); + return result; + } + + @Test + public void testRunRedoWithDisconnection() { + when(redoService.isConnected()).thenReturn(false); + redoTask.run(); + verify(redoService, never()).findInstanceRedoData(); + verify(redoService, never()).findSubscriberRedoData(); + } +}