[ISSUE #3386] Refactor HostReactor.java to ServiceInfoHolder and ServiceInfoUpdateService (#3402)

* Refactor HostReactor.java to ServiceInfoHolder and ServiceInfoUpdateService

* Use http client as default implement of some uncompleted api
This commit is contained in:
杨翊 SionYang 2020-07-21 15:02:44 +08:00 committed by GitHub
parent 53031ba08b
commit 115c992c98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 998 additions and 1084 deletions

View File

@ -62,7 +62,7 @@ public class NacosNamingMaintainService implements NamingMaintainService {
InitUtils.initSerialization();
InitUtils.initWebRootContext();
ServerListManager serverListManager = new ServerListManager(properties);
serverProxy = new NamingHttpClientProxy(namespace, serverListManager, properties);
serverProxy = new NamingHttpClientProxy(namespace, serverListManager, properties, null);
}
@Override

View File

@ -26,18 +26,15 @@ import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.core.Balancer;
import com.alibaba.nacos.client.naming.core.EventDispatcher;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.ArrayList;
@ -60,13 +57,11 @@ public class NacosNamingService implements NamingService {
private String logName;
private HostReactor hostReactor;
private ServiceInfoHolder serviceInfoHolder;
private EventDispatcher eventDispatcher;
private NamingHttpClientProxy serverProxy;
private NamingGrpcClientProxy grpcClientProxy;
private NamingClientProxy clientProxy;
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
@ -85,12 +80,10 @@ public class NacosNamingService implements NamingService {
InitUtils.initWebRootContext();
initLogName(properties);
ServerListManager serverListManager = new ServerListManager(properties);
this.eventDispatcher = new EventDispatcher();
this.serverProxy = new NamingHttpClientProxy(this.namespace, serverListManager, properties);
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.namespace, properties);
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, hostReactor);
grpcClientProxy.start(serverListManager);
this.serviceInfoHolder = new ServiceInfoHolder(eventDispatcher, namespace, properties);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties,
eventDispatcher);
}
private void initLogName(Properties properties) {
@ -124,13 +117,11 @@ public class NacosNamingService implements NamingService {
@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
registerInstance(serviceName, groupName, instance);
}
@ -141,9 +132,7 @@ public class NacosNamingService implements NamingService {
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// serverProxy.registerService(groupedServiceName, groupName, instance);
grpcClientProxy.registerService(serviceName, groupName, instance);
clientProxy.registerService(serviceName, groupName, instance);
}
@Override
@ -168,7 +157,6 @@ public class NacosNamingService implements NamingService {
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
deregisterInstance(serviceName, groupName, instance);
}
@ -179,8 +167,7 @@ public class NacosNamingService implements NamingService {
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
grpcClientProxy.deregisterService(serviceName, groupName, instance);
clientProxy.deregisterService(serviceName, groupName, instance);
}
@Override
@ -227,12 +214,9 @@ public class NacosNamingService implements NamingService {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","));
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","));
} else {
// serviceInfo = hostReactor
// .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
// StringUtils.join(clusters, ","));
serviceInfo = grpcClientProxy
serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false);
}
List<Instance> list;
@ -288,10 +272,10 @@ public class NacosNamingService implements NamingService {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","));
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(serviceName, groupName, StringUtils.join(clusters, ","));
serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false);
}
return selectInstances(serviceInfo, healthy);
}
@ -356,11 +340,12 @@ public class NacosNamingService implements NamingService {
boolean subscribe) throws NacosException {
if (subscribe) {
return Balancer.RandomByWeight
.selectHost(hostReactor.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ",")));
return Balancer.RandomByWeight.selectHost(
serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ",")));
} else {
return Balancer.RandomByWeight.selectHost(hostReactor
.getServiceInfoDirectlyFromServer(serviceName, groupName, StringUtils.join(clusters, ",")));
ServiceInfo serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false);
return Balancer.RandomByWeight.selectHost(serviceInfo);
}
}
@ -382,10 +367,7 @@ public class NacosNamingService implements NamingService {
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
// eventDispatcher.addListener(hostReactor
// .getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")),
// StringUtils.join(clusters, ","), listener);
eventDispatcher.addListener(grpcClientProxy.subscribe(serviceName, groupName, StringUtils.join(clusters, ",")),
eventDispatcher.addListener(clientProxy.subscribe(serviceName, groupName, StringUtils.join(clusters, ",")),
StringUtils.join(clusters, ","), listener);
}
@ -411,13 +393,13 @@ public class NacosNamingService implements NamingService {
String clustersString = StringUtils.join(clusters, ",");
eventDispatcher.removeListener(fullServiceName, clustersString, listener);
if (!eventDispatcher.isSubscribed(fullServiceName, clustersString)) {
grpcClientProxy.unsubscribe(fullServiceName, clustersString);
clientProxy.unsubscribe(serviceName, groupName, clustersString);
}
}
@Override
public ListView<String> getServicesOfServer(int pageNo, int pageSize) throws NacosException {
return serverProxy.getServiceList(pageNo, pageSize, Constants.DEFAULT_GROUP);
return getServicesOfServer(pageNo, pageSize, Constants.DEFAULT_GROUP);
}
@Override
@ -434,7 +416,7 @@ public class NacosNamingService implements NamingService {
@Override
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
return serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
return clientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
@Override
@ -444,17 +426,13 @@ public class NacosNamingService implements NamingService {
@Override
public String getServerStatus() {
return serverProxy.serverHealthy() ? "UP" : "DOWN";
}
public BeatReactor getBeatReactor() {
return serverProxy.getBeatReactor();
return clientProxy.serverHealthy() ? "UP" : "DOWN";
}
@Override
public void shutDown() throws NacosException {
eventDispatcher.shutdown();
hostReactor.shutdown();
serverProxy.shutdown();
serviceInfoHolder.shutdown();
clientProxy.shutdown();
}
}

View File

@ -20,7 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.cache.ConcurrentDiskUtil;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.lifecycle.Closeable;
@ -54,12 +54,12 @@ public class FailoverReactor implements Closeable {
private final String failoverDir;
private final HostReactor hostReactor;
private final ServiceInfoHolder serviceInfoHolder;
private final ScheduledExecutorService executorService;
public FailoverReactor(HostReactor hostReactor, String cacheDir) {
this.hostReactor = hostReactor;
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
this.serviceInfoHolder = serviceInfoHolder;
this.failoverDir = cacheDir + "/failover";
// init executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@ -252,7 +252,7 @@ public class FailoverReactor implements Closeable {
@Override
public void run() {
Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
ServiceInfo serviceInfo = entry.getValue();
if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils

View File

@ -0,0 +1,227 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.cache;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.core.EventDispatcher;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Naming client service information holder.
*
* @author xiweng.yy
*/
public class ServiceInfoHolder implements Closeable {
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
private final EventDispatcher eventDispatcher;
private final FailoverReactor failoverReactor;
private String cacheDir;
public ServiceInfoHolder(EventDispatcher eventDispatcher, String namespace, Properties properties) {
initCacheDir(namespace);
this.eventDispatcher = eventDispatcher;
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
}
private void initCacheDir(String namespace) {
cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
if (StringUtils.isEmpty(cacheDir)) {
cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
}
}
private boolean isLoadCacheAtStart(Properties properties) {
boolean loadCacheAtStart = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
loadCacheAtStart = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
}
return loadCacheAtStart;
}
public Map<String, ServiceInfo> getServiceInfoMap() {
return serviceInfoMap;
}
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo result = getServiceInfo0(groupedServiceName, clusters);
return null == result ? new ServiceInfo(groupedServiceName, clusters) : result;
}
private ServiceInfo getServiceInfo0(String groupedServiceName, String clusters) {
String key = ServiceInfo.getKey(groupedServiceName, clusters);
return serviceInfoMap.get(key);
}
/**
* Process service json.
*
* @param json service json
* @return service info
*/
public ServiceInfo processServiceInfo(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
serviceInfo.setJsonFromServer(json);
return processServiceInfo(serviceInfo);
}
/**
* Process service info.
*
* @param serviceInfo new service info
* @return service info
*/
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
return oldService;
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
private boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
if (null == oldService) {
NAMING_LOGGER.info("init new ips(" + newService.ipCount() + ") service: " + newService.getKey() + " -> "
+ JacksonUtils.toJson(newService.getHosts()));
return true;
}
if (oldService.getLastRefTime() > newService.getLastRefTime()) {
NAMING_LOGGER
.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + newService
.getLastRefTime());
}
boolean changed = false;
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<String, Instance>(newService.getHosts().size());
for (Instance host : newService.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<Instance>();
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
if (!newHostMap.containsKey(key)) {
remvHosts.add(host);
}
}
if (newHosts.size() > 0) {
changed = true;
NAMING_LOGGER
.info("new ips(" + newHosts.size() + ") service: " + newService.getKey() + " -> " + JacksonUtils
.toJson(newHosts));
}
if (remvHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + newService.getKey() + " -> "
+ JacksonUtils.toJson(remvHosts));
}
if (modHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + newService.getKey() + " -> "
+ JacksonUtils.toJson(modHosts));
}
return changed;
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
failoverReactor.shutdown();
NAMING_LOGGER.info("{} do shutdown stop", className);
}
}

