* Add default selectors * add tests * Update SubscribeSelector_ITCase * add unsubscribe test * Removes some methods for NamingSelectorFactor * Update SelectorManager
This commit is contained in:
parent
4ad98d837d
commit
263e223d94
@ -491,7 +491,17 @@ public interface NamingService {
|
||||
*/
|
||||
void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException;
|
||||
|
||||
|
||||
/**
|
||||
* Subscribe service to receive events of instances alteration.
|
||||
*
|
||||
* @param serviceName name of service
|
||||
* @param selector selector of instances
|
||||
* @param listener event listener
|
||||
* @throws NacosException nacos exception
|
||||
*/
|
||||
void subscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException;
|
||||
|
||||
/**
|
||||
* Subscribe service to receive events of instances alteration.
|
||||
*
|
||||
@ -544,7 +554,17 @@ public interface NamingService {
|
||||
*/
|
||||
void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException;
|
||||
|
||||
|
||||
/**
|
||||
* Unsubscribe event listener of service.
|
||||
*
|
||||
* @param serviceName name of service
|
||||
* @param selector selector of instances
|
||||
* @param listener event listener
|
||||
* @throws NacosException nacos exception
|
||||
*/
|
||||
void unsubscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException;
|
||||
|
||||
/**
|
||||
* Unsubscribe event listener of service.
|
||||
*
|
||||
|
@ -39,7 +39,7 @@ import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.client.naming.utils.InitUtils;
|
||||
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
|
||||
import com.alibaba.nacos.client.selector.SelectorFactory;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorFactory;
|
||||
import com.alibaba.nacos.client.utils.ValidatorUtils;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
@ -50,7 +50,7 @@ import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString;
|
||||
import static com.alibaba.nacos.client.naming.selector.NamingSelectorFactory.getUniqueClusterString;
|
||||
|
||||
/**
|
||||
* Nacos Naming Service.
|
||||
@ -404,10 +404,15 @@ public class NacosNamingService implements NamingService {
|
||||
@Override
|
||||
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException {
|
||||
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelector clusterSelector = NamingSelectorFactory.newClusterSelector(clusters);
|
||||
doSubscribe(serviceName, groupName, getUniqueClusterString(clusters), clusterSelector, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException {
|
||||
subscribe(serviceName, Constants.DEFAULT_GROUP, selector, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException {
|
||||
@ -443,10 +448,15 @@ public class NacosNamingService implements NamingService {
|
||||
@Override
|
||||
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
|
||||
throws NacosException {
|
||||
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelector clusterSelector = NamingSelectorFactory.newClusterSelector(clusters);
|
||||
unsubscribe(serviceName, groupName, clusterSelector, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException {
|
||||
unsubscribe(serviceName, Constants.DEFAULT_GROUP, selector, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
|
||||
throws NacosException {
|
||||
|
@ -14,40 +14,45 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.client.selector;
|
||||
package com.alibaba.nacos.client.naming.selector;
|
||||
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.client.naming.selector.DefaultNamingSelector;
|
||||
import com.alibaba.nacos.common.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Selectors factory.
|
||||
*
|
||||
* @author lideyou
|
||||
*/
|
||||
public final class SelectorFactory {
|
||||
private static final NamingSelector EMPTY_SELECTOR = context -> context::getInstances;
|
||||
|
||||
public final class NamingSelectorFactory {
|
||||
|
||||
public static final NamingSelector EMPTY_SELECTOR = context -> context::getInstances;
|
||||
|
||||
public static final NamingSelector HEALTHY_SELECTOR = new DefaultNamingSelector(Instance::isHealthy);
|
||||
|
||||
/**
|
||||
* Cluster selector.
|
||||
*/
|
||||
private static class ClusterSelector extends DefaultNamingSelector {
|
||||
|
||||
private final String clusterString;
|
||||
|
||||
|
||||
public ClusterSelector(Predicate<Instance> filter, String clusterString) {
|
||||
super(filter);
|
||||
this.clusterString = clusterString;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
@ -59,16 +64,16 @@ public final class SelectorFactory {
|
||||
ClusterSelector that = (ClusterSelector) o;
|
||||
return Objects.equals(this.clusterString, that.clusterString);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(this.clusterString);
|
||||
}
|
||||
}
|
||||
|
||||
private SelectorFactory() {
|
||||
|
||||
private NamingSelectorFactory() {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a cluster selector.
|
||||
*
|
||||
@ -85,9 +90,61 @@ public final class SelectorFactory {
|
||||
return EMPTY_SELECTOR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a IP selector.
|
||||
*
|
||||
* @param regex regular expression of IP
|
||||
* @return IP selector
|
||||
*/
|
||||
public static NamingSelector newIpSelector(String regex) {
|
||||
if (regex == null) {
|
||||
throw new IllegalArgumentException("The parameter 'regex' cannot be null.");
|
||||
}
|
||||
return new DefaultNamingSelector(instance -> Pattern.matches(regex, instance.getIp()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a metadata selector.
|
||||
*
|
||||
* @param metadata metadata that needs to be matched
|
||||
* @return metadata selector
|
||||
*/
|
||||
public static NamingSelector newMetadataSelector(Map<String, String> metadata) {
|
||||
return newMetadataSelector(metadata, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a metadata selector.
|
||||
*
|
||||
* @param metadata target metadata
|
||||
* @param isAny true if any of the metadata needs to be matched, false if all the metadata need to be matched.
|
||||
* @return metadata selector
|
||||
*/
|
||||
public static NamingSelector newMetadataSelector(Map<String, String> metadata, boolean isAny) {
|
||||
if (metadata == null) {
|
||||
throw new IllegalArgumentException("The parameter 'metadata' cannot be null.");
|
||||
}
|
||||
|
||||
Predicate<Instance> filter = instance -> instance.getMetadata().size() >= metadata.size();
|
||||
|
||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||
Predicate<Instance> nextFilter = instance -> {
|
||||
Map<String, String> map = instance.getMetadata();
|
||||
return Objects.equals(map.get(entry.getKey()), entry.getValue());
|
||||
};
|
||||
if (isAny) {
|
||||
filter = filter.or(nextFilter);
|
||||
} else {
|
||||
filter = filter.and(nextFilter);
|
||||
}
|
||||
}
|
||||
return new DefaultNamingSelector(filter);
|
||||
}
|
||||
|
||||
public static String getUniqueClusterString(Collection<String> cluster) {
|
||||
TreeSet<String> treeSet = new TreeSet<>(cluster);
|
||||
return StringUtils.join(treeSet, ",");
|
||||
}
|
||||
|
||||
}
|
@ -30,19 +30,25 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
* @author lideyou
|
||||
*/
|
||||
public class SelectorManager<S extends AbstractSelectorWrapper<?, ?, ?>> {
|
||||
|
||||
Map<String, Set<S>> selectorMap = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
/**
|
||||
* Add a selectorWrapper to subId.
|
||||
*
|
||||
* @param subId subscription id
|
||||
* @param selector selector wrapper
|
||||
* @param subId subscription id
|
||||
* @param wrapper selector wrapper
|
||||
*/
|
||||
public void addSelectorWrapper(String subId, S selector) {
|
||||
Set<S> selectors = selectorMap.computeIfAbsent(subId, key -> new ConcurrentHashSet<>());
|
||||
selectors.add(selector);
|
||||
public void addSelectorWrapper(String subId, S wrapper) {
|
||||
selectorMap.compute(subId, (k, v) -> {
|
||||
if (v == null) {
|
||||
v = new ConcurrentHashSet<>();
|
||||
}
|
||||
v.add(wrapper);
|
||||
return v;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get all SelectorWrappers by id.
|
||||
*
|
||||
@ -52,24 +58,20 @@ public class SelectorManager<S extends AbstractSelectorWrapper<?, ?, ?>> {
|
||||
public Set<S> getSelectorWrappers(String subId) {
|
||||
return selectorMap.get(subId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Remove a SelectorWrapper by id.
|
||||
*
|
||||
* @param subId subscription id
|
||||
* @param selector selector wrapper
|
||||
* @param subId subscription id
|
||||
* @param wrapper selector wrapper
|
||||
*/
|
||||
public void removeSelectorWrapper(String subId, S selector) {
|
||||
Set<S> selectors = selectorMap.get(subId);
|
||||
if (selectors == null) {
|
||||
return;
|
||||
}
|
||||
selectors.remove(selector);
|
||||
if (CollectionUtils.isEmpty(selectors)) {
|
||||
selectorMap.remove(subId);
|
||||
}
|
||||
public void removeSelectorWrapper(String subId, S wrapper) {
|
||||
selectorMap.computeIfPresent(subId, (k, v) -> {
|
||||
v.remove(wrapper);
|
||||
return v.isEmpty() ? null : v;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Remove a subscription by id.
|
||||
*
|
||||
@ -78,7 +80,7 @@ public class SelectorManager<S extends AbstractSelectorWrapper<?, ?, ?>> {
|
||||
public void removeSubscription(String subId) {
|
||||
selectorMap.remove(subId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get all subscriptions.
|
||||
*
|
||||
@ -87,7 +89,7 @@ public class SelectorManager<S extends AbstractSelectorWrapper<?, ?, ?>> {
|
||||
public Set<String> getSubscriptions() {
|
||||
return selectorMap.keySet();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Determine whether subId is subscribed.
|
||||
*
|
||||
|
@ -31,7 +31,7 @@ import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
|
||||
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.client.selector.SelectorFactory;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
@ -45,7 +45,7 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString;
|
||||
import static com.alibaba.nacos.client.naming.selector.NamingSelectorFactory.getUniqueClusterString;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
@ -715,7 +715,7 @@ public class NacosNamingServiceTest {
|
||||
//when
|
||||
client.subscribe(serviceName, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, Constants.DEFAULT_GROUP, Constants.NULL,
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
NamingSelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, "");
|
||||
@ -732,7 +732,7 @@ public class NacosNamingServiceTest {
|
||||
//when
|
||||
client.subscribe(serviceName, groupName, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, Constants.NULL,
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
NamingSelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, groupName, "");
|
||||
@ -749,7 +749,7 @@ public class NacosNamingServiceTest {
|
||||
//when
|
||||
client.subscribe(serviceName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, Constants.DEFAULT_GROUP, Constants.NULL,
|
||||
SelectorFactory.newClusterSelector(clusterList), listener);
|
||||
NamingSelectorFactory.newClusterSelector(clusterList), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL);
|
||||
@ -767,7 +767,23 @@ public class NacosNamingServiceTest {
|
||||
//when
|
||||
client.subscribe(serviceName, groupName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName,
|
||||
getUniqueClusterString(clusterList), SelectorFactory.newClusterSelector(clusterList), listener);
|
||||
getUniqueClusterString(clusterList), NamingSelectorFactory.newClusterSelector(clusterList), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscribe5() throws NacosException {
|
||||
String serviceName = "service1";
|
||||
String groupName = "group1";
|
||||
EventListener listener = event -> {
|
||||
|
||||
};
|
||||
//when
|
||||
client.subscribe(serviceName, groupName, NamingSelectorFactory.HEALTHY_SELECTOR, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName,
|
||||
Constants.NULL, NamingSelectorFactory.HEALTHY_SELECTOR, listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).registerListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).subscribe(serviceName, groupName, Constants.NULL);
|
||||
@ -785,7 +801,7 @@ public class NacosNamingServiceTest {
|
||||
client.unsubscribe(serviceName, listener);
|
||||
//then
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
NamingSelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, Constants.NULL);
|
||||
}
|
||||
@ -803,7 +819,7 @@ public class NacosNamingServiceTest {
|
||||
//when
|
||||
client.unsubscribe(serviceName, groupName, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(
|
||||
SelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
NamingSelectorFactory.newClusterSelector(Collections.emptyList()), listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, groupName, Constants.NULL);
|
||||
@ -821,7 +837,7 @@ public class NacosNamingServiceTest {
|
||||
|
||||
//when
|
||||
client.unsubscribe(serviceName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(SelectorFactory.newClusterSelector(clusterList),
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(NamingSelectorFactory.newClusterSelector(clusterList),
|
||||
listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, wrapper);
|
||||
@ -841,7 +857,26 @@ public class NacosNamingServiceTest {
|
||||
|
||||
//when
|
||||
client.unsubscribe(serviceName, groupName, clusterList, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(SelectorFactory.newClusterSelector(clusterList),
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(NamingSelectorFactory.newClusterSelector(clusterList),
|
||||
listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper);
|
||||
verify(proxy, times(1)).unsubscribe(serviceName, groupName, Constants.NULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnSubscribe5() throws NacosException {
|
||||
//given
|
||||
String serviceName = "service1";
|
||||
String groupName = "group1";
|
||||
EventListener listener = event -> {
|
||||
|
||||
};
|
||||
when(changeNotifier.isSubscribed(serviceName, groupName)).thenReturn(false);
|
||||
|
||||
//when
|
||||
client.unsubscribe(serviceName, groupName, NamingSelectorFactory.HEALTHY_SELECTOR, listener);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(NamingSelectorFactory.HEALTHY_SELECTOR,
|
||||
listener);
|
||||
//then
|
||||
verify(changeNotifier, times(1)).deregisterListener(groupName, serviceName, wrapper);
|
||||
|
@ -21,7 +21,7 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
|
||||
import com.alibaba.nacos.client.selector.SelectorFactory;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
@ -44,7 +44,7 @@ public class InstancesChangeNotifierTest {
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelector selector = NamingSelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector,
|
||||
listener);
|
||||
instancesChangeNotifier.registerListener(group, name, wrapper);
|
||||
@ -71,7 +71,7 @@ public class InstancesChangeNotifierTest {
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelector selector = NamingSelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(selector, listener);
|
||||
instancesChangeNotifier.registerListener(group, name, wrapper);
|
||||
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
|
||||
@ -92,7 +92,7 @@ public class InstancesChangeNotifierTest {
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelector selector = NamingSelectorFactory.newClusterSelector(clusters);
|
||||
Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name));
|
||||
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector,
|
||||
@ -109,7 +109,7 @@ public class InstancesChangeNotifierTest {
|
||||
String clusterStr = "c";
|
||||
List<String> clusters = Collections.singletonList(clusterStr);
|
||||
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
|
||||
NamingSelector selector = SelectorFactory.newClusterSelector(clusters);
|
||||
NamingSelector selector = NamingSelectorFactory.newClusterSelector(clusters);
|
||||
EventListener listener = Mockito.mock(EventListener.class);
|
||||
|
||||
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(name, group, clusterStr, selector,
|
||||
|
@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Copyright 1999-2023 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.client.naming.selector;
|
||||
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingContext;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingResult;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class NamingSelectorFactoryTest {
|
||||
|
||||
@Test
|
||||
public void testNewClusterSelector1() {
|
||||
Instance ins1 = new Instance();
|
||||
ins1.setClusterName("a");
|
||||
Instance ins2 = new Instance();
|
||||
ins2.setClusterName("b");
|
||||
Instance ins3 = new Instance();
|
||||
ins3.setClusterName("c");
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
NamingSelector namingSelector1 = NamingSelectorFactory.newClusterSelector(Collections.singletonList("a"));
|
||||
NamingResult result1 = namingSelector1.select(namingContext);
|
||||
assertEquals("a", result1.getResult().get(0).getClusterName());
|
||||
|
||||
NamingSelector namingSelector2 = NamingSelectorFactory.newClusterSelector(Collections.emptyList());
|
||||
NamingResult result2 = namingSelector2.select(namingContext);
|
||||
assertEquals(3, result2.getResult().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewClusterSelector2() {
|
||||
NamingSelector namingSelector1 = NamingSelectorFactory.newClusterSelector(Arrays.asList("a", "b", "c"));
|
||||
NamingSelector namingSelector2 = NamingSelectorFactory.newClusterSelector(Arrays.asList("c", "b", "a"));
|
||||
NamingSelector namingSelector3 = NamingSelectorFactory.newClusterSelector(Arrays.asList("a", "b", "c", "c"));
|
||||
NamingSelector namingSelector4 = NamingSelectorFactory.newClusterSelector(Arrays.asList("d", "e"));
|
||||
|
||||
assertEquals(namingSelector1, namingSelector2);
|
||||
assertEquals(namingSelector1, namingSelector3);
|
||||
assertNotEquals(namingSelector1, namingSelector4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewIpSelector() {
|
||||
Instance ins1 = new Instance();
|
||||
ins1.setIp("172.18.137.120");
|
||||
Instance ins2 = new Instance();
|
||||
ins2.setIp("172.18.137.121");
|
||||
Instance ins3 = new Instance();
|
||||
ins3.setIp("172.18.136.111");
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
NamingSelector ipSelector = NamingSelectorFactory.newIpSelector("^172\\.18\\.137.*");
|
||||
NamingResult result = ipSelector.select(namingContext);
|
||||
List<Instance> list = result.getResult();
|
||||
|
||||
assertEquals(2, list.size());
|
||||
assertEquals(ins1.getIp(), list.get(0).getIp());
|
||||
assertEquals(ins2.getIp(), list.get(1).getIp());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewMetadataSelector() {
|
||||
Instance ins1 = new Instance();
|
||||
ins1.addMetadata("a", "1");
|
||||
ins1.addMetadata("b", "2");
|
||||
Instance ins2 = new Instance();
|
||||
ins2.addMetadata("a", "1");
|
||||
Instance ins3 = new Instance();
|
||||
ins3.addMetadata("b", "2");
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
NamingSelector metadataSelector = NamingSelectorFactory.newMetadataSelector(new HashMap() {
|
||||
{
|
||||
put("a", "1");
|
||||
put("b", "2");
|
||||
}
|
||||
});
|
||||
List<Instance> result = metadataSelector.select(namingContext).getResult();
|
||||
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(ins1, result.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewMetadataSelector2() {
|
||||
Instance ins1 = new Instance();
|
||||
ins1.addMetadata("a", "1");
|
||||
ins1.addMetadata("c", "3");
|
||||
Instance ins2 = new Instance();
|
||||
ins2.addMetadata("b", "2");
|
||||
Instance ins3 = new Instance();
|
||||
ins3.addMetadata("c", "3");
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
NamingSelector metadataSelector = NamingSelectorFactory.newMetadataSelector(new HashMap() {
|
||||
{
|
||||
put("a", "1");
|
||||
put("b", "2");
|
||||
}
|
||||
}, true);
|
||||
List<Instance> result = metadataSelector.select(namingContext).getResult();
|
||||
|
||||
assertEquals(2, result.size());
|
||||
assertEquals(ins1, result.get(0));
|
||||
assertEquals(ins2, result.get(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHealthSelector() {
|
||||
Instance ins1 = new Instance();
|
||||
Instance ins2 = new Instance();
|
||||
Instance ins3 = new Instance();
|
||||
ins3.setHealthy(false);
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
List<Instance> result = NamingSelectorFactory.HEALTHY_SELECTOR.select(namingContext).getResult();
|
||||
|
||||
assertEquals(2, result.size());
|
||||
assertTrue(result.contains(ins1));
|
||||
assertTrue(result.contains(ins2));
|
||||
assertTrue(result.get(0).isHealthy());
|
||||
assertTrue(result.get(1).isHealthy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptySelector() {
|
||||
Instance ins1 = new Instance();
|
||||
Instance ins2 = new Instance();
|
||||
Instance ins3 = new Instance();
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
List<Instance> result = NamingSelectorFactory.EMPTY_SELECTOR.select(namingContext).getResult();
|
||||
|
||||
assertEquals(3, result.size());
|
||||
assertTrue(result.contains(ins1));
|
||||
assertTrue(result.contains(ins2));
|
||||
assertTrue(result.contains(ins3));
|
||||
}
|
||||
}
|
@ -1,66 +0,0 @@
|
||||
/*
|
||||
* Copyright 1999-2023 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.client.selector;
|
||||
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingContext;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingResult;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class SelectorFactoryTest {
|
||||
|
||||
@Test
|
||||
public void testNewClusterSelector1() {
|
||||
Instance ins1 = new Instance();
|
||||
ins1.setClusterName("a");
|
||||
Instance ins2 = new Instance();
|
||||
ins2.setClusterName("b");
|
||||
Instance ins3 = new Instance();
|
||||
ins3.setClusterName("c");
|
||||
|
||||
NamingContext namingContext = mock(NamingContext.class);
|
||||
when(namingContext.getInstances()).thenReturn(Arrays.asList(ins1, ins2, ins3));
|
||||
|
||||
NamingSelector namingSelector1 = SelectorFactory.newClusterSelector(Collections.singletonList("a"));
|
||||
NamingResult result1 = namingSelector1.select(namingContext);
|
||||
assertEquals("a", result1.getResult().get(0).getClusterName());
|
||||
|
||||
NamingSelector namingSelector2 = SelectorFactory.newClusterSelector(Collections.emptyList());
|
||||
NamingResult result2 = namingSelector2.select(namingContext);
|
||||
assertEquals(3, result2.getResult().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewClusterSelector2() {
|
||||
NamingSelector namingSelector1 = SelectorFactory.newClusterSelector(Arrays.asList("a", "b", "c"));
|
||||
NamingSelector namingSelector2 = SelectorFactory.newClusterSelector(Arrays.asList("c", "b", "a"));
|
||||
NamingSelector namingSelector3 = SelectorFactory.newClusterSelector(Arrays.asList("a", "b", "c", "c"));
|
||||
NamingSelector namingSelector4 = SelectorFactory.newClusterSelector(Arrays.asList("d", "e"));
|
||||
assertEquals(namingSelector1, namingSelector2);
|
||||
assertEquals(namingSelector1, namingSelector3);
|
||||
assertNotEquals(namingSelector1, namingSelector4);
|
||||
}
|
||||
}
|
@ -0,0 +1,167 @@
|
||||
/*
|
||||
* Copyright 1999-2023 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.test.naming;
|
||||
|
||||
import com.alibaba.nacos.Nacos;
|
||||
import com.alibaba.nacos.api.naming.NamingFactory;
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.listener.Event;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.client.naming.selector.DefaultNamingSelector;
|
||||
import com.alibaba.nacos.client.naming.selector.NamingSelectorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author lideyou
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = Nacos.class, properties = {
|
||||
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
|
||||
public class SubscribeSelector_ITCase extends NamingBase {
|
||||
|
||||
private NamingService naming;
|
||||
|
||||
private NamingSelector selector = new DefaultNamingSelector(instance -> instance.getIp().startsWith("172.18.137"));
|
||||
|
||||
@LocalServerPort
|
||||
private int port;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
instances.clear();
|
||||
if (naming == null) {
|
||||
naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port);
|
||||
}
|
||||
}
|
||||
|
||||
private volatile List<Instance> instances = Collections.emptyList();
|
||||
|
||||
/**
|
||||
* Add IP and receive notification.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 10000L)
|
||||
public void subscribeAdd() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
naming.subscribe(serviceName, selector, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
});
|
||||
|
||||
naming.registerInstance(serviceName, "172.18.137.1", TEST_PORT);
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete IP and receive notification.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 10000L)
|
||||
public void subscribeDelete() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
naming.registerInstance(serviceName, "172.18.137.1", TEST_PORT, "c1");
|
||||
|
||||
TimeUnit.SECONDS.sleep(3);
|
||||
|
||||
naming.subscribe(serviceName, selector, new EventListener() {
|
||||
int index = 0;
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
if (index == 0) {
|
||||
index++;
|
||||
return;
|
||||
}
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
}
|
||||
});
|
||||
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
|
||||
naming.deregisterInstance(serviceName, "172.18.137.1", TEST_PORT, "c1");
|
||||
|
||||
while (!instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(instances.isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add non target IP and do not receive notification.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void subscribeOtherIp() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
naming.subscribe(serviceName, selector, new EventListener() {
|
||||
int index = 0;
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
if (index == 0) {
|
||||
index++;
|
||||
return;
|
||||
}
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
}
|
||||
});
|
||||
|
||||
naming.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
|
||||
|
||||
int i = 0;
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
if (i++ > 10) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.test.naming;
|
||||
|
||||
import com.alibaba.nacos.Nacos;
|
||||
@ -22,11 +23,12 @@ import com.alibaba.nacos.api.naming.listener.Event;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.client.naming.listener.AbstractNamingChangeListener;
|
||||
import com.alibaba.nacos.client.naming.listener.NamingChangeEvent;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.test.base.Params;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -42,6 +44,9 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Created by wangtong.wt on 2018/6/20.
|
||||
@ -50,14 +55,15 @@ import java.util.concurrent.TimeUnit;
|
||||
* @date 2018/6/20
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = Nacos.class, properties = {"server.servlet.context-path=/nacos"},
|
||||
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
|
||||
@SpringBootTest(classes = Nacos.class, properties = {
|
||||
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
|
||||
public class Subscribe_ITCase extends NamingBase {
|
||||
|
||||
|
||||
private NamingService naming;
|
||||
|
||||
@LocalServerPort
|
||||
private int port;
|
||||
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
instances.clear();
|
||||
@ -71,9 +77,9 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
String url = String.format("http://localhost:%d/", port);
|
||||
this.base = new URL(url);
|
||||
}
|
||||
|
||||
|
||||
private volatile List<Instance> instances = Collections.emptyList();
|
||||
|
||||
|
||||
/**
|
||||
* 添加IP,收到通知
|
||||
*
|
||||
@ -82,7 +88,7 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
@Test(timeout = 4 * TIME_OUT)
|
||||
public void subscribeAdd() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
|
||||
naming.subscribe(serviceName, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
@ -91,16 +97,16 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除IP,收到通知
|
||||
*
|
||||
@ -110,12 +116,12 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
public void subscribeDelete() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
TimeUnit.SECONDS.sleep(3);
|
||||
|
||||
|
||||
naming.subscribe(serviceName, new EventListener() {
|
||||
int index = 0;
|
||||
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
if (index == 0) {
|
||||
@ -127,18 +133,18 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
|
||||
|
||||
naming.deregisterInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (!instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(instances.isEmpty());
|
||||
|
||||
assertTrue(instances.isEmpty());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 添加不可用IP,收到通知
|
||||
*
|
||||
@ -147,7 +153,7 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
@Test(timeout = 4 * TIME_OUT)
|
||||
public void subscribeUnhealthy() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
|
||||
naming.subscribe(serviceName, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
@ -156,21 +162,21 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
naming.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
}
|
||||
|
||||
@Test(timeout = 4 * TIME_OUT)
|
||||
public void subscribeEmpty() throws Exception {
|
||||
|
||||
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
|
||||
naming.subscribe(serviceName, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
@ -179,32 +185,32 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
naming.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
|
||||
assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
naming.deregisterInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (!instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
|
||||
Assert.assertEquals(0, instances.size());
|
||||
Assert.assertEquals(0, naming.getAllInstances(serviceName).size());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void querySubscribers() throws Exception {
|
||||
|
||||
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
|
||||
naming.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
EventListener listener = new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
@ -213,30 +219,25 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
naming.subscribe(serviceName, listener);
|
||||
|
||||
|
||||
TimeUnit.SECONDS.sleep(3);
|
||||
|
||||
|
||||
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/subscribers",
|
||||
Params.newParams()
|
||||
.appendParam("serviceName", serviceName)
|
||||
.appendParam("pageNo", "1")
|
||||
.appendParam("pageSize", "10")
|
||||
.done(),
|
||||
String.class,
|
||||
HttpMethod.GET);
|
||||
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
|
||||
|
||||
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1")
|
||||
.appendParam("pageSize", "10").done(), String.class, HttpMethod.GET);
|
||||
assertTrue(response.getStatusCode().is2xxSuccessful());
|
||||
|
||||
JsonNode body = JacksonUtils.toObj(response.getBody());
|
||||
|
||||
|
||||
Assert.assertEquals(1, body.get("subscribers").size());
|
||||
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("namingRequestTimeout", "300000");
|
||||
properties.setProperty("serverAddr", "127.0.0.1" + ":" + port);
|
||||
NamingService naming2 = NamingFactory.createNamingService(properties);
|
||||
|
||||
|
||||
naming2.subscribe(serviceName, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
@ -245,21 +246,16 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
TimeUnit.SECONDS.sleep(3);
|
||||
|
||||
|
||||
response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/subscribers",
|
||||
Params.newParams()
|
||||
.appendParam("serviceName", serviceName)
|
||||
.appendParam("pageNo", "1")
|
||||
.appendParam("pageSize", "10")
|
||||
.done(),
|
||||
String.class,
|
||||
HttpMethod.GET);
|
||||
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
|
||||
|
||||
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1")
|
||||
.appendParam("pageSize", "10").done(), String.class, HttpMethod.GET);
|
||||
assertTrue(response.getStatusCode().is2xxSuccessful());
|
||||
|
||||
body = JacksonUtils.toObj(response.getBody());
|
||||
|
||||
|
||||
// server will remove duplicate subscriber by ip port service app and so on
|
||||
Assert.assertEquals(1, body.get("subscribers").size());
|
||||
}
|
||||
@ -294,7 +290,7 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
concurrentHashSet1.addAll(((NamingEvent) event).getInstances());
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
naming1.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
@ -302,11 +298,73 @@ public class Subscribe_ITCase extends NamingBase {
|
||||
}
|
||||
|
||||
try {
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName)));
|
||||
assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName)));
|
||||
Assert.assertEquals(0, concurrentHashSet1.size());
|
||||
} finally {
|
||||
naming1.shutDown();
|
||||
naming2.shutDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subscribeUsingAbstractNamingChangeListener() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
naming.subscribe(serviceName, new AbstractNamingChangeListener() {
|
||||
@Override
|
||||
public void onChange(NamingChangeEvent event) {
|
||||
System.out.println(event.getServiceName());
|
||||
System.out.println(event.getInstances());
|
||||
instances = event.getInstances();
|
||||
assertTrue(event.isAdded());
|
||||
}
|
||||
});
|
||||
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListenerFirstCallback() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
naming.subscribe(serviceName, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
count.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
naming.subscribe(serviceName, new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
count.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
int i = 0;
|
||||
while (count.get() < 2) {
|
||||
Thread.sleep(1000L);
|
||||
if (i++ > 10) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.test.naming;
|
||||
|
||||
import com.alibaba.nacos.Nacos;
|
||||
@ -22,7 +23,8 @@ import com.alibaba.nacos.api.naming.listener.Event;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
import com.alibaba.nacos.api.naming.selector.NamingSelector;
|
||||
import com.alibaba.nacos.client.naming.selector.DefaultNamingSelector;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -35,7 +37,9 @@ import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static com.alibaba.nacos.test.naming.NamingBase.*;
|
||||
import static com.alibaba.nacos.test.naming.NamingBase.TEST_PORT;
|
||||
import static com.alibaba.nacos.test.naming.NamingBase.randomDomainName;
|
||||
import static com.alibaba.nacos.test.naming.NamingBase.verifyInstanceList;
|
||||
|
||||
/**
|
||||
* Created by wangtong.wt on 2018/6/20.
|
||||
@ -44,57 +48,59 @@ import static com.alibaba.nacos.test.naming.NamingBase.*;
|
||||
* @date 2018/6/20
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = Nacos.class, properties = {"server.servlet.context-path=/nacos"},
|
||||
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
|
||||
@SpringBootTest(classes = Nacos.class, properties = {
|
||||
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
|
||||
public class Unsubscribe_ITCase {
|
||||
|
||||
|
||||
private NamingService naming;
|
||||
|
||||
@LocalServerPort
|
||||
private int port;
|
||||
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
instances = Collections.emptyList();
|
||||
if (naming == null) {
|
||||
//TimeUnit.SECONDS.sleep(10);
|
||||
naming = NamingFactory.createNamingService("127.0.0.1"+":"+port);
|
||||
naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private volatile List<Instance> instances = Collections.emptyList();
|
||||
|
||||
|
||||
/**
|
||||
* 取消订阅,添加IP,不会收到通知
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void unsubscribe() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
|
||||
EventListener listener = new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
System.out.println(((NamingEvent)event).getServiceName());
|
||||
System.out.println(((NamingEvent)event).getInstances());
|
||||
instances = ((NamingEvent)event).getInstances();
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
naming.subscribe(serviceName, listener);
|
||||
|
||||
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
|
||||
naming.unsubscribe(serviceName, listener);
|
||||
|
||||
|
||||
instances = Collections.emptyList();
|
||||
naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT, "c1");
|
||||
|
||||
|
||||
int i = 0;
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
@ -102,42 +108,43 @@ public class Unsubscribe_ITCase {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 取消订阅,在指定cluster添加IP,不会收到通知
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void unsubscribeCluster() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
|
||||
EventListener listener = new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
System.out.println(((NamingEvent)event).getServiceName());
|
||||
System.out.println(((NamingEvent)event).getInstances());
|
||||
instances = ((NamingEvent)event).getInstances();
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
naming.subscribe(serviceName, Arrays.asList("c1"), listener);
|
||||
|
||||
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
|
||||
naming.unsubscribe(serviceName, Arrays.asList("c1"), listener);
|
||||
|
||||
|
||||
instances = Collections.emptyList();
|
||||
naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT, "c1");
|
||||
|
||||
|
||||
int i = 0;
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
@ -145,8 +152,54 @@ public class Unsubscribe_ITCase {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 取消订阅,添加选择器范围 IP,不会收到通知
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void unsubscribeSelector() throws Exception {
|
||||
String serviceName = randomDomainName();
|
||||
|
||||
EventListener listener = new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
System.out.println(((NamingEvent) event).getServiceName());
|
||||
System.out.println(((NamingEvent) event).getInstances());
|
||||
instances = ((NamingEvent) event).getInstances();
|
||||
}
|
||||
};
|
||||
|
||||
NamingSelector selector = new DefaultNamingSelector(instance -> instance.getIp().startsWith("127.0.0"));
|
||||
|
||||
naming.subscribe(serviceName, selector, listener);
|
||||
|
||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT);
|
||||
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyInstanceList(instances, naming.getAllInstances(serviceName)));
|
||||
|
||||
naming.unsubscribe(serviceName, selector, listener);
|
||||
|
||||
instances = Collections.emptyList();
|
||||
naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT);
|
||||
|
||||
int i = 0;
|
||||
while (instances.isEmpty()) {
|
||||
Thread.sleep(1000L);
|
||||
if (i++ > 10) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user