Refactor SubscribeManager with NamingSubscriberServiceAggregationImpl (#4578)

This commit is contained in:
杨翊 SionYang 2020-12-26 15:58:59 +08:00 committed by GitHub
parent 6e34f2886b
commit 9d1fb57794
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 292 additions and 116 deletions

View File

@ -26,10 +26,10 @@ import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.ClientInfo; import com.alibaba.nacos.naming.push.v1.ClientInfo;
import com.alibaba.nacos.naming.push.DataSource; import com.alibaba.nacos.naming.push.v1.DataSource;
import com.alibaba.nacos.naming.push.NamingSubscriberServiceV1Impl; import com.alibaba.nacos.naming.push.v1.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.naming.push.PushClient; import com.alibaba.nacos.naming.push.v1.PushClient;
import com.alibaba.nacos.naming.push.UdpPushService; import com.alibaba.nacos.naming.push.UdpPushService;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;

View File

@ -16,26 +16,15 @@
package com.alibaba.nacos.naming.core; package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.push.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.pojo.Subscribers; import com.alibaba.nacos.naming.push.NamingSubscriberServiceAggregationImpl;
import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl; import com.alibaba.nacos.naming.push.NamingSubscriberServiceLocalImpl;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -48,35 +37,17 @@ import java.util.stream.Collectors;
* Subscribe manager. * Subscribe manager.
* *
* @author Nicholas * @author Nicholas
* @author xiweng.yy
* @since 1.0.1 * @since 1.0.1
*/ */
@Service @Service
public class SubscribeManager { public class SubscribeManager {
private static final String SUBSCRIBER_ON_SYNC_URL = "/service/subscribers"; @Autowired
private NamingSubscriberServiceLocalImpl localService;
@Autowired @Autowired
private NamingSubscriberServiceV1Impl subscriberServiceV1; private NamingSubscriberServiceAggregationImpl aggregationService;
@Autowired
private NamingSubscriberServiceV2Impl subscriberServiceV2;
@Autowired
private ServerMemberManager memberManager;
private List<Subscriber> getSubscribersFuzzy(String serviceName, String namespaceId) {
List<Subscriber> result = new LinkedList<>();
result.addAll(subscriberServiceV1.getFuzzySubscribers(namespaceId, serviceName));
result.addAll(subscriberServiceV2.getFuzzySubscribers(namespaceId, serviceName));
return result;
}
private List<Subscriber> getSubscribers(String serviceName, String namespaceId) {
List<Subscriber> result = new LinkedList<>();
result.addAll(subscriberServiceV1.getSubscribers(namespaceId, serviceName));
result.addAll(subscriberServiceV2.getSubscribers(namespaceId, serviceName));
return result;
}
/** /**
* Get subscribers. * Get subscribers.
@ -85,44 +56,14 @@ public class SubscribeManager {
* @param namespaceId namespace id * @param namespaceId namespace id
* @param aggregation aggregation * @param aggregation aggregation
* @return list of subscriber * @return list of subscriber
* @throws InterruptedException interrupted exception
*/ */
public List<Subscriber> getSubscribers(String serviceName, String namespaceId, boolean aggregation) public List<Subscriber> getSubscribers(String serviceName, String namespaceId, boolean aggregation) {
throws InterruptedException {
if (aggregation) { if (aggregation) {
// size = 1 means only myself in the list, we need at least one another server alive: Collection<Subscriber> result = aggregationService.getFuzzySubscribers(namespaceId, serviceName);
if (memberManager.getServerList().size() <= 1) { return CollectionUtils.isNotEmpty(result) ? result.stream().filter(distinctByKey(Subscriber::toString))
return getSubscribersFuzzy(serviceName, namespaceId); .collect(Collectors.toList()) : Collections.EMPTY_LIST;
}
List<Subscriber> subscriberList = new ArrayList<Subscriber>();
// try sync data from remote server:
for (Member server : memberManager.allMembers()) {
Map<String, String> paramValues = new HashMap<>(128);
paramValues.put(CommonParams.SERVICE_NAME, serviceName);
paramValues.put(CommonParams.NAMESPACE_ID, namespaceId);
paramValues.put("aggregation", String.valueOf(Boolean.FALSE));
if (NetUtils.localServer().equals(server.getAddress())) {
subscriberList.addAll(getSubscribersFuzzy(serviceName, namespaceId));
continue;
}
RestResult<String> result = HttpClient.httpGet(
"http://" + server.getAddress() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + SUBSCRIBER_ON_SYNC_URL, new ArrayList<>(),
paramValues);
if (result.ok()) {
Subscribers subscribers = JacksonUtils.toObj(result.getData(), Subscribers.class);
subscriberList.addAll(subscribers.getSubscribers());
}
}
return CollectionUtils.isNotEmpty(subscriberList) ? subscriberList.stream()
.filter(distinctByKey(Subscriber::toString)).collect(Collectors.toList()) : Collections.EMPTY_LIST;
} else { } else {
// local server return new LinkedList<>(localService.getFuzzySubscribers(namespaceId, serviceName));
return getSubscribersFuzzy(serviceName, namespaceId);
} }
} }

View File

@ -16,11 +16,23 @@
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl; import com.alibaba.nacos.naming.pojo.Subscribers;
import com.alibaba.nacos.sys.env.EnvUtil;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
/** /**
* Aggregation naming subscriber service. Aggregate all implementation of {@link NamingSubscriberService} and * Aggregation naming subscriber service. Aggregate all implementation of {@link NamingSubscriberService} and
@ -31,33 +43,70 @@ import java.util.Collection;
@org.springframework.stereotype.Service @org.springframework.stereotype.Service
public class NamingSubscriberServiceAggregationImpl implements NamingSubscriberService { public class NamingSubscriberServiceAggregationImpl implements NamingSubscriberService {
private final NamingSubscriberServiceV1Impl subscriberServiceV1; private static final String SUBSCRIBER_ON_SYNC_URL = "/service/subscribers";
private final NamingSubscriberServiceV2Impl subscriberServiceV2; private final NamingSubscriberServiceLocalImpl subscriberServiceLocal;
public NamingSubscriberServiceAggregationImpl(NamingSubscriberServiceV1Impl subscriberServiceV1, private final ServerMemberManager memberManager;
NamingSubscriberServiceV2Impl subscriberServiceV2) {
this.subscriberServiceV1 = subscriberServiceV1; public NamingSubscriberServiceAggregationImpl(NamingSubscriberServiceLocalImpl subscriberServiceLocal,
this.subscriberServiceV2 = subscriberServiceV2; ServerMemberManager serverMemberManager) {
this.subscriberServiceLocal = subscriberServiceLocal;
this.memberManager = serverMemberManager;
} }
@Override @Override
public Collection<Subscriber> getSubscribers(String namespaceId, String serviceName) { public Collection<Subscriber> getSubscribers(String namespaceId, String serviceName) {
return null; Collection<Subscriber> result = new LinkedList<>(
subscriberServiceLocal.getSubscribers(namespaceId, serviceName));
if (memberManager.getServerList().size() > 1) {
getSubscribersFromRemotes(namespaceId, serviceName, result);
}
return result;
} }
@Override @Override
public Collection<Subscriber> getSubscribers(Service service) { public Collection<Subscriber> getSubscribers(Service service) {
return null; Collection<Subscriber> result = new LinkedList<>(subscriberServiceLocal.getSubscribers(service));
if (memberManager.getServerList().size() > 1) {
getSubscribersFromRemotes(service.getNamespace(), service.getGroupedServiceName(), result);
}
return result;
} }
@Override @Override
public Collection<Subscriber> getFuzzySubscribers(String namespaceId, String serviceName) { public Collection<Subscriber> getFuzzySubscribers(String namespaceId, String serviceName) {
return null; Collection<Subscriber> result = new LinkedList<>(
subscriberServiceLocal.getFuzzySubscribers(namespaceId, serviceName));
if (memberManager.getServerList().size() > 1) {
getSubscribersFromRemotes(namespaceId, serviceName, result);
}
return result;
} }
@Override @Override
public Collection<Subscriber> getFuzzySubscribers(Service service) { public Collection<Subscriber> getFuzzySubscribers(Service service) {
return null; Collection<Subscriber> result = new LinkedList<>(subscriberServiceLocal.getFuzzySubscribers(service));
if (memberManager.getServerList().size() > 1) {
getSubscribersFromRemotes(service.getNamespace(), service.getGroupedServiceName(), result);
}
return result;
}
private void getSubscribersFromRemotes(String namespaceId, String serviceName, Collection<Subscriber> result) {
for (Member server : memberManager.allMembersWithoutSelf()) {
Map<String, String> paramValues = new HashMap<>(128);
paramValues.put(CommonParams.SERVICE_NAME, serviceName);
paramValues.put(CommonParams.NAMESPACE_ID, namespaceId);
paramValues.put("aggregation", String.valueOf(Boolean.FALSE));
// TODO replace with gRPC
RestResult<String> response = HttpClient.httpGet(
"http://" + server.getAddress() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ SUBSCRIBER_ON_SYNC_URL, new ArrayList<>(), paramValues);
if (response.ok()) {
Subscribers subscribers = JacksonUtils.toObj(response.getData(), Subscribers.class);
result.addAll(subscribers.getSubscribers());
}
}
} }
} }

View File

@ -0,0 +1,78 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v1.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl;
import java.util.Collection;
import java.util.HashSet;
/**
* Naming subscriber service for local.
*
* @author xiweng.yy
* @deprecated Will be removed with {@link com.alibaba.nacos.naming.push.v1.NamingSubscriberServiceV1Impl}
*/
@org.springframework.stereotype.Service
@Deprecated
public class NamingSubscriberServiceLocalImpl implements NamingSubscriberService {
private final NamingSubscriberServiceV1Impl namingSubscriberServiceV1;
private final NamingSubscriberServiceV2Impl namingSubscriberServiceV2;
public NamingSubscriberServiceLocalImpl(NamingSubscriberServiceV1Impl namingSubscriberServiceV1,
NamingSubscriberServiceV2Impl namingSubscriberServiceV2) {
this.namingSubscriberServiceV1 = namingSubscriberServiceV1;
this.namingSubscriberServiceV2 = namingSubscriberServiceV2;
}
@Override
public Collection<Subscriber> getSubscribers(String namespaceId, String serviceName) {
Collection<Subscriber> result = new HashSet<>();
result.addAll(namingSubscriberServiceV1.getSubscribers(namespaceId, serviceName));
result.addAll(namingSubscriberServiceV2.getSubscribers(namespaceId, serviceName));
return result;
}
@Override
public Collection<Subscriber> getSubscribers(Service service) {
Collection<Subscriber> result = new HashSet<>();
result.addAll(namingSubscriberServiceV1.getSubscribers(service));
result.addAll(namingSubscriberServiceV2.getSubscribers(service));
return result;
}
@Override
public Collection<Subscriber> getFuzzySubscribers(String namespaceId, String serviceName) {
Collection<Subscriber> result = new HashSet<>();
result.addAll(namingSubscriberServiceV1.getFuzzySubscribers(namespaceId, serviceName));
result.addAll(namingSubscriberServiceV2.getFuzzySubscribers(namespaceId, serviceName));
return result;
}
@Override
public Collection<Subscriber> getFuzzySubscribers(Service service) {
Collection<Subscriber> result = new HashSet<>();
result.addAll(namingSubscriberServiceV1.getFuzzySubscribers(service));
result.addAll(namingSubscriberServiceV2.getFuzzySubscribers(service));
return result;
}
}

View File

@ -26,6 +26,10 @@ import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.monitor.MetricsMonitor; import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v1.ClientInfo;
import com.alibaba.nacos.naming.push.v1.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.naming.push.v1.PushClient;
import com.alibaba.nacos.naming.push.v1.ServiceChangeEvent;
import com.alibaba.nacos.naming.remote.udp.AckEntry; import com.alibaba.nacos.naming.remote.udp.AckEntry;
import com.alibaba.nacos.naming.remote.udp.AckPacket; import com.alibaba.nacos.naming.remote.udp.AckPacket;
import com.alibaba.nacos.naming.remote.udp.UdpConnector; import com.alibaba.nacos.naming.remote.udp.UdpConnector;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 1999-2018 Alibaba Group Holding Ltd. * Copyright 1999-2020 Alibaba Group Holding Ltd.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 1999-2018 Alibaba Group Holding Ltd. * Copyright 1999-2020 Alibaba Group Holding Ltd.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
/** /**
* Data source for naming push. * Data source for naming push.

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.pojo.Service;
@ -22,6 +22,7 @@ import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.NamingSubscriberService;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -36,8 +37,10 @@ import java.util.concurrent.TimeUnit;
* Naming subscriber service for v1.x. * Naming subscriber service for v1.x.
* *
* @author xiweng.yy * @author xiweng.yy
* @deprecated Will be removed in v2.1.x version
*/ */
@org.springframework.stereotype.Service @org.springframework.stereotype.Service
@Deprecated
public class NamingSubscriberServiceV1Impl implements NamingSubscriberService { public class NamingSubscriberServiceV1Impl implements NamingSubscriberService {
private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.alibaba.nacos.sys.utils.ApplicationUtils;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 1999-2018 Alibaba Group Holding Ltd. * Copyright 1999-2020 Alibaba Group Holding Ltd.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.Service;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;

View File

@ -20,48 +20,40 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.NodeState; import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.NamingSubscriberServiceV1Impl; import com.alibaba.nacos.naming.push.NamingSubscriberServiceAggregationImpl;
import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl; import com.alibaba.nacos.naming.push.NamingSubscriberServiceLocalImpl;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.springframework.boot.test.context.SpringBootTest; import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockServletContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@SpringBootTest @RunWith(MockitoJUnitRunner.class)
@RunWith(SpringJUnit4ClassRunner.class) public class SubscribeManagerTest {
@ContextConfiguration(classes = MockServletContext.class)
public class SubscribeManagerTest extends BaseTest {
@Mock
private SubscribeManager subscribeManager; private SubscribeManager subscribeManager;
@Mock @Mock
private NamingSubscriberServiceV1Impl namingSubscriberService; private NamingSubscriberServiceAggregationImpl aggregation;
@Mock @Mock
private NamingSubscriberServiceV2Impl namingSubscriberServiceV2; private NamingSubscriberServiceLocalImpl local;
@Mock @Mock
private ServerMemberManager memberManager; private ServerMemberManager memberManager;
@Before @Before
public void before() { public void before() {
super.before();
subscribeManager = new SubscribeManager(); subscribeManager = new SubscribeManager();
ReflectionTestUtils.setField(subscribeManager, "subscriberServiceV1", namingSubscriberService); ReflectionTestUtils.setField(subscribeManager, "aggregationService", aggregation);
ReflectionTestUtils.setField(subscribeManager, "subscriberServiceV2", namingSubscriberServiceV2); ReflectionTestUtils.setField(subscribeManager, "localService", local);
} }
@Test @Test
@ -74,8 +66,7 @@ public class SubscribeManagerTest extends BaseTest {
Subscriber subscriber = new Subscriber("127.0.0.1:8080", "test", "app", "127.0.0.1", namespaceId, Subscriber subscriber = new Subscriber("127.0.0.1:8080", "test", "app", "127.0.0.1", namespaceId,
serviceName, 0); serviceName, 0);
clients.add(subscriber); clients.add(subscriber);
Mockito.when(namingSubscriberService.getFuzzySubscribers(Mockito.anyString(), Mockito.anyString())) Mockito.when(this.local.getFuzzySubscribers(Mockito.anyString(), Mockito.anyString())).thenReturn(clients);
.thenReturn(clients);
List<Subscriber> list = subscribeManager.getSubscribers(serviceName, namespaceId, aggregation); List<Subscriber> list = subscribeManager.getSubscribers(serviceName, namespaceId, aggregation);
Assert.assertNotNull(list); Assert.assertNotNull(list);
Assert.assertEquals(1, list.size()); Assert.assertEquals(1, list.size());
@ -95,7 +86,7 @@ public class SubscribeManagerTest extends BaseTest {
Subscriber subscriber = new Subscriber("127.0.0.1:8080", "test", "app", "127.0.0.1", namespaceId, Subscriber subscriber = new Subscriber("127.0.0.1:8080", "test", "app", "127.0.0.1", namespaceId,
"testGroupName@@test_subscriber", 0); "testGroupName@@test_subscriber", 0);
clients.add(subscriber); clients.add(subscriber);
Mockito.when(namingSubscriberService.getFuzzySubscribers(Mockito.anyString(), Mockito.anyString())) Mockito.when(this.aggregation.getFuzzySubscribers(Mockito.anyString(), Mockito.anyString()))
.thenReturn(clients); .thenReturn(clients);
List<Subscriber> list = subscribeManager.getSubscribers(serviceName, namespaceId, aggregation); List<Subscriber> list = subscribeManager.getSubscribers(serviceName, namespaceId, aggregation);
Assert.assertNotNull(list); Assert.assertNotNull(list);

View File

@ -0,0 +1,110 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class NamingSubscriberServiceAggregationImplTest {
private final String namespace = "N";
private final String serviceName = "G@@S";
private final Service service = Service.newService(namespace, "G", "S");
@Mock
private ServerMemberManager memberManager;
@Mock
private NamingSubscriberServiceLocalImpl local;
@Mock
private ConfigurableEnvironment environment;
private HashMap<String, Member> members;
private NamingSubscriberServiceAggregationImpl aggregation;
@Before
public void setUp() throws Exception {
aggregation = new NamingSubscriberServiceAggregationImpl(local, memberManager);
Subscriber subscriber = new Subscriber("local", "", "", "", namespace, serviceName, 0);
when(local.getSubscribers(namespace, serviceName)).thenReturn(Collections.singletonList(subscriber));
when(local.getSubscribers(service)).thenReturn(Collections.singletonList(subscriber));
when(local.getFuzzySubscribers(namespace, serviceName)).thenReturn(Collections.singletonList(subscriber));
when(local.getFuzzySubscribers(service)).thenReturn(Collections.singletonList(subscriber));
members = new HashMap<>();
members.put("1", Mockito.mock(Member.class));
when(memberManager.getServerList()).thenReturn(members);
}
@Test
public void testGetSubscribersByStringWithLocal() {
Collection<Subscriber> actual = aggregation.getSubscribers(namespace, serviceName);
assertEquals(1, actual.size());
assertEquals("local", actual.iterator().next().getAddrStr());
}
@Test
public void testGetSubscribersByStringWithRemote() {
// TODO
}
@Test
public void testGetSubscribersByServiceWithLocal() {
Collection<Subscriber> actual = aggregation.getSubscribers(service);
assertEquals(1, actual.size());
assertEquals("local", actual.iterator().next().getAddrStr());
}
@Test
public void testGetSubscribersByServiceWithRemote() {
// TODO
}
@Test
public void testGetFuzzySubscribersByStringWithLocal() {
Collection<Subscriber> actual = aggregation.getFuzzySubscribers(namespace, serviceName);
assertEquals(1, actual.size());
assertEquals("local", actual.iterator().next().getAddrStr());
}
@Test
public void testGetFuzzySubscribersByServiceWithLocal() {
Collection<Subscriber> actual = aggregation.getFuzzySubscribers(service);
assertEquals(1, actual.size());
assertEquals("local", actual.iterator().next().getAddrStr());
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.junit.Test; import org.junit.Test;

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.push; package com.alibaba.nacos.naming.push.v1;
import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;