View File

@ -1,464 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Host reactor.
*
* @author xuanyin
*/
public class HostReactor implements Closeable {
private static final long DEFAULT_DELAY = 1000L;
private static final long UPDATE_HOLD_INTERVAL = 5000L;
private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
private final Map<String, ServiceInfo> serviceInfoMap;
private final Map<String, Object> updatingMap;
private final PushReceiver pushReceiver;
private final EventDispatcher eventDispatcher;
private final NamingHttpClientProxy serverProxy;
private final FailoverReactor failoverReactor;
private final ScheduledExecutorService executor;
private String cacheDir;
public HostReactor(EventDispatcher eventDispatcher, NamingHttpClientProxy serverProxy, String namespace,
Properties properties) {
initCacheDir(namespace);
// init executorService
this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
}
private void initCacheDir(String namespace) {
cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
if (StringUtils.isEmpty(cacheDir)) {
cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
}
}
private int initPollingThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_POLLING_THREAD_COUNT;
}
return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_POLLING_THREAD_COUNT),
UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
private boolean isLoadCacheAtStart(Properties properties) {
boolean loadCacheAtStart = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
loadCacheAtStart = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
}
return loadCacheAtStart;
}
public Map<String, ServiceInfo> getServiceInfoMap() {
return serviceInfoMap;
}
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
/**
* Process service json.
*
* @param json service json
* @return service info
*/
public ServiceInfo processServiceInfo(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
serviceInfo.setJsonFromServer(json);
return processServiceInfo(serviceInfo);
}
/**
* Process service info.
*
* @param serviceInfo new service info
* @return service info
*/
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
return oldService;
}
boolean changed = false;
if (oldService != null) {
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
+ serviceInfo.getLastRefTime());
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<Instance>();
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils
.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
if (!newHostMap.containsKey(key)) {
remvHosts.add(host);
}
}
if (newHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(newHosts));
}
if (remvHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(remvHosts));
}
if (modHosts.size() > 0) {
changed = true;
serverProxy.updateBeatInfo(modHosts);
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(modHosts));
}
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
} else {
changed = true;
NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
}
return serviceInfo;
}
private ServiceInfo getServiceInfo0(String groupedServiceName, String clusters) {
String key = ServiceInfo.getKey(groupedServiceName, clusters);
return serviceInfoMap.get(key);
}
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String groupName,
final String clusters) throws NacosException {
return serverProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
}
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(groupedServiceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(groupedServiceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingService(groupedServiceName);
updateServiceNow(serviceName, groupName, clusters);
finishUpdating(groupedServiceName);
} else if (updatingMap.containsKey(groupedServiceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
scheduleUpdateIfAbsent(serviceName, groupName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
/**
* Schedule update if absent.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters
*/
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (futureMap.get(serviceKey) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(serviceKey) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
/**
* Update service now.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters
*/
public void updateServiceNow(String serviceName, String groupName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
ServiceInfo result = serverProxy
.queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
if (null != result) {
processServiceInfo(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
/**
* Refresh only.
*
* @param serviceName service name
* @param groupName group name
* @param clusters cluster
*/
public void refreshOnly(String serviceName, String groupName, String clusters) {
try {
serverProxy.queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
pushReceiver.shutdown();
failoverReactor.shutdown();
NAMING_LOGGER.info("{} do shutdown stop", className);
}
public void updatingService(String serviceName) {
updatingMap.put(serviceName, new Object());
}
public void finishUpdating(String serviceName) {
updatingMap.remove(serviceName);
}
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private final String serviceName;
private final String groupName;
private final String clusters;
private final String groupedServiceName;
private final String serviceKey;
public UpdateTask(String serviceName, String groupName, String clusters) {
this.serviceName = serviceName;
this.groupName = groupName;
this.clusters = clusters;
this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
@Override
public void run() {
long delayTime = -1;
try {
ServiceInfo serviceObj = serviceInfoMap.get(serviceKey);
if (serviceObj == null) {
updateServiceNow(serviceName, groupName, clusters);
delayTime = DEFAULT_DELAY;
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, groupName, clusters);
serviceObj = serviceInfoMap.get(serviceKey);
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, groupName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!eventDispatcher.isSubscribed(groupedServiceName, clusters) && !futureMap.containsKey(serviceKey)) {
// abort the update task
NAMING_LOGGER
.info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
return;
}
delayTime = serviceObj.getCacheMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
} finally {
if (delayTime > 0) {
executor.schedule(this, delayTime, TimeUnit.MILLISECONDS);
}
}
}
}
}

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
@ -45,13 +46,13 @@ public class PushReceiver implements Runnable, Closeable {
private DatagramSocket udpSocket;
private HostReactor hostReactor;
private ServiceInfoHolder serviceInfoHolder;
private volatile boolean closed = false;
public PushReceiver(HostReactor hostReactor) {
public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
try {
this.hostReactor = hostReactor;
this.serviceInfoHolder = serviceInfoHolder;
this.udpSocket = new DatagramSocket();
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
@ -86,7 +87,7 @@ public class PushReceiver implements Runnable, Closeable {
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceInfo(pushPacket.data);
serviceInfoHolder.processServiceInfo(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
@ -94,7 +95,7 @@ public class PushReceiver implements Runnable, Closeable {
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only

View File

@ -0,0 +1,190 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Service information update service.
*
* @author xiweng.yy
*/
public class ServiceInfoUpdateService implements Closeable {
private static final long DEFAULT_DELAY = 1000L;
private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
private final ServiceInfoHolder serviceInfoHolder;
private final ScheduledExecutorService executor;
private final NamingClientProxy namingClientProxy;
private final EventDispatcher eventDispatcher;
public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder,
NamingClientProxy namingClientProxy, EventDispatcher eventDispatcher) {
this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
this.serviceInfoHolder = serviceInfoHolder;
this.namingClientProxy = namingClientProxy;
this.eventDispatcher = eventDispatcher;
}
private int initPollingThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_POLLING_THREAD_COUNT;
}
return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_POLLING_THREAD_COUNT),
UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
/**
* Schedule update if absent.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters
*/
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (futureMap.get(serviceKey) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(serviceKey) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
/**
* Stop to schedule update if contain task.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters
*/
public void stopUpdateIfContain(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (!futureMap.containsKey(serviceKey)) {
return;
}
synchronized (futureMap) {
if (!futureMap.containsKey(serviceKey)) {
return;
}
futureMap.remove(serviceKey);
}
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
NAMING_LOGGER.info("{} do shutdown stop", className);
}
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private final String serviceName;
private final String groupName;
private final String clusters;
private final String groupedServiceName;
private final String serviceKey;
public UpdateTask(String serviceName, String groupName, String clusters) {
this.serviceName = serviceName;
this.groupName = groupName;
this.clusters = clusters;
this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
@Override
public void run() {
long delayTime = -1;
try {
if (!eventDispatcher.isSubscribed(groupedServiceName, clusters) && !futureMap.containsKey(serviceKey)) {
NAMING_LOGGER
.info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
return;
}
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
delayTime = DEFAULT_DELAY;
lastRefTime = serviceObj.getLastRefTime();
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
}
lastRefTime = serviceObj.getLastRefTime();
delayTime = serviceObj.getCacheMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
} finally {
if (delayTime > 0) {
executor.schedule(this, delayTime, TimeUnit.MILLISECONDS);
}
}
}
}
}

View File

@ -142,11 +142,12 @@ public interface NamingClientProxy extends Closeable {
/**
* Unsubscribe service.
*
* @param serviceName full service name with group
* @param serviceName service name
* @param groupName group name
* @param clusters clusters, current only support subscribe all clusters, maybe deprecated
* @throws NacosException nacos exception
*/
void unsubscribe(String serviceName, String clusters) throws NacosException;
void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException;
/**
* Update beat info.
@ -154,4 +155,11 @@ public interface NamingClientProxy extends Closeable {
* @param modifiedInstances modified instances
*/
void updateBeatInfo(Set<Instance> modifiedInstances);
/**
* Check Server healthy.
*
* @return true if server is healthy
*/
boolean serverHealthy();
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.core.EventDispatcher;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.core.ServiceInfoUpdateService;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import java.util.Properties;
import java.util.Set;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Delegate of naming client proxy.
*
* @author xiweng.yy
*/
public class NamingClientProxyDelegate implements NamingClientProxy {
private final ServerListManager serverListManager;
private final ServiceInfoUpdateService serviceInfoUpdateService;
private final ServiceInfoHolder serviceInfoHolder;
private final NamingHttpClientProxy httpClientProxy;
private final NamingGrpcClientProxy grpcClientProxy;
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
EventDispatcher eventDispatcher) throws NacosException {
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
eventDispatcher);
this.serverListManager = new ServerListManager(properties);
this.serviceInfoHolder = serviceInfoHolder;
this.httpClientProxy = new NamingHttpClientProxy(namespace, serverListManager, properties, serviceInfoHolder);
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, serverListManager, serviceInfoHolder);
}
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy().registerService(serviceName, groupName, instance);
}
@Override
public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy().deregisterService(serviceName, groupName, instance);
}
@Override
public void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException {
}
@Override
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
return getExecuteClientProxy().queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}
@Override
public Service queryService(String serviceName, String groupName) throws NacosException {
return null;
}
@Override
public void createService(Service service, AbstractSelector selector) throws NacosException {
}
@Override
public boolean deleteService(String serviceName, String groupName) throws NacosException {
return false;
}
@Override
public void updateService(Service service, AbstractSelector selector) throws NacosException {
}
@Override
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
return httpClientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result) {
result = getExecuteClientProxy().subscribe(serviceName, groupName, clusters);
}
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
serviceInfoHolder.processServiceInfo(result);
return result;
}
@Override
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
serviceInfoUpdateService.stopUpdateIfContain(serviceName, groupName, clusters);
getExecuteClientProxy().unsubscribe(serviceName, groupName, clusters);
}
@Override
public void updateBeatInfo(Set<Instance> modifiedInstances) {
httpClientProxy.updateBeatInfo(modifiedInstances);
}
@Override
public boolean serverHealthy() {
return httpClientProxy.serverHealthy();
}
private NamingClientProxy getExecuteClientProxy() {
return grpcClientProxy.isEnable() ? grpcClientProxy : httpClientProxy;
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
serviceInfoUpdateService.shutdown();
serverListManager.shutdown();
httpClientProxy.shutdown();
grpcClientProxy.shutdown();
NAMING_LOGGER.info("{} do shutdown stop", className);
}
}

View File

@ -31,7 +31,7 @@ import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.remote.RpcClient;
import com.alibaba.nacos.client.remote.RpcClientFactory;
@ -50,26 +50,19 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
private final String namespaceId;
private HostReactor hostReactor;
private final RpcClient rpcClient;
private RpcClient rpcClient;
public NamingGrpcClientProxy(String namespaceId, HostReactor hostReactor) {
public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
this.hostReactor = hostReactor;
this.rpcClient = RpcClientFactory.getClient("naming");
start(serverListFactory, serviceInfoHolder);
}
/**
* Start Grpc client proxy.
*
* @param serverListFactory server list factory
* @throws NacosException nacos exception
*/
public void start(ServerListFactory serverListFactory) throws NacosException {
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.init(serverListFactory);
rpcClient.start();
rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(hostReactor));
rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(serviceInfoHolder));
}
@Override
@ -99,7 +92,8 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
@Override
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, NamingUtils.getGroupedName(serviceName, groupName));
ServiceQueryRequest request = new ServiceQueryRequest(namespaceId,
NamingUtils.getGroupedName(serviceName, groupName));
request.setCluster(clusters);
request.setHealthyOnly(healthyOnly);
request.setUdpPort(udpPort);
@ -136,23 +130,16 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
ServiceInfo serviceInfo = new ServiceInfo(serviceNameWithGroup, clusters);
if (hostReactor.getServiceInfoMap().containsKey(serviceInfo.getKey())) {
return hostReactor.getServiceInfoMap().get(serviceInfo.getKey());
}
hostReactor.updatingService(serviceNameWithGroup);
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters,
true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
ServiceInfo result = response.getServiceInfo();
hostReactor.getServiceInfoMap().put(result.getKey(), result);
hostReactor.finishUpdating(serviceNameWithGroup);
return result;
return response.getServiceInfo();
}
@Override
public void unsubscribe(String serviceName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, clusters, false);
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId,
NamingUtils.getGroupedName(serviceName, groupName), clusters, false);
requestToServer(request, SubscribeServiceResponse.class);
}
@ -160,6 +147,12 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
public void updateBeatInfo(Set<Instance> modifiedInstances) {
}
@Override
public boolean serverHealthy() {
// TODO check server healthy by grpc
return false;
}
private <T extends Response> T requestToServer(Request request, Class<T> responseClass) throws NacosException {
try {
Response response = rpcClient.request(request);
@ -181,4 +174,8 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
public void shutdown() throws NacosException {
rpcClient.shutdown();
}
public boolean isEnable() {
return rpcClient.isRunning();
}
}

View File

@ -18,7 +18,7 @@ package com.alibaba.nacos.client.naming.remote.gprc;
import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.remote.ServerPushResponseHandler;
/**
@ -28,15 +28,15 @@ import com.alibaba.nacos.client.remote.ServerPushResponseHandler;
*/
public class NamingPushResponseHandler implements ServerPushResponseHandler<NotifySubscriberResponse> {
private final HostReactor hostReactor;
private final ServiceInfoHolder serviceInfoHolder;
public NamingPushResponseHandler(HostReactor hostReactor) {
this.hostReactor = hostReactor;
public NamingPushResponseHandler(ServiceInfoHolder serviceInfoHolder) {
this.serviceInfoHolder = serviceInfoHolder;
}
@Override
public void responseReply(Response response) {
NotifySubscriberResponse notifyResponse = (NotifySubscriberResponse) response;
hostReactor.processServiceInfo(notifyResponse.getServiceInfo());
serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
}
}

View File

@ -33,6 +33,8 @@ import com.alibaba.nacos.client.config.impl.SpasAdapter;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.core.PushReceiver;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
@ -90,13 +92,18 @@ public class NamingHttpClientProxy implements NamingClientProxy {
private final BeatReactor beatReactor;
private final ServiceInfoHolder serviceInfoHolder;
private final PushReceiver pushReceiver;
private int serverPort = DEFAULT_SERVER_PORT;
private ScheduledExecutorService executorService;
private Properties properties;
public NamingHttpClientProxy(String namespaceId, ServerListManager serverListManager, Properties properties) {
public NamingHttpClientProxy(String namespaceId, ServerListManager serverListManager, Properties properties,
ServiceInfoHolder serviceInfoHolder) {
this.serverListManager = serverListManager;
this.securityProxy = new SecurityProxy(properties, nacosRestTemplate);
this.properties = properties;
@ -104,6 +111,8 @@ public class NamingHttpClientProxy implements NamingClientProxy {
this.namespaceId = namespaceId;
this.beatReactor = new BeatReactor(this, properties);
this.initRefreshTask();
this.pushReceiver = new PushReceiver(serviceInfoHolder);
this.serviceInfoHolder = serviceInfoHolder;
}
private void initRefreshTask() {
@ -140,7 +149,7 @@ public class NamingHttpClientProxy implements NamingClientProxy {
}
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
@ -196,8 +205,8 @@ public class NamingHttpClientProxy implements NamingClientProxy {
}
@Override
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));
@ -209,7 +218,7 @@ public class NamingHttpClientProxy implements NamingClientProxy {
if (StringUtils.isNotEmpty(result)) {
return JacksonUtils.toObj(result, ServiceInfo.class);
}
return null;
return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
}
@Override
@ -298,11 +307,7 @@ public class NamingHttpClientProxy implements NamingClientProxy {
return JacksonUtils.toObj(result);
}
/**
* Check Server healthy.
*
* @return true if server is healthy
*/
@Override
public boolean serverHealthy() {
try {
@ -316,10 +321,6 @@ public class NamingHttpClientProxy implements NamingClientProxy {
}
}
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {
return getServiceList(pageNo, pageSize, groupName, null);
}
@Override
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
@ -356,12 +357,11 @@ public class NamingHttpClientProxy implements NamingClientProxy {
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
return null;
return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
}
@Override
public void unsubscribe(String serviceName, String clusters) throws NacosException {
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
}
@Override

View File

@ -1,122 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import com.alibaba.nacos.common.utils.JacksonUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class HostReactorTest {
@Mock
private NamingHttpClientProxy namingHttpClientProxy;
@Mock
private EventDispatcher eventDispatcher;
private HostReactor hostReactor;
@Before
public void setUp() throws Exception {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName("testName");
beatInfo.setIp("1.1.1.1");
beatInfo.setPort(1234);
beatInfo.setCluster("clusterName");
beatInfo.setWeight(1);
beatInfo.setMetadata(new HashMap<String, String>());
beatInfo.setScheduled(false);
beatInfo.setPeriod(1000L);
hostReactor = new HostReactor(eventDispatcher, namingHttpClientProxy, "public", null);
}
@Test
public void testProcessServiceJson() {
ServiceInfo actual = hostReactor.processServiceInfo(EXAMPLE);
assertServiceInfo(actual);
ServiceInfo actual2 = hostReactor.processServiceInfo(CHANGE_DATA_EXAMPLE);
assertEquals(2.0, actual2.getHosts().get(0).getWeight(), 0.0);
}
@Test
public void testGetServiceInfoDirectlyFromServer() throws NacosException {
when(namingHttpClientProxy.queryInstancesOfService("testName", "", "testClusters", 0, false))
.thenReturn(JacksonUtils.toObj(EXAMPLE, ServiceInfo.class));
ServiceInfo actual = hostReactor.getServiceInfoDirectlyFromServer("testName", "", "testClusters");
assertServiceInfo(actual);
}
private void assertServiceInfo(ServiceInfo actual) {
assertEquals("testName", actual.getName());
assertEquals("testClusters", actual.getClusters());
assertEquals("", actual.getChecksum());
assertEquals(1000, actual.getCacheMillis());
assertEquals(0, actual.getLastRefTime());
assertNull(actual.getGroupName());
assertTrue(actual.isValid());
assertFalse(actual.isAllIPs());
assertEquals(1, actual.getHosts().size());
assertInstance(actual.getHosts().get(0));
}
private void assertInstance(Instance actual) {
assertEquals("1.1.1.1", actual.getIp());
assertEquals("testClusters", actual.getClusterName());
assertEquals("testName", actual.getServiceName());
assertEquals(1234, actual.getPort());
}
private static final String EXAMPLE =
"{\n" + "\t\"name\": \"testName\",\n" + "\t\"clusters\": \"testClusters\",\n" + "\t\"cacheMillis\": 1000,\n"
+ "\t\"hosts\": [{\n" + "\t\t\"ip\": \"1.1.1.1\",\n" + "\t\t\"port\": 1234,\n"
+ "\t\t\"weight\": 1.0,\n" + "\t\t\"healthy\": true,\n" + "\t\t\"enabled\": true,\n"
+ "\t\t\"ephemeral\": true,\n" + "\t\t\"clusterName\": \"testClusters\",\n"
+ "\t\t\"serviceName\": \"testName\",\n" + "\t\t\"metadata\": {},\n"
+ "\t\t\"instanceHeartBeatInterval\": 5000,\n" + "\t\t\"instanceHeartBeatTimeOut\": 15000,\n"
+ "\t\t\"ipDeleteTimeout\": 30000,\n" + "\t\t\"instanceIdGenerator\": \"simple\"\n" + "\t}],\n"
+ "\t\"lastRefTime\": 0,\n" + "\t\"checksum\": \"\",\n" + "\t\"allIPs\": false,\n"
+ "\t\"valid\": true\n" + "}";
//the weight changed from 1.0 to 2.0
private static final String CHANGE_DATA_EXAMPLE =
"{\n" + "\t\"name\": \"testName\",\n" + "\t\"clusters\": \"testClusters\",\n" + "\t\"cacheMillis\": 1000,\n"
+ "\t\"hosts\": [{\n" + "\t\t\"ip\": \"1.1.1.1\",\n" + "\t\t\"port\": 1234,\n"
+ "\t\t\"weight\": 2.0,\n" + "\t\t\"healthy\": true,\n" + "\t\t\"enabled\": true,\n"
+ "\t\t\"ephemeral\": true,\n" + "\t\t\"clusterName\": \"testClusters\",\n"
+ "\t\t\"serviceName\": \"testName\",\n" + "\t\t\"metadata\": {},\n"
+ "\t\t\"instanceHeartBeatInterval\": 5000,\n" + "\t\t\"instanceHeartBeatTimeOut\": 15000,\n"
+ "\t\t\"ipDeleteTimeout\": 30000,\n" + "\t\t\"instanceIdGenerator\": \"simple\"\n" + "\t}],\n"
+ "\t\"lastRefTime\": 0,\n" + "\t\"checksum\": \"\",\n" + "\t\"allIPs\": false,\n"
+ "\t\"valid\": true\n" + "}";
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.naming;
import com.alibaba.nacos.Nacos;
@ -23,6 +24,7 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.NacosNamingService;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.test.utils.NamingTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -46,12 +48,14 @@ import static com.alibaba.nacos.test.naming.NamingBase.randomDomainName;
* @date 2018/11/13
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Nacos.class, properties = {"server.servlet.context-path=/nacos"},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@SpringBootTest(classes = Nacos.class, properties = {
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class AutoDeregisterInstance_ITCase {
private NamingService naming;
private NamingService naming2;
@LocalServerPort
private int port;
@ -74,7 +78,7 @@ public class AutoDeregisterInstance_ITCase {
}
@After
public void destroy() throws Exception{
public void destroy() throws Exception {
NamingBase.destoryServer(port);
}
@ -99,8 +103,9 @@ public class AutoDeregisterInstance_ITCase {
NacosNamingService namingServiceImpl = (NacosNamingService) naming;
namingServiceImpl.getBeatReactor().
removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1", TEST_PORT);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1",
TEST_PORT);
verifyInstanceList(instances, 1, serviceName);
instances = naming.getAllInstances(serviceName);
@ -136,8 +141,9 @@ public class AutoDeregisterInstance_ITCase {
NacosNamingService namingServiceImpl = (NacosNamingService) naming;
namingServiceImpl.getBeatReactor().
removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1", TEST_PORT);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1",
TEST_PORT);
verifyInstanceList(instances, 1, serviceName);
instances = naming.getAllInstances(serviceName);
@ -168,8 +174,9 @@ public class AutoDeregisterInstance_ITCase {
NacosNamingService namingServiceImpl = (NacosNamingService) naming;
namingServiceImpl.getBeatReactor().
removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1", TEST_PORT);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1",
TEST_PORT);
verifyInstanceList(instances, 1, serviceName);
@ -181,8 +188,8 @@ public class AutoDeregisterInstance_ITCase {
beatInfo.setIp("127.0.0.1");
beatInfo.setPort(TEST_PORT);
namingServiceImpl.getBeatReactor().
addBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, beatInfo);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.addBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, beatInfo);
verifyInstanceList(instances, 2, serviceName);
@ -214,8 +221,9 @@ public class AutoDeregisterInstance_ITCase {
NacosNamingService namingServiceImpl = (NacosNamingService) naming;
namingServiceImpl.getBeatReactor().
removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1", TEST_PORT);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "127.0.0.1",
TEST_PORT);
verifyInstanceList(instances, 1, serviceName);
@ -228,8 +236,8 @@ public class AutoDeregisterInstance_ITCase {
beatInfo.setPort(TEST_PORT);
beatInfo.setCluster("c1");
namingServiceImpl.getBeatReactor().
addBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, beatInfo);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.addBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, beatInfo);
//TimeUnit.SECONDS.sleep(15);
verifyInstanceList(instances, 2, serviceName);

View File

@ -25,8 +25,8 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.NacosNamingService;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.test.base.Params;
import com.alibaba.nacos.test.utils.NamingTestUtils;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -35,7 +35,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.*;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;
@ -46,19 +50,25 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.test.naming.NamingBase.*;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_GROUP_1;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_GROUP_2;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_PORT3_4_DOM_1;
import static com.alibaba.nacos.test.naming.NamingBase.randomDomainName;
/**
* @author nkorange
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Nacos.class, properties = {"server.servlet.context-path=/nacos"},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@SpringBootTest(classes = Nacos.class, properties = {
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MultiTenant_InstanceAPI_ITCase {
private NamingService naming;
private NamingService naming1;
private NamingService naming2;
@LocalServerPort
private int port;
@ -92,7 +102,6 @@ public class MultiTenant_InstanceAPI_ITCase {
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1" + ":" + port);
naming1 = NamingFactory.createNamingService(properties);
properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, "namespace-2");
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1" + ":" + port);
@ -119,21 +128,14 @@ public class MultiTenant_InstanceAPI_ITCase {
String url = "/nacos/v1/ns/instance/list";
ResponseEntity<String> response = request(url,
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("namespaceId", "namespace-1")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", "namespace-1")
.done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals("11.11.11.11", json.get("hosts").get(0).get("ip").asText());
response = request(url,
Params.newParams()
.appendParam("serviceName", serviceName)
.done(),
String.class);
response = request(url, Params.newParams().appendParam("serviceName", serviceName).done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
json = JacksonUtils.toObj(response.getBody());
@ -149,34 +151,27 @@ public class MultiTenant_InstanceAPI_ITCase {
public void multipleTenant_group_listInstance() throws Exception {
String serviceName = randomDomainName();
naming1.registerInstance(serviceName, TEST_GROUP_1,"11.11.11.11", 80);
naming1.registerInstance(serviceName, TEST_GROUP_1, "11.11.11.11", 80);
naming1.registerInstance(serviceName,"22.22.22.22", 80);
naming1.registerInstance(serviceName, "22.22.22.22", 80);
naming.registerInstance(serviceName, TEST_GROUP_1,"33.33.33.33", 8888);
naming.registerInstance(serviceName, TEST_GROUP_2,"44.44.44.44", 8888);
naming.registerInstance(serviceName, TEST_GROUP_1, "33.33.33.33", 8888);
naming.registerInstance(serviceName, TEST_GROUP_2, "44.44.44.44", 8888);
TimeUnit.SECONDS.sleep(5L);
String url = "/nacos/v1/ns/instance/list";
ResponseEntity<String> response = request(url,
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("namespaceId", "namespace-1")
.appendParam("groupName", TEST_GROUP_1)
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", "namespace-1")
.appendParam("groupName", TEST_GROUP_1).done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals("11.11.11.11", json.get("hosts").get(0).get("ip").asText());
response = request(url,
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("groupName", TEST_GROUP_1)
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("groupName", TEST_GROUP_1)
.done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
json = JacksonUtils.toObj(response.getBody());
@ -203,22 +198,14 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
Params.newParams().appendParam("serviceName", serviceName)
.appendParam("ip", "33.33.33.33") //错误的IP隔离验证
.appendParam("port", "8888")
.appendParam("namespaceId", "namespace-2")
.done(),
String.class);
.appendParam("port", "8888").appendParam("namespaceId", "namespace-2").done(), String.class);
Assert.assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("clusters", "c1")
.appendParam("healthyOnly", "true")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("clusters", "c1")
.appendParam("healthyOnly", "true").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(1, json.get("hosts").size());
@ -244,23 +231,14 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("groupName", TEST_GROUP_1)
Params.newParams().appendParam("serviceName", serviceName).appendParam("groupName", TEST_GROUP_1)
.appendParam("ip", "33.33.33.33") //不存在的IP隔离验证
.appendParam("port", "8888")
.appendParam("namespaceId", "namespace-2")
.done(),
String.class);
.appendParam("port", "8888").appendParam("namespaceId", "namespace-2").done(), String.class);
Assert.assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("clusters", "c2")
.appendParam("healthyOnly", "true")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("clusters", "c2")
.appendParam("healthyOnly", "true").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(1, json.get("hosts").size());
@ -286,25 +264,19 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(3L);
//AP下通过HTTP删除实例前必须删除心跳
NacosNamingService namingServiceImpl = (NacosNamingService) naming2;
namingServiceImpl.getBeatReactor().
removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "33.33.33.33", 8888);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.removeBeatInfo(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceName, "33.33.33.33",
8888);
TimeUnit.SECONDS.sleep(3L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("ip", "33.33.33.33")
.appendParam("port", "8888")
.appendParam("namespaceId", "namespace-1") //删除namespace-1中没有的IP
.done(),
String.class,
HttpMethod.DELETE);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "33.33.33.33")
.appendParam("port", "8888").appendParam("namespaceId", "namespace-1") //删除namespace-1中没有的IP
.done(), String.class, HttpMethod.DELETE);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(2, json.get("hosts").size());
@ -319,35 +291,27 @@ public class MultiTenant_InstanceAPI_ITCase {
public void multipleTenant_group_deleteInstance() throws Exception {
String serviceName = randomDomainName();
naming1.registerInstance(serviceName, TEST_GROUP_1,"11.11.11.11", 80);
naming1.registerInstance(serviceName, TEST_GROUP_1, "11.11.11.11", 80);
naming2.registerInstance(serviceName, TEST_GROUP_2,"22.22.22.22", 80);
naming2.registerInstance(serviceName, TEST_GROUP_2, "22.22.22.22", 80);
TimeUnit.SECONDS.sleep(5L);
//AP下通过HTTP删除实例前必须删除心跳
NacosNamingService namingServiceImpl = (NacosNamingService) naming2;
namingServiceImpl.getBeatReactor().
removeBeatInfo(TEST_GROUP_2 + Constants.SERVICE_INFO_SPLITER + serviceName, "22.22.22.22", 80);
NamingTestUtils.getBeatReactorByReflection(namingServiceImpl)
.removeBeatInfo(TEST_GROUP_2 + Constants.SERVICE_INFO_SPLITER + serviceName, "22.22.22.22", 80);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
Params.newParams().appendParam("serviceName", serviceName)
.appendParam("namespaceId", "namespace-2") //删除namespace-1中没有的IP
.appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22")
.appendParam("port", TEST_PORT3_4_DOM_1)
.done(),
String.class,
HttpMethod.DELETE);
.appendParam("groupName", TEST_GROUP_2).appendParam("ip", "22.22.22.22")
.appendParam("port", TEST_PORT3_4_DOM_1).done(), String.class, HttpMethod.DELETE);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2")
.appendParam("groupName", TEST_GROUP_2)
.done(),
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2").appendParam("groupName", TEST_GROUP_2).done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
@ -373,32 +337,21 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("ip", "33.33.33.33")
.appendParam("port", "8888")
.done(),
String.class,
HttpMethod.PUT);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "33.33.33.33")
.appendParam("port", "8888").done(), String.class, HttpMethod.PUT);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(1, json.get("hosts").size());
//namespace-2个数
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
json = JacksonUtils.toObj(response.getBody());
System.out.println(json);
@ -415,29 +368,20 @@ public class MultiTenant_InstanceAPI_ITCase {
String serviceName = randomDomainName();
naming1.registerInstance(serviceName, "11.11.11.11", 80);
naming2.registerInstance(serviceName, TEST_GROUP_2,"22.22.22.22", 80);
naming2.registerInstance(serviceName, TEST_GROUP_2, "22.22.22.22", 80);
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22")
.appendParam("port", "80")
.appendParam("namespaceId", "namespace-2")
.appendParam("weight", "8.0")
.done(),
String.class,
Params.newParams().appendParam("serviceName", serviceName).appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22").appendParam("port", "80")
.appendParam("namespaceId", "namespace-2").appendParam("weight", "8.0").done(), String.class,
HttpMethod.PUT);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2")
.appendParam("groupName", TEST_GROUP_2)
.done(),
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2").appendParam("groupName", TEST_GROUP_2).done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
@ -455,41 +399,26 @@ public class MultiTenant_InstanceAPI_ITCase {
String serviceName = randomDomainName();
naming1.registerInstance(serviceName, "11.11.11.11", 80);
naming2.registerInstance(serviceName, TEST_GROUP_2,"22.22.22.22", 80);
naming2.registerInstance(serviceName, TEST_GROUP_2, "22.22.22.22", 80);
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22")
.appendParam("port", "80")
.appendParam("namespaceId", "namespace-2")
.appendParam("weight", "8.0")
.done(),
String.class,
Params.newParams().appendParam("serviceName", serviceName).appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22").appendParam("port", "80")
.appendParam("namespaceId", "namespace-2").appendParam("weight", "8.0").done(), String.class,
HttpMethod.PUT);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22")
.appendParam("port", "80")
.appendParam("namespaceId", "namespace-2")
.done(),
String.class,
HttpMethod.PATCH);
Params.newParams().appendParam("serviceName", serviceName).appendParam("groupName", TEST_GROUP_2)
.appendParam("ip", "22.22.22.22").appendParam("port", "80")
.appendParam("namespaceId", "namespace-2").done(), String.class, HttpMethod.PATCH);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2")
.appendParam("groupName", TEST_GROUP_2)
.done(),
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2").appendParam("groupName", TEST_GROUP_2).done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
@ -516,33 +445,22 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("ip", "33.33.33.33")
.appendParam("port", "8888")
.appendParam("namespaceId", "namespace-1") //新增
.done(),
String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "33.33.33.33")
.appendParam("port", "8888").appendParam("namespaceId", "namespace-1") //新增
.done(), String.class, HttpMethod.POST);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(2, json.get("hosts").size());
//namespace-2个数
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(1, json.get("hosts").size());
@ -562,23 +480,14 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("ip", "33.33.33.33")
.appendParam("port", "8888")
.appendParam("namespaceId", "namespace-1") //新增
.appendParam("groupName", TEST_GROUP_1)
.done(),
String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "33.33.33.33")
.appendParam("port", "8888").appendParam("namespaceId", "namespace-1") //新增
.appendParam("groupName", TEST_GROUP_1).done(), String.class, HttpMethod.POST);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1")
.appendParam("groupName", TEST_GROUP_1)
.done(),
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1").appendParam("groupName", TEST_GROUP_1).done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
@ -602,33 +511,22 @@ public class MultiTenant_InstanceAPI_ITCase {
TimeUnit.SECONDS.sleep(5L);
ResponseEntity<String> response = request("/nacos/v1/ns/instance",
Params.newParams()
.appendParam("serviceName", serviceName)
.appendParam("ip", "11.11.11.11")
.appendParam("port", "80")
.appendParam("namespaceId", "namespace-1") //新增
.done(),
String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "11.11.11.11")
.appendParam("port", "80").appendParam("namespaceId", "namespace-1") //新增
.done(), String.class, HttpMethod.POST);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-1").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(1, json.get("hosts").size());
//namespace-2个数
response = request("/nacos/v1/ns/instance/list",
Params.newParams()
.appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2")
.done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName) //获取naming中的实例
.appendParam("namespaceId", "namespace-2").done(), String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
json = JacksonUtils.toObj(response.getBody());
Assert.assertEquals(1, json.get("hosts").size());
@ -636,7 +534,7 @@ public class MultiTenant_InstanceAPI_ITCase {
private void verifyInstanceListForNaming(NamingService naming, int size, String serviceName) throws Exception {
int i = 0;
while ( i < 20 ) {
while (i < 20) {
List<Instance> instances = naming.getAllInstances(serviceName);
if (instances.size() == size) {
break;
@ -651,7 +549,8 @@ public class MultiTenant_InstanceAPI_ITCase {
return request(path, params, clazz, HttpMethod.GET);
}
private <T> ResponseEntity<T> request(String path, MultiValueMap<String, String> params, Class<T> clazz, HttpMethod httpMethod) {
private <T> ResponseEntity<T> request(String path, MultiValueMap<String, String> params, Class<T> clazz,
HttpMethod httpMethod) {
HttpHeaders headers = new HttpHeaders();
@ -660,7 +559,6 @@ public class MultiTenant_InstanceAPI_ITCase {
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(this.base.toString() + path)
.queryParams(params);
return this.restTemplate.exchange(
builder.toUriString(), httpMethod, entity, clazz);
return this.restTemplate.exchange(builder.toUriString(), httpMethod, entity, clazz);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.utils;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import java.lang.reflect.Field;
public class NamingTestUtils {
public static BeatReactor getBeatReactorByReflection(NamingService namingService)
throws NoSuchFieldException, IllegalAccessException {
Field clientProxyField = namingService.getClass().getDeclaredField("clientProxy");
clientProxyField.setAccessible(true);
NamingClientProxy namingClientProxy = (NamingClientProxy) clientProxyField.get(namingService);
Field httpClientProxyField = namingClientProxy.getClass().getDeclaredField("httpClientProxy");
httpClientProxyField.setAccessible(true);
NamingHttpClientProxy httpClientProxy = (NamingHttpClientProxy) httpClientProxyField.get(namingClientProxy);
return httpClientProxy.getBeatReactor();
}
}