[ISSUE #8428] fix naming subscribe bug when multiple NamingService (#8433)

This commit is contained in:
liqipeng 2022-05-27 14:05:05 +08:00 committed by GitHub
parent e9094835cb
commit f7a28105b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 138 additions and 22 deletions

View File

@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/**
* Nacos Naming Service.
@ -71,6 +72,8 @@ public class NacosNamingService implements NamingService {
private NamingClientProxy clientProxy;
private String notifierEventScope;
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
@ -87,11 +90,12 @@ public class NacosNamingService implements NamingService {
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
this.notifierEventScope = UUID.randomUUID().toString();
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, properties);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}

View File

@ -66,7 +66,9 @@ public class ServiceInfoHolder implements Closeable {
private String cacheDir;
public ServiceInfoHolder(String namespace, Properties properties) {
private String notifierEventScope;
public ServiceInfoHolder(String namespace, String notifierEventScope, Properties properties) {
initCacheDir(namespace, properties);
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
@ -75,6 +77,7 @@ public class ServiceInfoHolder implements Closeable {
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
this.notifierEventScope = notifierEventScope;
}
private void initCacheDir(String namespace, Properties properties) {
@ -165,7 +168,7 @@ public class ServiceInfoHolder implements Closeable {
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}

View File

@ -31,6 +31,8 @@ public class InstancesChangeEvent extends Event {
private static final long serialVersionUID = -8823087028212249603L;
private final String eventScope;
private final String serviceName;
private final String groupName;
@ -39,7 +41,8 @@ public class InstancesChangeEvent extends Event {
private final List<Instance> hosts;
public InstancesChangeEvent(String serviceName, String groupName, String clusters, List<Instance> hosts) {
public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List<Instance> hosts) {
this.eventScope = eventScope;
this.serviceName = serviceName;
this.groupName = groupName;
this.clusters = clusters;
@ -62,4 +65,8 @@ public class InstancesChangeEvent extends Event {
return hosts;
}
@Override
public String scope() {
return this.eventScope;
}
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
@ -29,6 +30,7 @@ import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -39,10 +41,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
private final String eventScope;
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
private final Object lock = new Object();
@JustForTest
public InstancesChangeNotifier() {
this.eventScope = UUID.randomUUID().toString();
}
public InstancesChangeNotifier(String eventScope) {
this.eventScope = eventScope;
}
/**
* register listener.
*
@ -137,4 +150,8 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
return InstancesChangeEvent.class;
}
@Override
public boolean scopeMatches(InstancesChangeEvent event) {
return this.eventScope.equals(event.scope());
}
}

View File

@ -35,11 +35,13 @@ import java.util.concurrent.ScheduledExecutorService;
public class ServiceInfoHolderTest {
@Test
public void testGetServiceInfoMap() {
public void testGetServiceInfoMap() throws NoSuchFieldException, IllegalAccessException {
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
Assert.assertEquals(0, holder.getServiceInfoMap().size());
Field fieldNotifierEventScope = ServiceInfoHolder.class.getDeclaredField("notifierEventScope");
fieldNotifierEventScope.setAccessible(true);
Assert.assertEquals("scope-001", fieldNotifierEventScope.get(holder));
}
@Test
@ -53,7 +55,7 @@ public class ServiceInfoHolderTest {
info.setHosts(hosts);
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
ServiceInfo actual1 = holder.processServiceInfo(info);
Assert.assertEquals(info, actual1);
@ -81,7 +83,7 @@ public class ServiceInfoHolderTest {
@Test
public void testProcessServiceInfo2() {
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
String json = "{\"groupName\":\"a\",\"name\":\"b\",\"clusters\":\"c\"}";
ServiceInfo actual = holder.processServiceInfo(json);
@ -102,7 +104,7 @@ public class ServiceInfoHolderTest {
Properties prop = new Properties();
prop.setProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION, "true");
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
holder.processServiceInfo(oldInfo);
ServiceInfo newInfo = new ServiceInfo("a@@b@@c");
@ -122,7 +124,7 @@ public class ServiceInfoHolderTest {
info.setHosts(hosts);
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
ServiceInfo expect = holder.processServiceInfo(info);
String serviceName = "b";
@ -137,7 +139,7 @@ public class ServiceInfoHolderTest {
@Test
public void testShutdown() throws NacosException, NoSuchFieldException, IllegalAccessException {
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
Field field = ServiceInfoHolder.class.getDeclaredField("failoverReactor");
field.setAccessible(true);
FailoverReactor reactor = (FailoverReactor) field.get(holder);

View File

@ -27,13 +27,15 @@ public class InstancesChangeEventTest {
@Test
public void testGetServiceName() {
String eventScope = "scope-001";
String serviceName = "a";
String groupName = "b";
String clusters = "c";
List<Instance> hosts = new ArrayList<>();
Instance ins = new Instance();
hosts.add(ins);
InstancesChangeEvent event = new InstancesChangeEvent(serviceName, groupName, clusters, hosts);
InstancesChangeEvent event = new InstancesChangeEvent(eventScope, serviceName, groupName, clusters, hosts);
Assert.assertEquals(eventScope, event.scope());
Assert.assertEquals(serviceName, event.getServiceName());
Assert.assertEquals(clusters, event.getClusters());
Assert.assertEquals(groupName, event.getGroupName());

View File

@ -17,11 +17,13 @@
package com.alibaba.nacos.client.naming.event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
@ -31,10 +33,11 @@ public class InstancesChangeNotifierTest {
@Test
public void testRegisterListener() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
instancesChangeNotifier.registerListener(group, name, clusters, listener);
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
@ -42,15 +45,21 @@ public class InstancesChangeNotifierTest {
Assert.assertEquals(group, subscribeServices.get(0).getGroupName());
Assert.assertEquals(name, subscribeServices.get(0).getName());
Assert.assertEquals(clusters, subscribeServices.get(0).getClusters());
List<Instance> hosts = new ArrayList<>();
Instance ins = new Instance();
hosts.add(ins);
InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusters, hosts);
Assert.assertEquals(true, instancesChangeNotifier.scopeMatches(event));
}
@Test
public void testDeregisterListener() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
instancesChangeNotifier.registerListener(group, name, clusters, listener);
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
@ -64,10 +73,11 @@ public class InstancesChangeNotifierTest {
@Test
public void testIsSubscribed() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name, clusters));
@ -77,10 +87,11 @@ public class InstancesChangeNotifierTest {
@Test
public void testOnEvent() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
instancesChangeNotifier.registerListener(group, name, clusters, listener);
@ -95,7 +106,8 @@ public class InstancesChangeNotifierTest {
@Test
public void testSubscribeType() {
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
String eventScope = "scope-001";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
Assert.assertEquals(InstancesChangeEvent.class, instancesChangeNotifier.subscribeType());
}
}

View File

@ -180,6 +180,10 @@ public class DefaultPublisher extends Thread implements EventPublisher {
// Notification single event listener
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue;
}
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",

View File

@ -43,5 +43,13 @@ public abstract class Event implements Serializable {
return sequence;
}
/**
* Event scope.
*
* @return event scope, return null if for all scope
*/
public String scope() {
return null;
}
}

