diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index 2929e2629..644d2e422 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.UUID; /** * Nacos Naming Service. @@ -71,6 +72,8 @@ public class NacosNamingService implements NamingService { private NamingClientProxy clientProxy; + private String notifierEventScope; + public NacosNamingService(String serverList) throws NacosException { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList); @@ -87,11 +90,12 @@ public class NacosNamingService implements NamingService { InitUtils.initSerialization(); InitUtils.initWebRootContext(properties); initLogName(properties); - - this.changeNotifier = new InstancesChangeNotifier(); + + this.notifierEventScope = UUID.randomUUID().toString(); + this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope); NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); NotifyCenter.registerSubscriber(changeNotifier); - this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties); + this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, properties); this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java index 1dae55147..ebc34b0db 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java @@ -66,7 +66,9 @@ public class ServiceInfoHolder implements Closeable { private String cacheDir; - public ServiceInfoHolder(String namespace, Properties properties) { + private String notifierEventScope; + + public ServiceInfoHolder(String namespace, String notifierEventScope, Properties properties) { initCacheDir(namespace, properties); if (isLoadCacheAtStart(properties)) { this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir)); @@ -75,6 +77,7 @@ public class ServiceInfoHolder implements Closeable { } this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushEmptyProtection = isPushEmptyProtect(properties); + this.notifierEventScope = notifierEventScope; } private void initCacheDir(String namespace, Properties properties) { @@ -165,7 +168,7 @@ public class ServiceInfoHolder implements Closeable { if (changed) { NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(), JacksonUtils.toJson(serviceInfo.getHosts())); - NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), + NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java index 170e07bd8..87cd89538 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java @@ -31,6 +31,8 @@ public class InstancesChangeEvent extends Event { private static final long serialVersionUID = -8823087028212249603L; + private final String eventScope; + private final String serviceName; private final String groupName; @@ -39,7 +41,8 @@ public class InstancesChangeEvent extends Event { private final List hosts; - public InstancesChangeEvent(String serviceName, String groupName, String clusters, List hosts) { + public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List hosts) { + this.eventScope = eventScope; this.serviceName = serviceName; this.groupName = groupName; this.clusters = clusters; @@ -62,4 +65,8 @@ public class InstancesChangeEvent extends Event { return hosts; } + @Override + public String scope() { + return this.eventScope; + } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java index 10167a1c1..6211d82f0 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.common.JustForTest; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.utils.CollectionUtils; @@ -29,6 +30,7 @@ import com.alibaba.nacos.common.utils.ConcurrentHashSet; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** @@ -39,10 +41,21 @@ import java.util.concurrent.ConcurrentHashMap; */ public class InstancesChangeNotifier extends Subscriber { + private final String eventScope; + private final Map> listenerMap = new ConcurrentHashMap>(); private final Object lock = new Object(); + @JustForTest + public InstancesChangeNotifier() { + this.eventScope = UUID.randomUUID().toString(); + } + + public InstancesChangeNotifier(String eventScope) { + this.eventScope = eventScope; + } + /** * register listener. * @@ -137,4 +150,8 @@ public class InstancesChangeNotifier extends Subscriber { return InstancesChangeEvent.class; } + @Override + public boolean scopeMatches(InstancesChangeEvent event) { + return this.eventScope.equals(event.scope()); + } } diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java index 7d02749a6..5e9497214 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java @@ -35,11 +35,13 @@ import java.util.concurrent.ScheduledExecutorService; public class ServiceInfoHolderTest { @Test - public void testGetServiceInfoMap() { + public void testGetServiceInfoMap() throws NoSuchFieldException, IllegalAccessException { Properties prop = new Properties(); - ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop); + ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop); Assert.assertEquals(0, holder.getServiceInfoMap().size()); - + Field fieldNotifierEventScope = ServiceInfoHolder.class.getDeclaredField("notifierEventScope"); + fieldNotifierEventScope.setAccessible(true); + Assert.assertEquals("scope-001", fieldNotifierEventScope.get(holder)); } @Test @@ -53,7 +55,7 @@ public class ServiceInfoHolderTest { info.setHosts(hosts); Properties prop = new Properties(); - ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop); + ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop); ServiceInfo actual1 = holder.processServiceInfo(info); Assert.assertEquals(info, actual1); @@ -81,7 +83,7 @@ public class ServiceInfoHolderTest { @Test public void testProcessServiceInfo2() { Properties prop = new Properties(); - ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop); + ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop); String json = "{\"groupName\":\"a\",\"name\":\"b\",\"clusters\":\"c\"}"; ServiceInfo actual = holder.processServiceInfo(json); @@ -102,7 +104,7 @@ public class ServiceInfoHolderTest { Properties prop = new Properties(); prop.setProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION, "true"); - ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop); + ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop); holder.processServiceInfo(oldInfo); ServiceInfo newInfo = new ServiceInfo("a@@b@@c"); @@ -122,7 +124,7 @@ public class ServiceInfoHolderTest { info.setHosts(hosts); Properties prop = new Properties(); - ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop); + ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop); ServiceInfo expect = holder.processServiceInfo(info); String serviceName = "b"; @@ -137,7 +139,7 @@ public class ServiceInfoHolderTest { @Test public void testShutdown() throws NacosException, NoSuchFieldException, IllegalAccessException { Properties prop = new Properties(); - ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop); + ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop); Field field = ServiceInfoHolder.class.getDeclaredField("failoverReactor"); field.setAccessible(true); FailoverReactor reactor = (FailoverReactor) field.get(holder); diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeEventTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeEventTest.java index c4c6d78cb..17cdfff75 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeEventTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeEventTest.java @@ -27,13 +27,15 @@ public class InstancesChangeEventTest { @Test public void testGetServiceName() { + String eventScope = "scope-001"; String serviceName = "a"; String groupName = "b"; String clusters = "c"; List hosts = new ArrayList<>(); Instance ins = new Instance(); hosts.add(ins); - InstancesChangeEvent event = new InstancesChangeEvent(serviceName, groupName, clusters, hosts); + InstancesChangeEvent event = new InstancesChangeEvent(eventScope, serviceName, groupName, clusters, hosts); + Assert.assertEquals(eventScope, event.scope()); Assert.assertEquals(serviceName, event.getServiceName()); Assert.assertEquals(clusters, event.getClusters()); Assert.assertEquals(groupName, event.getGroupName()); diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java index 79e8bcbc9..7f20b3af6 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifierTest.java @@ -17,11 +17,13 @@ package com.alibaba.nacos.client.naming.event; import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; import java.util.List; import static org.mockito.ArgumentMatchers.any; @@ -31,10 +33,11 @@ public class InstancesChangeNotifierTest { @Test public void testRegisterListener() { + String eventScope = "scope-001"; String group = "a"; String name = "b"; String clusters = "c"; - InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(); + InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); instancesChangeNotifier.registerListener(group, name, clusters, listener); List subscribeServices = instancesChangeNotifier.getSubscribeServices(); @@ -42,15 +45,21 @@ public class InstancesChangeNotifierTest { Assert.assertEquals(group, subscribeServices.get(0).getGroupName()); Assert.assertEquals(name, subscribeServices.get(0).getName()); Assert.assertEquals(clusters, subscribeServices.get(0).getClusters()); - + + List hosts = new ArrayList<>(); + Instance ins = new Instance(); + hosts.add(ins); + InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusters, hosts); + Assert.assertEquals(true, instancesChangeNotifier.scopeMatches(event)); } @Test public void testDeregisterListener() { + String eventScope = "scope-001"; String group = "a"; String name = "b"; String clusters = "c"; - InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(); + InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); instancesChangeNotifier.registerListener(group, name, clusters, listener); List subscribeServices = instancesChangeNotifier.getSubscribeServices(); @@ -64,10 +73,11 @@ public class InstancesChangeNotifierTest { @Test public void testIsSubscribed() { + String eventScope = "scope-001"; String group = "a"; String name = "b"; String clusters = "c"; - InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(); + InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name, clusters)); @@ -77,10 +87,11 @@ public class InstancesChangeNotifierTest { @Test public void testOnEvent() { + String eventScope = "scope-001"; String group = "a"; String name = "b"; String clusters = "c"; - InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(); + InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); EventListener listener = Mockito.mock(EventListener.class); instancesChangeNotifier.registerListener(group, name, clusters, listener); @@ -95,7 +106,8 @@ public class InstancesChangeNotifierTest { @Test public void testSubscribeType() { - InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(); + String eventScope = "scope-001"; + InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope); Assert.assertEquals(InstancesChangeEvent.class, instancesChangeNotifier.subscribeType()); } } \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index dd25a9488..389f2e2e8 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -180,6 +180,10 @@ public class DefaultPublisher extends Thread implements EventPublisher { // Notification single event listener for (Subscriber subscriber : subscribers) { + if (!subscriber.scopeMatches(event)) { + continue; + } + // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java index e5dc694f8..408b7a84b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java @@ -43,5 +43,13 @@ public abstract class Event implements Serializable { return sequence; } + /** + * Event scope. + * + * @return event scope, return null if for all scope + */ + public String scope() { + return null; + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java index 65a41d59e..2e6500b55 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java @@ -60,4 +60,15 @@ public abstract class Subscriber { public boolean ignoreExpireEvent() { return false; } + + /** + * Whether the event's scope matches current subscriber. Default implementation is all scopes matched. + * If you override this method, it better to override related {@link com.alibaba.nacos.common.notify.Event#scope()}. + * + * @param event {@link Event} + * @return Whether the event's scope matches current subscriber + */ + public boolean scopeMatches(T event) { + return event.scope() == null; + } } diff --git a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java index 85d97267f..e97eb66f0 100644 --- a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java +++ b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java @@ -22,6 +22,7 @@ import com.alibaba.nacos.api.naming.listener.Event; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.test.base.Params; import com.fasterxml.jackson.databind.JsonNode; @@ -262,5 +263,50 @@ public class Subscribe_ITCase extends NamingBase { // server will remove duplicate subscriber by ip port service app and so on Assert.assertEquals(1, body.get("subscribers").size()); } - + + @Test + public void subscribeSameServiceForTwoNamingService() throws Exception { + Properties properties1 = new Properties(); + properties1.setProperty("serverAddr", "127.0.0.1" + ":" + port); + properties1.setProperty("namespace", "ns-001"); + final NamingService naming1 = NamingFactory.createNamingService(properties1); + Properties properties2 = new Properties(); + properties2.setProperty("serverAddr", "127.0.0.1" + ":" + port); + properties2.setProperty("namespace", "ns-002"); + final NamingService naming2 = NamingFactory.createNamingService(properties2); + + final ConcurrentHashSet concurrentHashSet1 = new ConcurrentHashSet(); + final String serviceName = randomDomainName(); + + naming1.subscribe(serviceName, new EventListener() { + @Override + public void onEvent(Event event) { + System.out.println("Event from naming1: " + ((NamingEvent) event).getServiceName()); + System.out.println("Event from naming1: " + ((NamingEvent) event).getInstances()); + instances = ((NamingEvent) event).getInstances(); + } + }); + naming2.subscribe(serviceName, new EventListener() { + @Override + public void onEvent(Event event) { + System.out.println("Event from naming2: " + ((NamingEvent) event).getServiceName()); + System.out.println("Event from naming2: " + ((NamingEvent) event).getInstances()); + concurrentHashSet1.addAll(((NamingEvent) event).getInstances()); + } + }); + + naming1.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1"); + + while (instances.isEmpty()) { + Thread.sleep(1000L); + } + + try { + Assert.assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName))); + Assert.assertEquals(0, concurrentHashSet1.size()); + } finally { + naming1.shutDown(); + naming2.shutDown(); + } + } }