* Select clusters using the selector * Add Event Cache * Update NacosNamingService,InstancesChangeNotifier
This commit is contained in:
parent
cc34b6ca0d
commit
4ad98d837d
@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.pojo.ListView;
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.api.selector.AbstractSelector;
|
||||
|
||||
import java.util.List;
|
||||
@ -490,6 +491,18 @@ public interface NamingService {
|
||||
*/
|
||||
void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException;
|
||||
|
||||
/**
|
||||
* Subscribe service to receive events of instances alteration.
|
||||
*
|
||||
* @param serviceName name of service
|
||||
* @param groupName group of service
|
||||
* @param selector selector of instances
|
||||
* @param listener event listener
|
||||
* @throws NacosException nacos exception
|
||||
*/
|
||||
void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException;
|
||||
|
||||
/**
|
||||
* Unsubscribe event listener of service.
|
||||
@ -531,6 +544,18 @@ public interface NamingService {
|
||||
*/
|
||||
void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException;
|
||||
|
||||
/**
|
||||
* Unsubscribe event listener of service.
|
||||
*
|
||||
* @param serviceName name of service
|
||||
* @param groupName group of service
|
||||
* @param selector selector of instances
|
||||
* @param listener event listener
|
||||
* @throws NacosException nacos exception
|
||||
*/
|
||||
void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException;
|
||||
|
||||
/**
|
||||
* Get all service names from server.
|
||||
|
@ -24,6 +24,7 @@ import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.pojo.ListView;
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.api.naming.utils.NamingUtils;
|
||||
import com.alibaba.nacos.api.selector.AbstractSelector;
|
||||
import com.alibaba.nacos.client.env.NacosClientProperties;
|
||||
@ -31,11 +32,14 @@ import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
|
||||
import com.alibaba.nacos.client.naming.core.Balancer;
|
||||
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
|
||||
import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier;
|
||||
import com.alibaba.nacos.client.naming.event.InstancesDiff;
|
||||
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
|
||||
import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.client.naming.utils.InitUtils;
|
||||
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
|
||||
import com.alibaba.nacos.client.selector.SelectorFactory;
|
||||
import com.alibaba.nacos.client.utils.ValidatorUtils;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
@ -46,6 +50,8 @@ import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString;
|
||||
|
||||
/**
|
||||
* Nacos Naming Service.
|
||||
*
|
||||
@ -54,7 +60,7 @@ import java.util.UUID;
|
||||
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
|
||||
public class NacosNamingService implements NamingService {
|
||||
|
||||
private static final String DEFAULT_NAMING_LOG_FILE_PATH = "naming.log";
|
||||
private static final String DEFAULT_NAMING_LOG_FILE_PATH = "naming.log";
|
||||
|
||||
private static final String UP = "UP";
|
||||
|
||||
@ -93,13 +99,14 @@ public class NacosNamingService implements NamingService {
|
||||
InitUtils.initSerialization();
|
||||
InitUtils.initWebRootContext(nacosClientProperties);
|
||||
initLogName(nacosClientProperties);
|
||||
|
||||
|
||||
this.notifierEventScope = UUID.randomUUID().toString();
|
||||
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
|
||||
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
|
||||
NotifyCenter.registerSubscriber(changeNotifier);
|
||||
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
|
||||
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
|
||||
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties,
|
||||
changeNotifier);
|
||||
}
|
||||
|
||||
private void initLogName(NacosClientProperties properties) {
|
||||
@ -373,8 +380,8 @@ public class NacosNamingService implements NamingService {
|
||||
}
|
||||
return Balancer.RandomByWeight.selectHost(serviceInfo);
|
||||
} else {
|
||||
ServiceInfo serviceInfo = clientProxy
|
||||
.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
|
||||
ServiceInfo serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0,
|
||||
false);
|
||||
return Balancer.RandomByWeight.selectHost(serviceInfo);
|
||||
}
|
||||
}
|
||||
@ -397,12 +404,25 @@ public class NacosNamingService implements NamingService {
|
||||
@Override
|
||||
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException {
|
||||
if (null == listener) {
|
||||
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
|
||||
doSubscribe(serviceName, groupName, getUniqueClusterString(clusters), clusterSelector, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException {
|
||||
doSubscribe(serviceName, groupName, Constants.NULL, selector, listener);
|
||||
}
|
||||
|
||||
private void doSubscribe(String serviceName, String groupName, String clusters, NamingSelector selector,
|
||||
EventListener listener) throws NacosException {
|
||||
if (selector == null || listener == null) {
|
||||
return;
|
||||
}
|
||||
String clusterString = StringUtils.join(clusters, ",");
|
||||
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
|
||||
clientProxy.subscribe(serviceName, groupName, clusterString);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, clusters, selector, listener);
|
||||
notifyIfSubscribed(serviceName, groupName, wrapper);
|
||||
changeNotifier.registerListener(groupName, serviceName, wrapper);
|
||||
clientProxy.subscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -423,10 +443,25 @@ public class NacosNamingService implements NamingService {
|
||||
@Override
|
||||
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException {
|
||||
String clustersString = StringUtils.join(clusters, ",");
|
||||
changeNotifier.deregisterListener(groupName, serviceName, clustersString, listener);
|
||||
if (!changeNotifier.isSubscribed(groupName, serviceName, clustersString)) {
|
||||
clientProxy.unsubscribe(serviceName, groupName, clustersString);
|
||||
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
|
||||
unsubscribe(serviceName, groupName, clusterSelector, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException {
|
||||
doUnsubscribe(serviceName, groupName, selector, listener);
|
||||
}
|
||||
|
||||
private void doUnsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException {
|
||||
if (selector == null || listener == null) {
|
||||
return;
|
||||
}
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(selector, listener);
|
||||
changeNotifier.deregisterListener(groupName, serviceName, wrapper);
|
||||
if (!changeNotifier.isSubscribed(groupName, serviceName)) {
|
||||
clientProxy.unsubscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
}
|
||||
|
||||
@ -467,6 +502,23 @@ public class NacosNamingService implements NamingService {
|
||||
serviceInfoHolder.shutdown();
|
||||
clientProxy.shutdown();
|
||||
NotifyCenter.deregisterSubscriber(changeNotifier);
|
||||
}
|
||||
|
||||
private void notifyIfSubscribed(String serviceName, String groupName, NamingSelectorWrapper wrapper) {
|
||||
if (changeNotifier.isSubscribed(groupName, serviceName)) {
|
||||
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, Constants.NULL);
|
||||
InstancesChangeEvent event = transferToEvent(serviceInfo);
|
||||
wrapper.notifyListener(event);
|
||||
}
|
||||
}
|
||||
|
||||
private InstancesChangeEvent transferToEvent(ServiceInfo serviceInfo) {
|
||||
if (serviceInfo == null) {
|
||||
return null;
|
||||
}
|
||||
InstancesDiff diff = new InstancesDiff();
|
||||
diff.setAddedInstances(serviceInfo.getHosts());
|
||||
return new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
|
||||
serviceInfo.getClusters(), serviceInfo.getHosts(), diff);
|
||||
}
|
||||
}
|
||||
|
@ -179,7 +179,7 @@ public class ServiceInfoUpdateService implements Closeable {
|
||||
long delayTime = DEFAULT_DELAY;
|
||||
|
||||
try {
|
||||
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
|
||||
if (!changeNotifier.isSubscribed(groupName, serviceName) && !futureMap.containsKey(
|
||||
serviceKey)) {
|
||||
NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
|
||||
isCancel = true;
|
||||
|
@ -16,22 +16,18 @@
|
||||
|
||||
package com.alibaba.nacos.client.naming.event;
|
||||
|
||||
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.naming.utils.NamingUtils;
|
||||
import com.alibaba.nacos.client.naming.listener.NamingChangeEvent;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.selector.SelectorManager;
|
||||
import com.alibaba.nacos.common.JustForTest;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.listener.Subscriber;
|
||||
import com.alibaba.nacos.common.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* A subscriber to notify eventListener callback.
|
||||
@ -43,7 +39,7 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
|
||||
|
||||
private final String eventScope;
|
||||
|
||||
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<>();
|
||||
private final SelectorManager<NamingSelectorWrapper> selectorManager = new SelectorManager<>();
|
||||
|
||||
@JustForTest
|
||||
public InstancesChangeNotifier() {
|
||||
@ -59,13 +55,14 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
|
||||
*
|
||||
* @param groupName group name
|
||||
* @param serviceName serviceName
|
||||
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
|
||||
* @param listener custom listener
|
||||
* @param wrapper selectorWrapper
|
||||
*/
|
||||
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
|
||||
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
|
||||
ConcurrentHashSet<EventListener> eventListeners = listenerMap.computeIfAbsent(key, keyInner -> new ConcurrentHashSet<>());
|
||||
eventListeners.add(listener);
|
||||
public void registerListener(String groupName, String serviceName, NamingSelectorWrapper wrapper) {
|
||||
if (wrapper == null) {
|
||||
return;
|
||||
}
|
||||
String subId = NamingUtils.getGroupedName(serviceName, groupName);
|
||||
selectorManager.addSelectorWrapper(subId, wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -73,38 +70,31 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
|
||||
*
|
||||
* @param groupName group name
|
||||
* @param serviceName serviceName
|
||||
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
|
||||
* @param listener custom listener
|
||||
* @param wrapper selectorWrapper
|
||||
*/
|
||||
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
|
||||
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
|
||||
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
|
||||
if (eventListeners == null) {
|
||||
public void deregisterListener(String groupName, String serviceName, NamingSelectorWrapper wrapper) {
|
||||
if (wrapper == null) {
|
||||
return;
|
||||
}
|
||||
eventListeners.remove(listener);
|
||||
if (CollectionUtils.isEmpty(eventListeners)) {
|
||||
listenerMap.remove(key);
|
||||
}
|
||||
String subId = NamingUtils.getGroupedName(serviceName, groupName);
|
||||
selectorManager.removeSelectorWrapper(subId, wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* check serviceName,clusters is subscribed.
|
||||
* check serviceName,groupName is subscribed.
|
||||
*
|
||||
* @param groupName group name
|
||||
* @param serviceName serviceName
|
||||
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
|
||||
* @return is serviceName,clusters subscribed
|
||||
*/
|
||||
public boolean isSubscribed(String groupName, String serviceName, String clusters) {
|
||||
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
|
||||
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
|
||||
return CollectionUtils.isNotEmpty(eventListeners);
|
||||
public boolean isSubscribed(String groupName, String serviceName) {
|
||||
String subId = NamingUtils.getGroupedName(serviceName, groupName);
|
||||
return selectorManager.isSubscribed(subId);
|
||||
}
|
||||
|
||||
public List<ServiceInfo> getSubscribeServices() {
|
||||
List<ServiceInfo> serviceInfos = new ArrayList<>();
|
||||
for (String key : listenerMap.keySet()) {
|
||||
for (String key : selectorManager.getSubscriptions()) {
|
||||
serviceInfos.add(ServiceInfo.fromKey(key));
|
||||
}
|
||||
return serviceInfos;
|
||||
@ -112,26 +102,11 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
|
||||
|
||||
@Override
|
||||
public void onEvent(InstancesChangeEvent event) {
|
||||
String key = ServiceInfo
|
||||
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
|
||||
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
|
||||
if (CollectionUtils.isEmpty(eventListeners)) {
|
||||
return;
|
||||
String subId = NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName());
|
||||
Collection<NamingSelectorWrapper> selectorWrappers = selectorManager.getSelectorWrappers(subId);
|
||||
for (NamingSelectorWrapper selectorWrapper : selectorWrappers) {
|
||||
selectorWrapper.notifyListener(event);
|
||||
}
|
||||
for (final EventListener listener : eventListeners) {
|
||||
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
|
||||
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
|
||||
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
|
||||
} else {
|
||||
listener.onEvent(namingEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent(
|
||||
InstancesChangeEvent instancesChangeEvent) {
|
||||
return new NamingChangeEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
|
||||
instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts(), instancesChangeEvent.getInstancesDiff());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -36,102 +36,97 @@ import java.util.List;
|
||||
* @author lideyou
|
||||
*/
|
||||
public class NamingSelectorWrapper extends AbstractSelectorWrapper<NamingSelector, NamingEvent, InstancesChangeEvent> {
|
||||
|
||||
|
||||
private String serviceName;
|
||||
|
||||
|
||||
private String groupName;
|
||||
|
||||
|
||||
private String clusters;
|
||||
|
||||
|
||||
private final InnerNamingContext namingContext = new InnerNamingContext();
|
||||
|
||||
|
||||
private class InnerNamingContext implements NamingContext {
|
||||
|
||||
|
||||
private List<Instance> instances;
|
||||
|
||||
|
||||
@Override
|
||||
public String getServiceName() {
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getGroupName() {
|
||||
return groupName;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getClusters() {
|
||||
return clusters;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<Instance> getInstances() {
|
||||
return instances;
|
||||
}
|
||||
|
||||
|
||||
private void setInstances(List<Instance> instances) {
|
||||
this.instances = instances;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public NamingSelectorWrapper(NamingSelector selector, EventListener listener) {
|
||||
super(selector, new NamingListenerInvoker(listener));
|
||||
}
|
||||
|
||||
|
||||
public NamingSelectorWrapper(String serviceName, String groupName, String clusters, NamingSelector selector,
|
||||
EventListener listener) {
|
||||
this(selector, listener);
|
||||
this.serviceName = serviceName;
|
||||
this.groupName = groupName;
|
||||
this.clusters = clusters;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isSelectable(InstancesChangeEvent event) {
|
||||
return event != null
|
||||
&& event.getHosts() != null
|
||||
&& event.getInstancesDiff() != null;
|
||||
return event != null && event.getHosts() != null && event.getInstancesDiff() != null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isCallable(NamingEvent event) {
|
||||
if (event == null) {
|
||||
return false;
|
||||
}
|
||||
NamingChangeEvent changeEvent = (NamingChangeEvent) event;
|
||||
return changeEvent.isAdded()
|
||||
|| changeEvent.isRemoved()
|
||||
|| changeEvent.isModified();
|
||||
return changeEvent.isAdded() || changeEvent.isRemoved() || changeEvent.isModified();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected NamingEvent buildListenerEvent(InstancesChangeEvent event) {
|
||||
this.serviceName = event.getServiceName();
|
||||
this.groupName = event.getGroupName();
|
||||
this.clusters = event.getClusters();
|
||||
|
||||
List<Instance> currentIns = Collections.emptyList();
|
||||
if (CollectionUtils.isNotEmpty(event.getHosts())) {
|
||||
currentIns = doSelect(event.getHosts());
|
||||
}
|
||||
|
||||
|
||||
InstancesDiff diff = event.getInstancesDiff();
|
||||
InstancesDiff newDiff = new InstancesDiff();
|
||||
if (diff.isAdded()) {
|
||||
newDiff.setAddedInstances(
|
||||
doSelect(diff.getAddedInstances()));
|
||||
newDiff.setAddedInstances(doSelect(diff.getAddedInstances()));
|
||||
}
|
||||
if (diff.isRemoved()) {
|
||||
newDiff.setRemovedInstances(
|
||||
doSelect(diff.getRemovedInstances()));
|
||||
newDiff.setRemovedInstances(doSelect(diff.getRemovedInstances()));
|
||||
}
|
||||
if (diff.isModified()) {
|
||||
newDiff.setModifiedInstances(
|
||||
doSelect(diff.getModifiedInstances()));
|
||||
newDiff.setModifiedInstances(doSelect(diff.getModifiedInstances()));
|
||||
}
|
||||
|
||||
|
||||
return new NamingChangeEvent(serviceName, groupName, clusters, currentIns, newDiff);
|
||||
}
|
||||
|
||||
|
||||
private List<Instance> doSelect(List<Instance> instances) {
|
||||
NamingContext context = getNamingContext(instances);
|
||||
return this.getSelector()
|
||||
.select(context)
|
||||
.getResult();
|
||||
return this.getSelector().select(context).getResult();
|
||||
}
|
||||
|
||||
|
||||
private NamingContext getNamingContext(final List<Instance> instances) {
|
||||
namingContext.setInstances(instances);
|
||||
return namingContext;
|
||||
|
@ -86,7 +86,7 @@ public final class SelectorFactory {
|
||||
}
|
||||
}
|
||||
|
||||
private static String getUniqueClusterString(Collection<String> cluster) {
|
||||
public static String getUniqueClusterString(Collection<String> cluster) {
|
||||
TreeSet<String> treeSet = new TreeSet<>(cluster);
|
||||
return StringUtils.join(treeSet, ",");
|
||||
}
|
||||
|
@ -19,8 +19,6 @@ package com.alibaba.nacos.client.selector;
|
||||
import com.alibaba.nacos.common.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -86,8 +84,8 @@ public class SelectorManager<S extends AbstractSelectorWrapper<?, ?, ?>> {
|
||||
*
|
||||
* @return all subscriptions
|
||||
*/
|
||||
public List<String> getSubscriptions() {
|
||||
return new ArrayList<>(selectorMap.keySet());
|
||||
public Set<String> getSubscriptions() {
|
||||
return selectorMap.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -29,7 +29,9 @@ import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
|
||||
import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier;
|
||||
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
|
||||
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.client.selector.SelectorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
@ -39,9 +41,11 @@ import org.junit.rules.ExpectedException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
@ -710,8 +714,10 @@ public class NacosNamingServiceTest {
|
||||
};
|
||||
//when
|
||||
client.subscribe(serviceName, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, Constants.DEFAULT_GROUP, Constants.NULL,
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, "", listener);
|
||||
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, "");
|
||||
}
|
||||
|
||||
@ -725,8 +731,10 @@ public class NacosNamingServiceTest {
|
||||
};
|
||||
//when
|
||||
client.subscribe(serviceName, groupName, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, Constants.NULL,
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, "", listener);
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, groupName, "");
|
||||
}
|
||||
|
||||
@ -740,10 +748,11 @@ public class NacosNamingServiceTest {
|
||||
};
|
||||
//when
|
||||
client.subscribe(serviceName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, Constants.DEFAULT_GROUP, Constants.NULL,
|
||||
SelectorFactory.newClusterSelector(clusterList), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2",
|
||||
listener);
|
||||
verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2");
|
||||
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -757,9 +766,11 @@ public class NacosNamingServiceTest {
|
||||
};
|
||||
//when
|
||||
client.subscribe(serviceName, groupName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName,
|
||||
getUniqueClusterString(clusterList), SelectorFactory.newClusterSelector(clusterList), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, "cluster1,cluster2", listener);
|
||||
verify(proxy, times(1)).subscribe(serviceName, groupName, "cluster1,cluster2");
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -769,12 +780,14 @@ public class NacosNamingServiceTest {
|
||||
EventListener listener = event -> {
|
||||
|
||||
};
|
||||
when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP, "")).thenReturn(false);
|
||||
when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP)).thenReturn(false);
|
||||
//when
|
||||
client.unsubscribe(serviceName, listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, "", listener);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, "");
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -785,13 +798,15 @@ public class NacosNamingServiceTest {
|
||||
EventListener listener = event -> {
|
||||
|
||||
};
|
||||
when(changeNotifier.isSubscribed(serviceName, groupName, "")).thenReturn(false);
|
||||
when(changeNotifier.isSubscribed(serviceName, groupName)).thenReturn(false);
|
||||
|
||||
//when
|
||||
client.unsubscribe(serviceName, groupName, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, "", listener);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, groupName, "");
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -802,14 +817,15 @@ public class NacosNamingServiceTest {
|
||||
EventListener listener = event -> {
|
||||
|
||||
};
|
||||
when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2")).thenReturn(false);
|
||||
when(changeNotifier.isSubscribed(serviceName, Constants.DEFAULT_GROUP)).thenReturn(false);
|
||||
|
||||
//when
|
||||
client.unsubscribe(serviceName, clusterList, listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2",
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(SelectorFactory.newClusterSelector(clusterList),
|
||||
listener);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2");
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -821,13 +837,15 @@ public class NacosNamingServiceTest {
|
||||
EventListener listener = event -> {
|
||||
|
||||
};
|
||||
when(changeNotifier.isSubscribed(serviceName, groupName, "cluster1,cluster2")).thenReturn(false);
|
||||
when(changeNotifier.isSubscribed(serviceName, groupName)).thenReturn(false);
|
||||
|
||||
//when
|
||||
client.unsubscribe(serviceName, groupName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(SelectorFactory.newClusterSelector(clusterList),
|
||||
listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, "cluster1,cluster2", listener);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, groupName, "cluster1,cluster2");
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -19,11 +19,15 @@ package com.alibaba.nacos.client.naming.event;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.selector.SelectorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
@ -36,22 +40,25 @@ public class InstancesChangeNotifierTest {
|
||||
String eventScope = "scope-001";
|
||||
String group = "a";
|
||||
String name = "b";
|
||||
String clusters = "c";
|
||||
String clusterStr = "c";
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
instancesChangeNotifier.registerListener(group, name, clusters, listener);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector,
|
||||
listener);
|
||||
instancesChangeNotifier.registerListener(group, name, wrapper);
|
||||
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
|
||||
Assert.assertEquals(1, subscribeServices.size());
|
||||
Assert.assertEquals(group, subscribeServices.get(0).getGroupName());
|
||||
Assert.assertEquals(name, subscribeServices.get(0).getName());
|
||||
Assert.assertEquals(clusters, subscribeServices.get(0).getClusters());
|
||||
|
||||
|
||||
List<Instance> hosts = new ArrayList<>();
|
||||
Instance ins = new Instance();
|
||||
hosts.add(ins);
|
||||
InstancesDiff diff = new InstancesDiff();
|
||||
diff.setAddedInstances(hosts);
|
||||
InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusters, hosts, diff);
|
||||
InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusterStr, hosts, diff);
|
||||
Assert.assertEquals(true, instancesChangeNotifier.scopeMatches(event));
|
||||
}
|
||||
|
||||
@ -60,14 +67,17 @@ public class InstancesChangeNotifierTest {
|
||||
String eventScope = "scope-001";
|
||||
String group = "a";
|
||||
String name = "b";
|
||||
String clusters = "c";
|
||||
String clusterStr = "c";
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
instancesChangeNotifier.registerListener(group, name, clusters, listener);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(selector, listener);
|
||||
instancesChangeNotifier.registerListener(group, name, wrapper);
|
||||
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
|
||||
Assert.assertEquals(1, subscribeServices.size());
|
||||
|
||||
instancesChangeNotifier.deregisterListener(group, name, clusters, listener);
|
||||
instancesChangeNotifier.deregisterListener(group, name, wrapper);
|
||||
|
||||
List<ServiceInfo> subscribeServices2 = instancesChangeNotifier.getSubscribeServices();
|
||||
Assert.assertEquals(0, subscribeServices2.size());
|
||||
@ -78,13 +88,17 @@ public class InstancesChangeNotifierTest {
|
||||
String eventScope = "scope-001";
|
||||
String group = "a";
|
||||
String name = "b";
|
||||
String clusters = "c";
|
||||
String clusterStr = "c";
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name, clusters));
|
||||
|
||||
instancesChangeNotifier.registerListener(group, name, clusters, listener);
|
||||
Assert.assertTrue(instancesChangeNotifier.isSubscribed(group, name, clusters));
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name));
|
||||
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector,
|
||||
listener);
|
||||
instancesChangeNotifier.registerListener(group, name, wrapper);
|
||||
Assert.assertTrue(instancesChangeNotifier.isSubscribed(group, name));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -92,16 +106,20 @@ public class InstancesChangeNotifierTest {
|
||||
String eventScope = "scope-001";
|
||||
String group = "a";
|
||||
String name = "b";
|
||||
String clusters = "c";
|
||||
String clusterStr = "c";
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
|
||||
instancesChangeNotifier.registerListener(group, name, clusters, listener);
|
||||
InstancesChangeEvent event1 = Mockito.mock(InstancesChangeEvent.class);
|
||||
Mockito.when(event1.getClusters()).thenReturn(clusters);
|
||||
Mockito.when(event1.getGroupName()).thenReturn(group);
|
||||
Mockito.when(event1.getServiceName()).thenReturn(name);
|
||||
|
||||
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector,
|
||||
listener);
|
||||
instancesChangeNotifier.registerListener(group, name, wrapper);
|
||||
Instance instance = new Instance();
|
||||
InstancesDiff diff = new InstancesDiff(null, Collections.singletonList(instance), null);
|
||||
instance.setClusterName("c");
|
||||
InstancesChangeEvent event1 = new InstancesChangeEvent(null, name, group, clusterStr, Collections.emptyList(),
|
||||
diff);
|
||||
instancesChangeNotifier.onEvent(event1);
|
||||
Mockito.verify(listener, times(1)).onEvent(any());
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class NamingSelectorWrapperTest {
|
||||
|
||||
|
||||
@Test
|
||||
public void testEquals() {
|
||||
EventListener listener = mock(EventListener.class);
|
||||
@ -47,12 +47,12 @@ public class NamingSelectorWrapperTest {
|
||||
NamingSelectorWrapper sw1 = new NamingSelectorWrapper(selector1, listener);
|
||||
NamingSelectorWrapper sw2 = new NamingSelectorWrapper(selector2, listener);
|
||||
NamingSelectorWrapper sw3 = new NamingSelectorWrapper(selector1, listener);
|
||||
|
||||
|
||||
assertNotEquals(sw1.hashCode(), sw2.hashCode());
|
||||
assertEquals(sw1.hashCode(), sw3.hashCode());
|
||||
assertNotEquals(sw1, sw2);
|
||||
assertEquals(sw1, sw3);
|
||||
|
||||
|
||||
Set<NamingSelectorWrapper> set = new HashSet<>();
|
||||
set.add(sw1);
|
||||
assertFalse(set.contains(sw2));
|
||||
@ -60,8 +60,10 @@ public class NamingSelectorWrapperTest {
|
||||
assertTrue(set.add(sw2));
|
||||
assertFalse(set.add(sw3));
|
||||
assertTrue(set.remove(sw3));
|
||||
|
||||
assertEquals(sw1, new NamingSelectorWrapper("a", "b", "c", selector1, listener));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSelectable() {
|
||||
NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(null, null);
|
||||
@ -72,10 +74,11 @@ public class NamingSelectorWrapperTest {
|
||||
assertFalse(selectorWrapper.isSelectable(event2));
|
||||
InstancesChangeEvent event3 = new InstancesChangeEvent(null, null, null, null, Collections.emptyList(), null);
|
||||
assertFalse(selectorWrapper.isSelectable(event3));
|
||||
InstancesChangeEvent event4 = new InstancesChangeEvent(null, null, null, null, Collections.emptyList(), new InstancesDiff());
|
||||
InstancesChangeEvent event4 = new InstancesChangeEvent(null, null, null, null, Collections.emptyList(),
|
||||
new InstancesDiff());
|
||||
assertTrue(selectorWrapper.isSelectable(event4));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCallable() {
|
||||
NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(null, null);
|
||||
@ -85,14 +88,15 @@ public class NamingSelectorWrapperTest {
|
||||
changeEvent.getRemovedInstances().clear();
|
||||
assertFalse(selectorWrapper.isCallable(changeEvent));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNotifyListener() {
|
||||
EventListener listener = mock(EventListener.class);
|
||||
NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(new DefaultNamingSelector(Instance::isHealthy), listener);
|
||||
NamingSelectorWrapper selectorWrapper = new NamingSelectorWrapper(
|
||||
new DefaultNamingSelector(Instance::isHealthy), listener);
|
||||
InstancesDiff diff = new InstancesDiff(null, Collections.singletonList(new Instance()), null);
|
||||
InstancesChangeEvent event =
|
||||
new InstancesChangeEvent(null, "serviceName", "groupName", "clusters", Collections.emptyList(), diff);
|
||||
InstancesChangeEvent event = new InstancesChangeEvent(null, "serviceName", "groupName", "clusters",
|
||||
Collections.emptyList(), diff);
|
||||
selectorWrapper.notifyListener(event);
|
||||
verify(listener).onEvent(argThat(Objects::nonNull));
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import org.junit.Test;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
@ -54,8 +55,8 @@ public class SelectorManagerTest {
|
||||
assertTrue(selectorManager.isSubscribed(subId));
|
||||
}
|
||||
|
||||
List<String> subsList = selectorManager.getSubscriptions();
|
||||
for (String subId : subsList) {
|
||||
Set<String> subsSet = selectorManager.getSubscriptions();
|
||||
for (String subId : subsSet) {
|
||||
assertTrue(list.contains(subId));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user