From 4ad98d837d54e0c937b4c7bb301003e161fdd0ed Mon Sep 17 00:00:00 2001 From: Dale Lee <112548822+ldyedu@users.noreply.github.com> Date: Wed, 6 Sep 2023 17:18:18 +0800 Subject: [PATCH] [ISSUE #10374] Select clusters using the selector (#10995) * Select clusters using the selector * Add Event Cache * Update NacosNamingService,InstancesChangeNotifier --- .../nacos/api/naming/NamingService.java | 25 ++++++ .../client/naming/NacosNamingService.java | 78 +++++++++++++++---- .../naming/core/ServiceInfoUpdateService.java | 2 +- .../naming/event/InstancesChangeNotifier.java | 75 ++++++------------ .../selector/NamingSelectorWrapper.java | 71 ++++++++--------- .../client/selector/SelectorFactory.java | 2 +- .../client/selector/SelectorManager.java | 6 +- .../client/naming/NacosNamingServiceTest.java | 58 +++++++++----- .../event/InstancesChangeNotifierTest.java | 60 +++++++++----- .../selector/NamingSelectorWrapperTest.java | 24 +++--- .../client/selector/SelectorManagerTest.java | 5 +- 11 files changed, 246 insertions(+), 160 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java b/api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java index d27ad5bd0..09e3f9b84 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.selector.NamingSelector; import com.alibaba.nacos.api.selector.AbstractSelector; import java.util.List; @@ -490,6 +491,18 @@ public interface NamingService { */ void subscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException; + + /** + * Subscribe service to receive events of instances alteration. + * + * @param serviceName name of service + * @param groupName group of service + * @param selector selector of instances + * @param listener event listener + * @throws NacosException nacos exception + */ + void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener) + throws NacosException; /** * Unsubscribe event listener of service. @@ -531,6 +544,18 @@ public interface NamingService { */ void unsubscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException; + + /** + * Unsubscribe event listener of service. + * + * @param serviceName name of service + * @param groupName group of service + * @param selector selector of instances + * @param listener event listener + * @throws NacosException nacos exception + */ + void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener) + throws NacosException; /** * Get all service names from server. diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index a03e35559..a56b9be47 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -24,6 +24,7 @@ import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.selector.NamingSelector; import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.selector.AbstractSelector; import com.alibaba.nacos.client.env.NacosClientProperties; @@ -31,11 +32,14 @@ import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.client.naming.core.Balancer; import com.alibaba.nacos.client.naming.event.InstancesChangeEvent; import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier; +import com.alibaba.nacos.client.naming.event.InstancesDiff; import com.alibaba.nacos.client.naming.remote.NamingClientProxy; import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate; +import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper; import com.alibaba.nacos.client.naming.utils.CollectionUtils; import com.alibaba.nacos.client.naming.utils.InitUtils; import com.alibaba.nacos.client.naming.utils.UtilAndComs; +import com.alibaba.nacos.client.selector.SelectorFactory; import com.alibaba.nacos.client.utils.ValidatorUtils; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.utils.StringUtils; @@ -46,6 +50,8 @@ import java.util.List; import java.util.Properties; import java.util.UUID; +import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString; + /** * Nacos Naming Service. * @@ -54,7 +60,7 @@ import java.util.UUID; @SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") public class NacosNamingService implements NamingService { - private static final String DEFAULT_NAMING_LOG_FILE_PATH = "naming.log"; + private static final String DEFAULT_NAMING_LOG_FILE_PATH = "naming.log"; private static final String UP = "UP"; @@ -93,13 +99,14 @@ public class NacosNamingService implements NamingService { InitUtils.initSerialization(); InitUtils.initWebRootContext(nacosClientProperties); initLogName(nacosClientProperties); - + this.notifierEventScope = UUID.randomUUID().toString(); this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope); NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); NotifyCenter.registerSubscriber(changeNotifier); this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties); - this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier); + this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, + changeNotifier); } private void initLogName(NacosClientProperties properties) { @@ -373,8 +380,8 @@ public class NacosNamingService implements NamingService { } return Balancer.RandomByWeight.selectHost(serviceInfo); } else { - ServiceInfo serviceInfo = clientProxy - .queryInstancesOfService(serviceName, groupName, clusterString, 0, false); + ServiceInfo serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, + false); return Balancer.RandomByWeight.selectHost(serviceInfo); } } @@ -397,12 +404,25 @@ public class NacosNamingService implements NamingService { @Override public void subscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException { - if (null == listener) { + NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters); + doSubscribe(serviceName, groupName, getUniqueClusterString(clusters), clusterSelector, listener); + } + + @Override + public void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener) + throws NacosException { + doSubscribe(serviceName, groupName, Constants.NULL, selector, listener); + } + + private void doSubscribe(String serviceName, String groupName, String clusters, NamingSelector selector, + EventListener listener) throws NacosException { + if (selector == null || listener == null) { return; } - String clusterString = StringUtils.join(clusters, ","); - changeNotifier.registerListener(groupName, serviceName, clusterString, listener); - clientProxy.subscribe(serviceName, groupName, clusterString); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, clusters, selector, listener); + notifyIfSubscribed(serviceName, groupName, wrapper); + changeNotifier.registerListener(groupName, serviceName, wrapper); + clientProxy.subscribe(serviceName, groupName, Constants.NULL); } @Override @@ -423,10 +443,25 @@ public class NacosNamingService implements NamingService { @Override public void unsubscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException { - String clustersString = StringUtils.join(clusters, ","); - changeNotifier.deregisterListener(groupName, serviceName, clustersString, listener); - if (!changeNotifier.isSubscribed(groupName, serviceName, clustersString)) { - clientProxy.unsubscribe(serviceName, groupName, clustersString); + NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters); + unsubscribe(serviceName, groupName, clusterSelector, listener); + } + + @Override + public void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener) + throws NacosException { + doUnsubscribe(serviceName, groupName, selector, listener); + } + + private void doUnsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener) + throws NacosException { + if (selector == null || listener == null) { + return; + } + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(selector, listener); + changeNotifier.deregisterListener(groupName, serviceName, wrapper); + if (!changeNotifier.isSubscribed(groupName, serviceName)) { + clientProxy.unsubscribe(serviceName, groupName, Constants.NULL); } } @@ -467,6 +502,23 @@ public class NacosNamingService implements NamingService { serviceInfoHolder.shutdown(); clientProxy.shutdown(); NotifyCenter.deregisterSubscriber(changeNotifier); + } + private void notifyIfSubscribed(String serviceName, String groupName, NamingSelectorWrapper wrapper) { + if (changeNotifier.isSubscribed(groupName, serviceName)) { + ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, Constants.NULL); + InstancesChangeEvent event = transferToEvent(serviceInfo); + wrapper.notifyListener(event); + } + } + + private InstancesChangeEvent transferToEvent(ServiceInfo serviceInfo) { + if (serviceInfo == null) { + return null; + } + InstancesDiff diff = new InstancesDiff(); + diff.setAddedInstances(serviceInfo.getHosts()); + return new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(), + serviceInfo.getClusters(), serviceInfo.getHosts(), diff); } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java index 9086552ab..a4f2ce25c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java @@ -179,7 +179,7 @@ public class ServiceInfoUpdateService implements Closeable { long delayTime = DEFAULT_DELAY; try { - if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey( + if (!changeNotifier.isSubscribed(groupName, serviceName) && !futureMap.containsKey( serviceKey)) { NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters); isCancel = true; diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java index 336345fbf..8c9c7500c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java @@ -16,22 +16,18 @@ package com.alibaba.nacos.client.naming.event; -import com.alibaba.nacos.api.naming.listener.AbstractEventListener; -import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.utils.NamingUtils; -import com.alibaba.nacos.client.naming.listener.NamingChangeEvent; +import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper; +import com.alibaba.nacos.client.selector.SelectorManager; import com.alibaba.nacos.common.JustForTest; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.listener.Subscriber; -import com.alibaba.nacos.common.utils.CollectionUtils; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; /** * A subscriber to notify eventListener callback. @@ -43,7 +39,7 @@ public class InstancesChangeNotifier extends Subscriber { private final String eventScope; - private final Map> listenerMap = new ConcurrentHashMap<>(); + private final SelectorManager selectorManager = new SelectorManager<>(); @JustForTest public InstancesChangeNotifier() { @@ -59,13 +55,14 @@ public class InstancesChangeNotifier extends Subscriber { * * @param groupName group name * @param serviceName serviceName - * @param clusters clusters, concat by ','. such as 'xxx,yyy' - * @param listener custom listener + * @param wrapper selectorWrapper */ - public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) { - String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); - ConcurrentHashSet eventListeners = listenerMap.computeIfAbsent(key, keyInner -> new ConcurrentHashSet<>()); - eventListeners.add(listener); + public void registerListener(String groupName, String serviceName, NamingSelectorWrapper wrapper) { + if (wrapper == null) { + return; + } + String subId = NamingUtils.getGroupedName(serviceName, groupName); + selectorManager.addSelectorWrapper(subId, wrapper); } /** @@ -73,38 +70,31 @@ public class InstancesChangeNotifier extends Subscriber { * * @param groupName group name * @param serviceName serviceName - * @param clusters clusters, concat by ','. such as 'xxx,yyy' - * @param listener custom listener + * @param wrapper selectorWrapper */ - public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) { - String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); - ConcurrentHashSet eventListeners = listenerMap.get(key); - if (eventListeners == null) { + public void deregisterListener(String groupName, String serviceName, NamingSelectorWrapper wrapper) { + if (wrapper == null) { return; } - eventListeners.remove(listener); - if (CollectionUtils.isEmpty(eventListeners)) { - listenerMap.remove(key); - } + String subId = NamingUtils.getGroupedName(serviceName, groupName); + selectorManager.removeSelectorWrapper(subId, wrapper); } /** - * check serviceName,clusters is subscribed. + * check serviceName,groupName is subscribed. * * @param groupName group name * @param serviceName serviceName - * @param clusters clusters, concat by ','. such as 'xxx,yyy' * @return is serviceName,clusters subscribed */ - public boolean isSubscribed(String groupName, String serviceName, String clusters) { - String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); - ConcurrentHashSet eventListeners = listenerMap.get(key); - return CollectionUtils.isNotEmpty(eventListeners); + public boolean isSubscribed(String groupName, String serviceName) { + String subId = NamingUtils.getGroupedName(serviceName, groupName); + return selectorManager.isSubscribed(subId); } public List getSubscribeServices() { List serviceInfos = new ArrayList<>(); - for (String key : listenerMap.keySet()) { + for (String key : selectorManager.getSubscriptions()) { serviceInfos.add(ServiceInfo.fromKey(key)); } return serviceInfos; @@ -112,26 +102,11 @@ public class InstancesChangeNotifier extends Subscriber { @Override public void onEvent(InstancesChangeEvent event) { - String key = ServiceInfo - .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters()); - ConcurrentHashSet eventListeners = listenerMap.get(key); - if (CollectionUtils.isEmpty(eventListeners)) { - return; + String subId = NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()); + Collection selectorWrappers = selectorManager.getSelectorWrappers(subId); + for (NamingSelectorWrapper selectorWrapper : selectorWrappers) { + selectorWrapper.notifyListener(event); } - for (final EventListener listener : eventListeners) { - final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event); - if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) { - ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); - } else { - listener.onEvent(namingEvent); - } - } - } - - private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent( - InstancesChangeEvent instancesChangeEvent) { - return new NamingChangeEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), - instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts(), instancesChangeEvent.getInstancesDiff()); } @Override diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapper.java b/client/src/main/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapper.java index f42a685a2..412b120f0 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapper.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapper.java @@ -36,102 +36,97 @@ import java.util.List; * @author lideyou */ public class NamingSelectorWrapper extends AbstractSelectorWrapper { - + private String serviceName; - + private String groupName; - + private String clusters; - + private final InnerNamingContext namingContext = new InnerNamingContext(); - + private class InnerNamingContext implements NamingContext { - + private List instances; - + @Override public String getServiceName() { return serviceName; } - + @Override public String getGroupName() { return groupName; } - + @Override public String getClusters() { return clusters; } - + @Override public List getInstances() { return instances; } - + private void setInstances(List instances) { this.instances = instances; } } - + public NamingSelectorWrapper(NamingSelector selector, EventListener listener) { super(selector, new NamingListenerInvoker(listener)); } - + + public NamingSelectorWrapper(String serviceName, String groupName, String clusters, NamingSelector selector, + EventListener listener) { + this(selector, listener); + this.serviceName = serviceName; + this.groupName = groupName; + this.clusters = clusters; + } + @Override protected boolean isSelectable(InstancesChangeEvent event) { - return event != null - && event.getHosts() != null - && event.getInstancesDiff() != null; + return event != null && event.getHosts() != null && event.getInstancesDiff() != null; } - + @Override public boolean isCallable(NamingEvent event) { if (event == null) { return false; } NamingChangeEvent changeEvent = (NamingChangeEvent) event; - return changeEvent.isAdded() - || changeEvent.isRemoved() - || changeEvent.isModified(); + return changeEvent.isAdded() || changeEvent.isRemoved() || changeEvent.isModified(); } - + @Override protected NamingEvent buildListenerEvent(InstancesChangeEvent event) { - this.serviceName = event.getServiceName(); - this.groupName = event.getGroupName(); - this.clusters = event.getClusters(); - List currentIns = Collections.emptyList(); if (CollectionUtils.isNotEmpty(event.getHosts())) { currentIns = doSelect(event.getHosts()); } - + InstancesDiff diff = event.getInstancesDiff(); InstancesDiff newDiff = new InstancesDiff(); if (diff.isAdded()) { - newDiff.setAddedInstances( - doSelect(diff.getAddedInstances())); + newDiff.setAddedInstances(doSelect(diff.getAddedInstances())); } if (diff.isRemoved()) { - newDiff.setRemovedInstances( - doSelect(diff.getRemovedInstances())); + newDiff.setRemovedInstances(doSelect(diff.getRemovedInstances())); } if (diff.isModified()) { - newDiff.setModifiedInstances( - doSelect(diff.getModifiedInstances())); + newDiff.setModifiedInstances(doSelect(diff.getModifiedInstances())); } - + return new NamingChangeEvent(serviceName, groupName, clusters, currentIns, newDiff); } - + private List doSelect(List instances) { NamingContext context = getNamingContext(instances); - return this.getSelector() - .select(context) - .getResult(); + return this.getSelector().select(context).getResult(); } - + private NamingContext getNamingContext(final List instances) { namingContext.setInstances(instances); return namingContext; diff --git a/client/src/main/java/com/alibaba/nacos/client/selector/SelectorFactory.java b/client/src/main/java/com/alibaba/nacos/client/selector/SelectorFactory.java index 497cdd974..11ebf5745 100644 --- a/client/src/main/java/com/alibaba/nacos/client/selector/SelectorFactory.java +++ b/client/src/main/java/com/alibaba/nacos/client/selector/SelectorFactory.java @@ -86,7 +86,7 @@ public final class SelectorFactory { } } - private static String getUniqueClusterString(Collection cluster) { + public static String getUniqueClusterString(Collection cluster) { TreeSet treeSet = new TreeSet<>(cluster); return StringUtils.join(treeSet, ","); } diff --git a/client/src/main/java/com/alibaba/nacos/client/selector/SelectorManager.java b/client/src/main/java/com/alibaba/nacos/client/selector/SelectorManager.java index bf7325650..a74d03a03 100644 --- a/client/src/main/java/com/alibaba/nacos/client/selector/SelectorManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/selector/SelectorManager.java @@ -19,8 +19,6 @@ package com.alibaba.nacos.client.selector; import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.common.utils.ConcurrentHashSet; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -86,8 +84,8 @@ public class SelectorManager> { * * @return all subscriptions */ - public List getSubscriptions() { - return new ArrayList<>(selectorMap.keySet()); + public Set getSubscriptions() { + return selectorMap.keySet(); } /** diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/NacosNamingServiceTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/NacosNamingServiceTest.java index e42323893..46e0467ed 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/NacosNamingServiceTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/NacosNamingServiceTest.java @@ -29,7 +29,9 @@ import com.alibaba.nacos.client.naming.event.InstancesChangeEvent; import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier; import com.alibaba.nacos.client.naming.remote.NamingClientProxy; import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy; +import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper; import com.alibaba.nacos.client.naming.utils.CollectionUtils; +import com.alibaba.nacos.client.selector.SelectorFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -39,9 +41,11 @@ import org.junit.rules.ExpectedException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Properties; +import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -710,8 +714,10 @@ public class NacosNamingServiceTest { }; //when client.subscribe(serviceName, listener); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, Constants.DEFAULT_GROUP, Constants.NULL, + SelectorFactory.newClusterSelector(Collections.emptyList()), listener); //then - verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, "", listener); + verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, wrapper); verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, ""); } @@ -725,8 +731,10 @@ public class NacosNamingServiceTest { }; //when client.subscribe(serviceName, groupName, listener); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, Constants.NULL, + SelectorFactory.newClusterSelector(Collections.emptyList()), listener); //then - verify(changeNotifier, times(1)).registerListener(groupName, serviceName, "", listener); + verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper); verify(proxy, times(1)).subscribe(serviceName, groupName, ""); } @@ -740,10 +748,11 @@ public class NacosNamingServiceTest { }; //when client.subscribe(serviceName, clusterList, listener); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, Constants.DEFAULT_GROUP, Constants.NULL, + SelectorFactory.newClusterSelector(clusterList), listener); //then - verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2", - listener); - verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2"); + verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, wrapper); + verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL); } @Test @@ -757,9 +766,11 @@ public class NacosNamingServiceTest { }; //when client.subscribe(serviceName, groupName, clusterList, listener); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, + getUniqueClusterString(clusterList), SelectorFactory.newClusterSelector(clusterList), listener); //then - verify(changeNotifier, times(1)).registerListener(groupName, serviceName, "cluster1,cluster2", listener); - verify(proxy, times(1)).subscribe(serviceName, groupName, "cluster1,cluster2"); + verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper); + verify(proxy, times(1)).subscribe(serviceName, groupName, Constants.NULL); } @Test @@ -769,12 +780,14 @@ public class NacosNamingServiceTest { EventListener listener = event -> { }; - when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP, "")).thenReturn(false); + when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP)).thenReturn(false); //when client.unsubscribe(serviceName, listener); //then - verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, "", listener); - verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, ""); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper( + SelectorFactory.newClusterSelector(Collections.emptyList()), listener); + verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, wrapper); + verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL); } @Test @@ -785,13 +798,15 @@ public class NacosNamingServiceTest { EventListener listener = event -> { }; - when(changeNotifier.isSubscribed(serviceName, groupName, "")).thenReturn(false); + when(changeNotifier.isSubscribed(serviceName, groupName)).thenReturn(false); //when client.unsubscribe(serviceName, groupName, listener); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper( + SelectorFactory.newClusterSelector(Collections.emptyList()), listener); //then - verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, "", listener); - verify(proxy, times(1)).unsubscribe(serviceName, groupName, ""); + verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper); + verify(proxy, times(1)).unsubscribe(serviceName, groupName, Constants.NULL); } @Test @@ -802,14 +817,15 @@ public class NacosNamingServiceTest { EventListener listener = event -> { }; - when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2")).thenReturn(false); + when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP)).thenReturn(false); //when client.unsubscribe(serviceName, clusterList, listener); - //then - verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2", + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(SelectorFactory.newClusterSelector(clusterList), listener); - verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2"); + //then + verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, wrapper); + verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL); } @Test @@ -821,13 +837,15 @@ public class NacosNamingServiceTest { EventListener listener = event -> { }; - when(changeNotifier.isSubscribed(serviceName, groupName, "cluster1,cluster2")).thenReturn(false); + when(changeNotifier.isSubscribed(serviceName, groupName)).thenReturn(false); //when client.unsubscribe(serviceName, groupName, clusterList, listener); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(SelectorFactory.newClusterSelector(clusterList), + listener); //then - verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, "cluster1,cluster2", listener); - verify(proxy, times(1)).unsubscribe(serviceName, groupName, "cluster1,cluster2"); + verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper); + verify(proxy, times(1)).unsubscribe(serviceName, groupName, Constants.NULL); } @Test diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java index 1dece1702..1bbb25dde 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java @@ -19,11 +19,15 @@ package com.alibaba.nacos.client.naming.event; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.selector.NamingSelector; +import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper; +import com.alibaba.nacos.client.selector.SelectorFactory; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.mockito.ArgumentMatchers.any; @@ -36,22 +40,25 @@ public class InstancesChangeNotifierTest { String eventScope = "scope-001"; String group = "a"; String name = "b"; - String clusters = "c"; + String clusterStr = "c"; + List clusters = Collections.singletonList(clusterStr); InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); - instancesChangeNotifier.registerListener(group, name, clusters, listener); + NamingSelector selector = SelectorFactory.newClusterSelector(clusters); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector, + listener); + instancesChangeNotifier.registerListener(group, name, wrapper); List subscribeServices = instancesChangeNotifier.getSubscribeServices(); Assert.assertEquals(1, subscribeServices.size()); Assert.assertEquals(group, subscribeServices.get(0).getGroupName()); Assert.assertEquals(name, subscribeServices.get(0).getName()); - Assert.assertEquals(clusters, subscribeServices.get(0).getClusters()); - + List hosts = new ArrayList<>(); Instance ins = new Instance(); hosts.add(ins); InstancesDiff diff = new InstancesDiff(); diff.setAddedInstances(hosts); - InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusters, hosts, diff); + InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusterStr, hosts, diff); Assert.assertEquals(true, instancesChangeNotifier.scopeMatches(event)); } @@ -60,14 +67,17 @@ public class InstancesChangeNotifierTest { String eventScope = "scope-001"; String group = "a"; String name = "b"; - String clusters = "c"; + String clusterStr = "c"; + List clusters = Collections.singletonList(clusterStr); InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); - instancesChangeNotifier.registerListener(group, name, clusters, listener); + NamingSelector selector = SelectorFactory.newClusterSelector(clusters); + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(selector, listener); + instancesChangeNotifier.registerListener(group, name, wrapper); List subscribeServices = instancesChangeNotifier.getSubscribeServices(); Assert.assertEquals(1, subscribeServices.size()); - instancesChangeNotifier.deregisterListener(group, name, clusters, listener); + instancesChangeNotifier.deregisterListener(group, name, wrapper); List subscribeServices2 = instancesChangeNotifier.getSubscribeServices(); Assert.assertEquals(0, subscribeServices2.size()); @@ -78,13 +88,17 @@ public class InstancesChangeNotifierTest { String eventScope = "scope-001"; String group = "a"; String name = "b"; - String clusters = "c"; + String clusterStr = "c"; + List clusters = Collections.singletonList(clusterStr); InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); - Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name, clusters)); - - instancesChangeNotifier.registerListener(group, name, clusters, listener); - Assert.assertTrue(instancesChangeNotifier.isSubscribed(group, name, clusters)); + NamingSelector selector = SelectorFactory.newClusterSelector(clusters); + Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name)); + + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector, + listener); + instancesChangeNotifier.registerListener(group, name, wrapper); + Assert.assertTrue(instancesChangeNotifier.isSubscribed(group, name)); } @Test @@ -92,16 +106,20 @@ public class InstancesChangeNotifierTest { String eventScope = "scope-001"; String group = "a"; String name = "b"; - String clusters = "c"; + String clusterStr = "c"; + List clusters = Collections.singletonList(clusterStr); InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); + NamingSelector selector = SelectorFactory.newClusterSelector(clusters); EventListener listener = Mockito.mock(EventListener.class); - - instancesChangeNotifier.registerListener(group, name, clusters, listener); - InstancesChangeEvent event1 = Mockito.mock(InstancesChangeEvent.class); - Mockito.when(event1.getClusters()).thenReturn(clusters); - Mockito.when(event1.getGroupName()).thenReturn(group); - Mockito.when(event1.getServiceName()).thenReturn(name); - + + NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector, + listener); + instancesChangeNotifier.registerListener(group, name, wrapper); + Instance instance = new Instance(); + InstancesDiff diff = new InstancesDiff(null, Collections.singletonList(instance), null); + instance.setClusterName("c"); + InstancesChangeEvent event1 = new InstancesChangeEvent(null, name, group, clusterStr, Collections.emptyList(), + diff); instancesChangeNotifier.onEvent(event1); Mockito.verify(listener, times(1)).onEvent(any()); } diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapperTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapperTest.java index 91fe6d6e9..205a662af 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapperTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/selector/NamingSelectorWrapperTest.java @@ -38,7 +38,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; public class NamingSelectorWrapperTest { - + @Test public void testEquals() { EventListener listener = mock(EventListener.class); @@ -47,12 +47,12 @@ public class NamingSelectorWrapperTest { NamingSelectorWrapper sw1 = new NamingSelectorWrapper(selector1, listener); NamingSelectorWrapper sw2 = new NamingSelectorWrapper(selector2, listener); NamingSelectorWrapper sw3 = new NamingSelectorWrapper(selector1, listener); - + assertNotEquals(sw1.hashCode(), sw2.hashCode()); assertEquals(sw1.hashCode(), sw3.hashCode()); assertNotEquals(sw1, sw2); assertEquals(sw1, sw3); - + Set set = new HashSet<>(); set.add(sw1); assertFalse(set.contains(sw2)); @@ -60,8 +60,10 @@ public class NamingSelectorWrapperTest { assertTrue(set.add(sw2)); assertFalse(set.add(sw3)); assertTrue(set.remove(sw3)); + + assertEquals(sw1, new NamingSelectorWrapper("a", "b", "c", selector1, listener)); } - + @Test public void testSelectable() { NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(null, null); @@ -72,10 +74,11 @@ public class NamingSelectorWrapperTest { assertFalse(selectorWrapper.isSelectable(event2)); InstancesChangeEvent event3 = new InstancesChangeEvent(null, null, null, null, Collections.emptyList(), null); assertFalse(selectorWrapper.isSelectable(event3)); - InstancesChangeEvent event4 = new InstancesChangeEvent(null, null, null, null, Collections.emptyList(), new InstancesDiff()); + InstancesChangeEvent event4 = new InstancesChangeEvent(null, null, null, null, Collections.emptyList(), + new InstancesDiff()); assertTrue(selectorWrapper.isSelectable(event4)); } - + @Test public void testCallable() { NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(null, null); @@ -85,14 +88,15 @@ public class NamingSelectorWrapperTest { changeEvent.getRemovedInstances().clear(); assertFalse(selectorWrapper.isCallable(changeEvent)); } - + @Test public void testNotifyListener() { EventListener listener = mock(EventListener.class); - NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(new DefaultNamingSelector(Instance::isHealthy), listener); + NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper( + new DefaultNamingSelector(Instance::isHealthy), listener); InstancesDiff diff = new InstancesDiff(null, Collections.singletonList(new Instance()), null); - InstancesChangeEvent event = - new InstancesChangeEvent(null, "serviceName", "groupName", "clusters", Collections.emptyList(), diff); + InstancesChangeEvent event = new InstancesChangeEvent(null, "serviceName", "groupName", "clusters", + Collections.emptyList(), diff); selectorWrapper.notifyListener(event); verify(listener).onEvent(argThat(Objects::nonNull)); } diff --git a/client/src/test/java/com/alibaba/nacos/client/selector/SelectorManagerTest.java b/client/src/test/java/com/alibaba/nacos/client/selector/SelectorManagerTest.java index 180ee1c4b..31b3b6a31 100644 --- a/client/src/test/java/com/alibaba/nacos/client/selector/SelectorManagerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/selector/SelectorManagerTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.Set; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -54,8 +55,8 @@ public class SelectorManagerTest { assertTrue(selectorManager.isSubscribed(subId)); } - List subsList = selectorManager.getSubscriptions(); - for (String subId : subsList) { + Set subsSet = selectorManager.getSubscriptions(); + for (String subId : subsSet) { assertTrue(list.contains(subId)); }