1. Support get all services from client; 2. Refactor dom class

This commit is contained in:
nkorange 2018-09-11 18:04:02 +08:00
parent d17e9f33b8
commit 899bee7ae7
15 changed files with 409 additions and 205 deletions

View File

@ -15,11 +15,13 @@
*/
package com.alibaba.nacos.api.naming;
import java.util.List;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import java.util.List;
/**
* @author dungu.zpf
@ -173,4 +175,22 @@ public interface NamingService {
* @throws NacosException
*/
void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException;
/**
* Get all service names from server
*
* @param pageNo page index
* @param pageSize page size
* @return list of service names
* @throws NacosException
*/
ListView<String> getServicesOfServer(int pageNo, int pageSize) throws NacosException;
/**
* Get all subscribed services of current client
*
* @return subscribed services
* @throws NacosException
*/
List<ServiceInfo> getSubscribeServices() throws NacosException;
}

View File

@ -0,0 +1,35 @@
package com.alibaba.nacos.api.naming.pojo;
import com.alibaba.fastjson.JSON;
import java.util.List;
/**
* @author dungu.zpf
*/
public class ListView<T> {
private List<T> data;
private int count;
public List<T> getData() {
return data;
}
public void setData(List<T> data) {
this.data = data;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.core;
package com.alibaba.nacos.api.naming.pojo;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.nacos.api.naming.pojo.Instance;
@ -27,7 +27,7 @@ import java.util.List;
/**
* @author dungu.zpf
*/
public class Domain {
public class ServiceInfo {
@JSONField(serialize = false)
private String jsonFromServer = StringUtils.EMPTY;
public static final String SPLITER = "@@";
@ -50,7 +50,7 @@ public class Domain {
private volatile boolean allIPs = false;
public Domain() {
public ServiceInfo() {
}
public boolean isAllIPs() {
@ -61,24 +61,24 @@ public class Domain {
this.allIPs = allIPs;
}
public Domain(String key) {
public ServiceInfo(String key) {
int maxKeySectionCount = 4;
int allIpFlagIndex = 3;
int envIndex = 2;
int clusterIndex = 1;
int domNameIndex = 0;
int serviceNameIndex = 0;
String[] keys = key.split(SPLITER);
if (keys.length >= maxKeySectionCount) {
this.name = keys[domNameIndex];
this.name = keys[serviceNameIndex];
this.clusters = keys[clusterIndex];
this.env = keys[envIndex];
if (StringUtils.equals(keys[allIpFlagIndex], UtilAndComs.ALL_IPS)) {
this.setAllIPs(true);
}
} else if (keys.length >= allIpFlagIndex) {
this.name = keys[domNameIndex];
this.name = keys[serviceNameIndex];
this.clusters = keys[clusterIndex];
if (StringUtils.equals(keys[envIndex], UtilAndComs.ALL_IPS)) {
this.setAllIPs(true);
@ -86,7 +86,7 @@ public class Domain {
this.env = keys[envIndex];
}
} else if (keys.length >= envIndex) {
this.name = keys[domNameIndex];
this.name = keys[serviceNameIndex];
if (StringUtils.equals(keys[clusterIndex], UtilAndComs.ALL_IPS)) {
this.setAllIPs(true);
} else {
@ -97,11 +97,11 @@ public class Domain {
this.name = keys[0];
}
public Domain(String name, String clusters) {
public ServiceInfo(String name, String clusters) {
this(name, clusters, StringUtils.EMPTY);
}
public Domain(String name, String clusters, String env) {
public ServiceInfo(String name, String clusters, String env) {
this.name = name;
this.clusters = clusters;
this.env = env;

View File

@ -21,9 +21,13 @@ import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.core.*;
import com.alibaba.nacos.client.naming.core.Balancer;
import com.alibaba.nacos.client.naming.core.EventDispatcher;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.StringUtils;
@ -167,9 +171,9 @@ public class NacosNamingService implements NamingService {
@Override
public List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException {
Domain domain = hostReactor.getDom(serviceName, StringUtils.join(clusters, ","), StringUtils.EMPTY, false);
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","), StringUtils.EMPTY, false);
List<Instance> list;
if (domain == null || CollectionUtils.isEmpty(list = domain.getHosts())) {
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
throw new IllegalStateException("no host to srv for dom: " + serviceName);
}
return list;
@ -183,9 +187,9 @@ public class NacosNamingService implements NamingService {
@Override
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy) throws NacosException {
Domain domain = hostReactor.getDom(serviceName, StringUtils.join(clusters, ","), StringUtils.EMPTY, false);
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","), StringUtils.EMPTY, false);
List<Instance> list;
if (domain == null || CollectionUtils.isEmpty(list = domain.getHosts())) {
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
throw new IllegalStateException("no host to srv for dom: " + serviceName);
}
@ -217,17 +221,17 @@ public class NacosNamingService implements NamingService {
@Override
public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) {
return Balancer.RandomByWeight.selectHost(hostReactor.getDom(serviceName, StringUtils.join(clusters, ",")));
return Balancer.RandomByWeight.selectHost(hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ",")));
}
@Override
public void subscribe(String service, EventListener listener) {
eventDispatcher.addListener(hostReactor.getDom(service, StringUtils.EMPTY), StringUtils.EMPTY, listener);
eventDispatcher.addListener(hostReactor.getServiceInfo(service, StringUtils.EMPTY), StringUtils.EMPTY, listener);
}
@Override
public void subscribe(String service, List<String> clusters, EventListener listener) {
eventDispatcher.addListener(hostReactor.getDom(service, StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
eventDispatcher.addListener(hostReactor.getServiceInfo(service, StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
}
@Override
@ -239,4 +243,14 @@ public class NacosNamingService implements NamingService {
public void unsubscribe(String service, List<String> clusters, EventListener listener) {
eventDispatcher.removeListener(service, StringUtils.join(clusters, ","), listener);
}
@Override
public ListView<String> getServicesOfServer(int pageNo, int pageSize) throws NacosException {
return serverProxy.getServiceList(pageNo, pageSize);
}
@Override
public List<ServiceInfo> getSubscribeServices() {
return new ArrayList<>(hostReactor.getServiceInfoMap().values());
}
}

View File

@ -16,9 +16,9 @@
package com.alibaba.nacos.client.naming.backups;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.cache.ConcurrentDiskUtil;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.core.Domain;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.LogUtils;
@ -47,7 +47,7 @@ public class FailoverReactor {
this.init();
}
private Map<String, Domain> domainMap = new ConcurrentHashMap<String, Domain>();
private Map<String, ServiceInfo> serviceMap = new ConcurrentHashMap<String, ServiceInfo>();
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -144,7 +144,7 @@ public class FailoverReactor {
@Override
public void run() {
Map<String, Domain> domMap = new HashMap<String, Domain>(16);
Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
BufferedReader reader = null;
try {
@ -168,7 +168,7 @@ public class FailoverReactor {
continue;
}
Domain dom = new Domain(file.getName());
ServiceInfo dom = new ServiceInfo(file.getName());
try {
String dataString = ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString());
@ -177,7 +177,7 @@ public class FailoverReactor {
String json;
if ((json = reader.readLine()) != null) {
try {
dom = JSON.parseObject(json, Domain.class);
dom = JSON.parseObject(json, ServiceInfo.class);
} catch (Exception e) {
LogUtils.LOG.error("NA", "error while parsing cached dom : " + json, e);
}
@ -203,24 +203,24 @@ public class FailoverReactor {
}
if (domMap.size() > 0) {
domainMap = domMap;
serviceMap = domMap;
}
}
}
class DiskFileWriter extends TimerTask {
public void run() {
Map<String, Domain> map = hostReactor.getDomMap();
for (Map.Entry<String, Domain> entry : map.entrySet()) {
Domain domain = entry.getValue();
if (StringUtils.equals(domain.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(domain.getName(), UtilAndComs.ENV_LIST_KEY)
|| StringUtils.equals(domain.getName(), "00-00---000-ENV_CONFIGS-000---00-00")
|| StringUtils.equals(domain.getName(), "vipclient.properties")
|| StringUtils.equals(domain.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
ServiceInfo serviceInfo = entry.getValue();
if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY)
|| StringUtils.equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00")
|| StringUtils.equals(serviceInfo.getName(), "vipclient.properties")
|| StringUtils.equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
continue;
}
DiskCache.write(domain, failoverDir);
DiskCache.write(serviceInfo, failoverDir);
}
}
}
@ -229,14 +229,14 @@ public class FailoverReactor {
return Boolean.parseBoolean(switchParams.get("failover-mode"));
}
public Domain getDom(String key) {
Domain domain = domainMap.get(key);
public ServiceInfo getService(String key) {
ServiceInfo serviceInfo = serviceMap.get(key);
if (domain == null) {
domain = new Domain();
domain.setName(key);
if (serviceInfo == null) {
serviceInfo = new ServiceInfo();
serviceInfo.setName(key);
}
return domain;
return serviceInfo;
}
}

View File

@ -17,7 +17,7 @@ package com.alibaba.nacos.client.naming.cache;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.core.Domain;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.StringUtils;
@ -36,7 +36,7 @@ import java.util.Map;
*/
public class DiskCache {
public static void write(Domain dom, String dir) {
public static void write(ServiceInfo dom, String dir) {
try {
makeSureCacheDirExists(dir);
@ -72,8 +72,8 @@ public class DiskCache {
return lineSeparator;
}
public static Map<String, Domain> read(String cacheDir) {
Map<String, Domain> domMap = new HashMap<String, Domain>(16);
public static Map<String, ServiceInfo> read(String cacheDir) {
Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
BufferedReader reader = null;
try {
@ -87,12 +87,12 @@ public class DiskCache {
continue;
}
if (!(file.getName().endsWith(Domain.SPLITER + "meta") || file.getName().endsWith(Domain.SPLITER + "special-url"))) {
Domain dom = new Domain(file.getName());
if (!(file.getName().endsWith(ServiceInfo.SPLITER + "meta") || file.getName().endsWith(ServiceInfo.SPLITER + "special-url"))) {
ServiceInfo dom = new ServiceInfo(file.getName());
List<Instance> ips = new ArrayList<Instance>();
dom.setHosts(ips);
Domain newFormat = null;
ServiceInfo newFormat = null;
try {
String dataString = ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString());
@ -105,7 +105,7 @@ public class DiskCache {
continue;
}
newFormat = JSON.parseObject(json, Domain.class);
newFormat = JSON.parseObject(json, ServiceInfo.class);
if (StringUtils.isEmpty(newFormat.getName())) {
ips.add(JSON.parseObject(json, Instance.class));

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.utils.Chooser;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.LogUtils;
@ -23,10 +24,7 @@ import com.alibaba.nacos.client.naming.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author xuanyin
@ -36,33 +34,33 @@ public class Balancer {
/**
* report status to server
*/
public final static List<String> UNCONSISTENT_DOM_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();
public final static List<String> UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();
public static class RandomByWeight {
public static List<Instance> selectAll(Domain dom) {
List<Instance> hosts = nothing(dom);
public static List<Instance> selectAll(ServiceInfo serviceInfo) {
List<Instance> hosts = nothing(serviceInfo);
if (CollectionUtils.isEmpty(hosts)) {
throw new IllegalStateException("no host to srv for dom: " + dom.getName());
throw new IllegalStateException("no host to srv for serviceInfo: " + serviceInfo.getName());
}
return hosts;
}
public static Instance selectHost(Domain dom) {
public static Instance selectHost(ServiceInfo dom) {
List<Instance> hosts = selectAll(dom);
if (CollectionUtils.isEmpty(hosts)) {
throw new IllegalStateException("no host to srv for dom: " + dom.getName());
throw new IllegalStateException("no host to srv for service: " + dom.getName());
}
return getHostByRandomWeight(hosts);
}
public static List<Instance> nothing(Domain dom) {
return dom.getHosts();
public static List<Instance> nothing(ServiceInfo serviceInfo) {
return serviceInfo.getHosts();
}
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.StringUtils;
@ -35,7 +36,7 @@ public class EventDispatcher {
private ExecutorService executor = null;
private BlockingQueue<Domain> changedDoms = new LinkedBlockingQueue<Domain>();
private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
private ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();
@ -54,26 +55,26 @@ public class EventDispatcher {
executor.execute(new Notifier());
}
public void addListener(Domain dom, String clusters, EventListener listener) {
addListener(dom, clusters, StringUtils.EMPTY, listener);
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
addListener(serviceInfo, clusters, StringUtils.EMPTY, listener);
}
public void addListener(Domain dom, String clusters, String env, EventListener listener) {
public void addListener(ServiceInfo serviceInfo, String clusters, String env, EventListener listener) {
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);
observers = observerMap.putIfAbsent(Domain.getKey(dom.getName(), clusters, env), observers);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters, env), observers);
if (observers != null) {
observers.add(listener);
}
domChanged(dom);
serviceChanged(serviceInfo);
}
public void removeListener(String dom, String clusters, EventListener listener) {
public void removeListener(String serviceName, String clusters, EventListener listener) {
String unit = "";
List<EventListener> observers = observerMap.get(Domain.getKey(dom, clusters, unit));
List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters, unit));
if (observers != null) {
Iterator<EventListener> iter = observers.iterator();
while (iter.hasNext()) {
@ -85,43 +86,43 @@ public class EventDispatcher {
}
}
public void domChanged(Domain dom) {
if (dom == null) {
public void serviceChanged(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return;
}
changedDoms.add(dom);
changedServices.add(serviceInfo);
}
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
Domain dom = null;
ServiceInfo serviceInfo = null;
try {
dom = changedDoms.poll(5, TimeUnit.MINUTES);
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}
if (dom == null) {
if (serviceInfo == null) {
continue;
}
try {
List<EventListener> listeners = observerMap.get(dom.getKey());
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(dom.getHosts());
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
if (!CollectionUtils.isEmpty(hosts)) {
listener.onEvent(new NamingEvent(dom.getName(), hosts));
listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
}
}
}
} catch (Exception e) {
LogUtils.LOG.error("NA", "notify error for dom: "
+ dom.getName() + ", clusters: " + dom.getClusters(), e);
LogUtils.LOG.error("NA", "notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}

View File

@ -16,8 +16,8 @@
package com.alibaba.nacos.client.naming.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.net.NamingProxy;
@ -41,7 +41,7 @@ public class HostReactor {
private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
private Map<String, Domain> domMap;
private Map<String, ServiceInfo> serviceInfoMap;
private PushRecver pushRecver;
@ -57,7 +57,7 @@ public class HostReactor {
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
this.domMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushRecver = new PushRecver(this);
}
@ -72,37 +72,37 @@ public class HostReactor {
}
});
public Map<String, Domain> getDomMap() {
return domMap;
public Map<String, ServiceInfo> getServiceInfoMap() {
return serviceInfoMap;
}
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
public Domain processDomJSON(String json) {
Domain domObj = JSON.parseObject(json, Domain.class);
Domain oldDom = domMap.get(domObj.getKey());
if (domObj.getHosts() == null || !domObj.validate()) {
public ServiceInfo processServiceJSON(String json) {
ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
return oldDom;
return oldService;
}
if (oldDom != null) {
if (oldDom.getLastRefTime() > domObj.getLastRefTime()) {
LogUtils.LOG.warn("out of date data received, old-t: " + oldDom.getLastRefTime()
+ ", new-t: " + domObj.getLastRefTime());
if (oldService != null) {
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
LogUtils.LOG.warn("out of date data received, old-t: " + oldService.getLastRefTime()
+ ", new-t: " + serviceInfo.getLastRefTime());
}
domMap.put(domObj.getKey(), domObj);
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldDom.getHosts().size());
for (Instance host : oldDom.getHosts()) {
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<String, Instance>(domObj.getHosts().size());
for (Instance host : domObj.getHosts()) {
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
@ -110,8 +110,8 @@ public class HostReactor {
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newDomHosts = new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newDomHosts) {
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<>(newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
@ -141,174 +141,174 @@ public class HostReactor {
}
if (newHosts.size() > 0) {
LogUtils.LOG.info("new ips(" + newHosts.size() + ") dom: "
+ domObj.getName() + " -> " + JSON.toJSONString(newHosts));
LogUtils.LOG.info("new ips(" + newHosts.size() + ") service: "
+ serviceInfo.getName() + " -> " + JSON.toJSONString(newHosts));
}
if (remvHosts.size() > 0) {
LogUtils.LOG.info("removed ips(" + remvHosts.size() + ") dom: "
+ domObj.getName() + " -> " + JSON.toJSONString(remvHosts));
LogUtils.LOG.info("removed ips(" + remvHosts.size() + ") service: "
+ serviceInfo.getName() + " -> " + JSON.toJSONString(remvHosts));
}
if (modHosts.size() > 0) {
LogUtils.LOG.info("modified ips(" + modHosts.size() + ") dom: "
+ domObj.getName() + " -> " + JSON.toJSONString(modHosts));
LogUtils.LOG.info("modified ips(" + modHosts.size() + ") service: "
+ serviceInfo.getName() + " -> " + JSON.toJSONString(modHosts));
}
domObj.setJsonFromServer(json);
serviceInfo.setJsonFromServer(json);
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.domChanged(domObj);
DiskCache.write(domObj, cacheDir);
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
} else {
LogUtils.LOG.info("new ips(" + domObj.ipCount() + ") dom: " + domObj.getName() + " -> " + JSON.toJSONString(domObj.getHosts()));
domMap.put(domObj.getKey(), domObj);
eventDispatcher.domChanged(domObj);
domObj.setJsonFromServer(json);
DiskCache.write(domObj, cacheDir);
LogUtils.LOG.info("new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getName() + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
eventDispatcher.serviceChanged(serviceInfo);
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
LogUtils.LOG.info("current ips:(" + domObj.ipCount() + ") dom: " + domObj.getName() +
" -> " + JSON.toJSONString(domObj.getHosts()));
LogUtils.LOG.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getName() +
" -> " + JSON.toJSONString(serviceInfo.getHosts()));
return domObj;
return serviceInfo;
}
private Domain getDom0(String dom, String clusters, String env) {
private ServiceInfo getSerivceInfo0(String serviceName, String clusters, String env) {
String key = Domain.getKey(dom, clusters, env, false);
String key = ServiceInfo.getKey(serviceName, clusters, env, false);
return domMap.get(key);
return serviceInfoMap.get(key);
}
private Domain getDom0(String dom, String clusters, String env, boolean allIPs) {
private ServiceInfo getSerivceInfo0(String serviceName, String clusters, String env, boolean allIPs) {
String key = Domain.getKey(dom, clusters, env, allIPs);
return domMap.get(key);
String key = ServiceInfo.getKey(serviceName, clusters, env, allIPs);
return serviceInfoMap.get(key);
}
public Domain getDom(String dom, String clusters, String env) {
return getDom(dom, clusters, env, false);
public ServiceInfo getServiceInfo(String serviceName, String clusters, String env) {
return getServiceInfo(serviceName, clusters, env, false);
}
public Domain getDom(String dom, String clusters) {
public ServiceInfo getServiceInfo(String serviceName, String clusters) {
String env = StringUtils.EMPTY;
return getDom(dom, clusters, env, false);
return getServiceInfo(serviceName, clusters, env, false);
}
public Domain getDom(final String dom, final String clusters, final String env, final boolean allIPs) {
public ServiceInfo getServiceInfo(final String serviceName, final String clusters, final String env, final boolean allIPs) {
LogUtils.LOG.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = Domain.getKey(dom, clusters, env, allIPs);
String key = ServiceInfo.getKey(serviceName, clusters, env, allIPs);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getDom(key);
return failoverReactor.getService(key);
}
Domain domObj = getDom0(dom, clusters, env, allIPs);
ServiceInfo serviceObj = getSerivceInfo0(serviceName, clusters, env, allIPs);
if (null == domObj) {
domObj = new Domain(dom, clusters, env);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters, env);
if (allIPs) {
domObj.setAllIPs(allIPs);
serviceObj.setAllIPs(allIPs);
}
domMap.put(domObj.getKey(), domObj);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
if (allIPs) {
updateDom4AllIPNow(dom, clusters, env);
updateService4AllIPNow(serviceName, clusters, env);
} else {
updateDomNow(dom, clusters, env);
updateServiceNow(serviceName, clusters, env);
}
} else if (domObj.getHosts().isEmpty()) {
} else if (serviceObj.getHosts().isEmpty()) {
if (updateHoldInterval > 0) {
// hold a moment waiting for update finish
synchronized (domObj) {
synchronized (serviceObj) {
try {
domObj.wait(updateHoldInterval);
serviceObj.wait(updateHoldInterval);
} catch (InterruptedException e) {
LogUtils.LOG.error("[getDom]", "dom:" + dom + ", clusters:" + clusters + ", allIPs:" + allIPs, e);
LogUtils.LOG.error("[getServiceInfo]", "serviceName:" + serviceName + ", clusters:" + clusters + ", allIPs:" + allIPs, e);
}
}
}
}
scheduleUpdateIfAbsent(dom, clusters, env, allIPs);
scheduleUpdateIfAbsent(serviceName, clusters, env, allIPs);
return domMap.get(domObj.getKey());
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String dom, String clusters, String env, boolean allIPs) {
if (futureMap.get(Domain.getKey(dom, clusters, env, allIPs)) != null) {
public void scheduleUpdateIfAbsent(String serviceName, String clusters, String env, boolean allIPs) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters, env, allIPs)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(Domain.getKey(dom, clusters, env, allIPs)) != null) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters, env, allIPs)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(dom, clusters, env, allIPs));
futureMap.put(Domain.getKey(dom, clusters, env, allIPs), future);
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters, env, allIPs));
futureMap.put(ServiceInfo.getKey(serviceName, clusters, env, allIPs), future);
}
}
public void updateDom4AllIPNow(String dom, String clusters, String env) {
updateDom4AllIPNow(dom, clusters, env, -1L);
public void updateService4AllIPNow(String serviceName, String clusters, String env) {
updateService4AllIPNow(serviceName, clusters, env, -1L);
}
@SuppressFBWarnings("NN_NAKED_NOTIFY")
public void updateDom4AllIPNow(String dom, String clusters, String env, long timeout) {
public void updateService4AllIPNow(String serviceName, String clusters, String env, long timeout) {
try {
Map<String, String> params = new HashMap<String, String>(8);
params.put("dom", dom);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
Domain oldDom = getDom0(dom, clusters, env, true);
if (oldDom != null) {
params.put("checksum", oldDom.getChecksum());
ServiceInfo oldService = getSerivceInfo0(serviceName, clusters, env, true);
if (oldService != null) {
params.put("checksum", oldService.getChecksum());
}
String result = serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/api/srvAllIP", params);
if (StringUtils.isNotEmpty(result)) {
Domain domain = processDomJSON(result);
domain.setAllIPs(true);
ServiceInfo serviceInfo = processServiceJSON(result);
serviceInfo.setAllIPs(true);
}
if (oldDom != null) {
synchronized (oldDom) {
oldDom.notifyAll();
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
//else nothing has changed
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update dom: " + dom, e);
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
}
}
@SuppressFBWarnings("NN_NAKED_NOTIFY")
public void updateDomNow(String dom, String clusters, String env) {
Domain oldDom = getDom0(dom, clusters, env);
public void updateServiceNow(String serviceName, String clusters, String env) {
ServiceInfo oldService = getSerivceInfo0(serviceName, clusters, env);
try {
Map<String, String> params = new HashMap<String, String>(8);
params.put("dom", dom);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("env", env);
params.put("clientIP", NetUtils.localIP());
StringBuilder stringBuilder = new StringBuilder();
for (String string : Balancer.UNCONSISTENT_DOM_WITH_ADDRESS_SERVER) {
for (String string : Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER) {
stringBuilder.append(string).append(",");
}
Balancer.UNCONSISTENT_DOM_WITH_ADDRESS_SERVER.clear();
Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER.clear();
params.put("unconsistentDom", stringBuilder.toString());
String envSpliter = ",";
@ -316,42 +316,42 @@ public class HostReactor {
params.put("useEnvId", "true");
}
if (oldDom != null) {
params.put("checksum", oldDom.getChecksum());
if (oldService != null) {
params.put("checksum", oldService.getChecksum());
}
String result = serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/api/srvIPXT", params);
if (StringUtils.isNotEmpty(result)) {
processDomJSON(result);
processServiceJSON(result);
}
//else nothing has changed
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update dom: " + dom, e);
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
} finally {
if (oldDom != null) {
synchronized (oldDom) {
oldDom.notifyAll();
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
public void refreshOnly(String dom, String clusters, String env, boolean allIPs) {
public void refreshOnly(String serviceName, String clusters, String env, boolean allIPs) {
try {
Map<String, String> params = new HashMap<String, String>(16);
params.put("dom", dom);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("unit", env);
params.put("clientIP", NetUtils.localIP());
String domSpliter = ",";
String serviceSpliter = ",";
StringBuilder stringBuilder = new StringBuilder();
for (String string : Balancer.UNCONSISTENT_DOM_WITH_ADDRESS_SERVER) {
stringBuilder.append(string).append(domSpliter);
for (String string : Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER) {
stringBuilder.append(string).append(serviceSpliter);
}
Balancer.UNCONSISTENT_DOM_WITH_ADDRESS_SERVER.clear();
Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER.clear();
params.put("unconsistentDom", stringBuilder.toString());
String envSpliter = ",";
@ -365,7 +365,7 @@ public class HostReactor {
serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/api/srvIPXT", params);
}
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update dom: " + dom, e);
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
}
}
@ -373,18 +373,18 @@ public class HostReactor {
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String dom;
private String serviceName;
private String env;
private boolean allIPs = false;
public UpdateTask(String dom, String clusters, String env) {
this.dom = dom;
public UpdateTask(String serviceName, String clusters, String env) {
this.serviceName = serviceName;
this.clusters = clusters;
this.env = env;
}
public UpdateTask(String dom, String clusters, String env, boolean allIPs) {
this.dom = dom;
public UpdateTask(String serviceName, String clusters, String env, boolean allIPs) {
this.serviceName = serviceName;
this.clusters = clusters;
this.env = env;
this.allIPs = allIPs;
@ -393,38 +393,38 @@ public class HostReactor {
@Override
public void run() {
try {
Domain domObj = domMap.get(Domain.getKey(dom, clusters, env, allIPs));
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters, env, allIPs));
if (domObj == null) {
if (serviceObj == null) {
if (allIPs) {
updateDom4AllIPNow(dom, clusters, env);
updateService4AllIPNow(serviceName, clusters, env);
} else {
updateDomNow(dom, clusters, env);
updateServiceNow(serviceName, clusters, env);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
return;
}
if (domObj.getLastRefTime() <= lastRefTime) {
if (serviceObj.getLastRefTime() <= lastRefTime) {
if (allIPs) {
updateDom4AllIPNow(dom, clusters, env);
domObj = domMap.get(Domain.getKey(dom, clusters, env, true));
updateService4AllIPNow(serviceName, clusters, env);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters, env, true));
} else {
updateDomNow(dom, clusters, env);
domObj = domMap.get(Domain.getKey(dom, clusters, env));
updateServiceNow(serviceName, clusters, env);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters, env));
}
} else {
// if dom already updated by push, we should not override it
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(dom, clusters, env, allIPs);
refreshOnly(serviceName, clusters, env, allIPs);
}
executor.schedule(this, domObj.getCacheMillis(), TimeUnit.MILLISECONDS);
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = domObj.getLastRefTime();
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
LogUtils.LOG.warn("NA", "failed to update dom: " + dom, e);
LogUtils.LOG.warn("NA", "failed to update serviceName: " + serviceName, e);
}
}

View File

@ -23,7 +23,6 @@ import com.alibaba.nacos.client.naming.utils.StringUtils;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.charset.Charset;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@ -78,8 +77,7 @@ public class PushRecver implements Runnable {
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type)) {
// dom update
hostReactor.processDomJSON(pushPacket.data);
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\""
@ -90,7 +88,7 @@ public class PushRecver implements Runnable {
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getDomMap()))
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only

View File

@ -16,8 +16,11 @@
package com.alibaba.nacos.client.naming.net;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.client.naming.utils.*;
import com.alibaba.nacos.common.util.UuidUtil;
@ -25,7 +28,10 @@ import java.io.IOException;
import java.io.StringReader;
import java.net.HttpURLConnection;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* @author dungu.zpf
@ -227,6 +233,22 @@ public class NamingProxy {
}
}
public ListView<String> getServiceList(int pageNo, int pageSize) throws NacosException {
Map<String, String> params = new HashMap<String, String>(4);
params.put("pageNo", String.valueOf(pageNo));
params.put("pageSize", String.valueOf(pageSize));
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/service/list", params);
JSONObject json = JSON.parseObject(result);
ListView<String> listView = new ListView<>();
listView.setCount(json.getInteger("count"));
listView.setData(JSON.parseObject(json.getString("doms"), new TypeReference<List<String>>(){}));
return listView;
}
public String callAllServers(String api, Map<String, String> params) throws NacosException {
String result = "";

View File

@ -0,0 +1,16 @@
package com.alibaba.nacos.client;
import org.junit.Test;
/**
* @author dungu.zpf
*/
public class NamingTest {
@Test
public void testServiceList() {
}
}

View File

@ -15,14 +15,54 @@
*/
package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.naming.core.DomainsManager;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.web.BaseServlet;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
/**
* @author dungu.zpf
*/
@RestController
@RequestMapping("/service")
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service")
public class ServiceController {
@Autowired
protected DomainsManager domainsManager;
@RequestMapping(value = "/list", method = RequestMethod.GET)
public JSONObject list(HttpServletRequest request) throws Exception {
int pageNo = NumberUtils.toInt(BaseServlet.required(request, "pageNo"));
int pageSize = NumberUtils.toInt(BaseServlet.required(request, "pageSize"));
int start = (pageNo - 1) * pageSize;
int end = start + pageSize;
List<String> doms = domainsManager.getAllDomNamesList();
if (start < 0) {
start = 0;
}
if (end > doms.size()) {
end = doms.size();
}
JSONObject result = new JSONObject();
result.put("doms", doms.subList(start, end));
result.put("count", doms.size());
return result;
}
}

View File

@ -221,6 +221,10 @@ public class DomainsManager {
return new HashSet<String>(chooseDomMap().keySet());
}
public List<String> getAllDomNamesList() {
return new ArrayList<>(chooseDomMap().keySet());
}
public void setAllDomNames(List<String> allDomNames) {
this.allDomNames = new HashSet<>(allDomNames);
}

View File

@ -0,0 +1,56 @@
package com.alibaba.nacos.test.naming;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.naming.NamingApp;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author dungu.zpf
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = NamingApp.class, properties = {"server.servlet.context-path=/nacos"},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ServiceListTest {
private NamingService naming;
@LocalServerPort
private int port;
@Before
public void init() throws Exception{
if (naming == null) {
TimeUnit.SECONDS.sleep(10);
naming = NamingFactory.createNamingService("127.0.0.1"+":"+port);
}
}
@Test
public void serviceList() throws NacosException {
naming.getServicesOfServer(1, 10);
}
@Test
public void getSubscribeServices() throws NacosException {
ListView<String> listView = naming.getServicesOfServer(1, 10);
if (listView != null && listView.getCount() > 0) {
naming.getAllInstances(listView.getData().get(0));
}
List<ServiceInfo> serviceInfoList = naming.getSubscribeServices();
System.out.println(serviceInfoList);
}
}