View File

@ -60,4 +60,15 @@ public abstract class Subscriber<T extends Event> {
public boolean ignoreExpireEvent() {
return false;
}
/**
* Whether the event's scope matches current subscriber. Default implementation is all scopes matched.
* If you override this method, it better to override related {@link com.alibaba.nacos.common.notify.Event#scope()}.
*
* @param event {@link Event}
* @return Whether the event's scope matches current subscriber
*/
public boolean scopeMatches(T event) {
return event.scope() == null;
}
}

View File

@ -22,6 +22,7 @@ import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.test.base.Params;
import com.fasterxml.jackson.databind.JsonNode;
@ -262,5 +263,50 @@ public class Subscribe_ITCase extends NamingBase {
// server will remove duplicate subscriber by ip port service app and so on
Assert.assertEquals(1, body.get("subscribers").size());
}
@Test
public void subscribeSameServiceForTwoNamingService() throws Exception {
Properties properties1 = new Properties();
properties1.setProperty("serverAddr", "127.0.0.1" + ":" + port);
properties1.setProperty("namespace", "ns-001");
final NamingService naming1 = NamingFactory.createNamingService(properties1);
Properties properties2 = new Properties();
properties2.setProperty("serverAddr", "127.0.0.1" + ":" + port);
properties2.setProperty("namespace", "ns-002");
final NamingService naming2 = NamingFactory.createNamingService(properties2);
final ConcurrentHashSet<Instance> concurrentHashSet1 = new ConcurrentHashSet();
final String serviceName = randomDomainName();
naming1.subscribe(serviceName, new EventListener() {
@Override
public void onEvent(Event event) {
System.out.println("Event from naming1: " + ((NamingEvent) event).getServiceName());
System.out.println("Event from naming1: " + ((NamingEvent) event).getInstances());
instances = ((NamingEvent) event).getInstances();
}
});
naming2.subscribe(serviceName, new EventListener() {
@Override
public void onEvent(Event event) {
System.out.println("Event from naming2: " + ((NamingEvent) event).getServiceName());
System.out.println("Event from naming2: " + ((NamingEvent) event).getInstances());
concurrentHashSet1.addAll(((NamingEvent) event).getInstances());
}
});
naming1.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");
while (instances.isEmpty()) {
Thread.sleep(1000L);
}
try {
Assert.assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName)));
Assert.assertEquals(0, concurrentHashSet1.size());
} finally {
naming1.shutDown();
naming2.shutDown();
}
}
}