#498 Naming supports namespace

This commit is contained in:
nkorange 2018-12-25 16:48:27 +08:00
parent 2c2aff7f91
commit 478cb1533e
34 changed files with 750 additions and 950 deletions

View File

@ -128,6 +128,9 @@ public class Constants {
public static final String NAMING_DEFAULT_CLUSTER_NAME = "DEFAULT";
public static final String REQUEST_PARAM_TENANT_ID = "tid";
public static final String REQUEST_PARAM_NAMESPACE_ID = "namespaceId";
public static final String REQUEST_PARAM_DEFAULT_NAMESPACE_ID = "public";
public static final String REQUEST_PARAM_SERVICE_NAME = "serviceName";
public static final String REQUEST_PARAM_GROUP = "group";
public static final String REQUEST_PARAM_DEFAULT_GROUP = "DEFAULT_GROUP";
}

View File

@ -46,8 +46,6 @@ public class ServiceInfo {
private String checksum = "";
private String env = "";
private volatile boolean allIPs = false;
public ServiceInfo() {
@ -63,48 +61,22 @@ public class ServiceInfo {
public ServiceInfo(String key) {
int maxKeySectionCount = 4;
int allIpFlagIndex = 3;
int envIndex = 2;
int maxIndex = 2;
int clusterIndex = 1;
int serviceNameIndex = 0;
String[] keys = key.split(SPLITER);
if (keys.length >= maxKeySectionCount) {
if (keys.length >= maxIndex) {
this.name = keys[serviceNameIndex];
this.clusters = keys[clusterIndex];
this.env = keys[envIndex];
if (strEquals(keys[allIpFlagIndex], ALL_IPS)) {
this.setAllIPs(true);
}
} else if (keys.length >= allIpFlagIndex) {
this.name = keys[serviceNameIndex];
this.clusters = keys[clusterIndex];
if (strEquals(keys[envIndex], ALL_IPS)) {
this.setAllIPs(true);
} else {
this.env = keys[envIndex];
}
} else if (keys.length >= envIndex) {
this.name = keys[serviceNameIndex];
if (strEquals(keys[clusterIndex], ALL_IPS)) {
this.setAllIPs(true);
} else {
this.clusters = keys[clusterIndex];
}
}
this.name = keys[0];
}
public ServiceInfo(String name, String clusters) {
this(name, clusters, EMPTY);
}
public ServiceInfo(String name, String clusters, String env) {
this.name = name;
this.clusters = clusters;
this.env = env;
}
public int ipCount() {
@ -156,7 +128,6 @@ public class ServiceInfo {
}
public List<Instance> getHosts() {
return new ArrayList<Instance>(hosts);
}
@ -190,37 +161,25 @@ public class ServiceInfo {
@JSONField(serialize = false)
public String getKey() {
return getKey(name, clusters, env);
return getKey(name, clusters);
}
@JSONField(serialize = false)
public String getKeyEncoded() {
try {
return getKey(URLEncoder.encode(name, "UTF-8"), clusters, env);
return getKey(URLEncoder.encode(name, "UTF-8"), clusters);
} catch (UnsupportedEncodingException e) {
return getKey();
}
}
@JSONField(serialize = false)
public static String getKey(String name, String clusters, String unit) {
if (isEmpty(unit)) {
unit = EMPTY;
}
if (!isEmpty(clusters) && !isEmpty(unit)) {
return name + SPLITER + clusters + SPLITER + unit;
}
public static String getKey(String name, String clusters) {
if (!isEmpty(clusters)) {
return name + SPLITER + clusters;
}
if (!isEmpty(unit)) {
return name + SPLITER + EMPTY + SPLITER + unit;
}
return name;
}

View File

@ -189,8 +189,7 @@ public class NacosNamingService implements NamingService {
@Override
public List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException {
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","),
StringUtils.EMPTY);
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","));
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
@ -207,8 +206,7 @@ public class NacosNamingService implements NamingService {
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy)
throws NacosException {
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","),
StringUtils.EMPTY);
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","));
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();

View File

@ -15,15 +15,10 @@
*/
package com.alibaba.nacos.client.naming.beat;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.util.HttpMethod;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
@ -93,20 +88,9 @@ public class BeatReactor {
@Override
public void run() {
Map<String, String> params = new HashMap<String, String>(2);
params.put("beat", JSON.toJSONString(beatInfo));
params.put("serviceName", beatInfo.getServiceName());
try {
String result = serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/health", params, HttpMethod.POST);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
clientBeatInterval = jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
long result = serverProxy.sendBeat(beatInfo);
if (result > 0) {
clientBeatInterval = result;
}
}
}

View File

@ -57,16 +57,12 @@ public class EventDispatcher {
}
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
addListener(serviceInfo, clusters, StringUtils.EMPTY, listener);
}
public void addListener(ServiceInfo serviceInfo, String clusters, String env, EventListener listener) {
LogUtils.LOG.info("LISTENER", "adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters, env), observers);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
@ -77,9 +73,8 @@ public class EventDispatcher {
public void removeListener(String serviceName, String clusters, EventListener listener) {
LogUtils.LOG.info("LISTENER", "removing " + serviceName + " with " + clusters + " from listener map");
String unit = "";
List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters, unit));
List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
if (observers != null) {
Iterator<EventListener> iter = observers.iterator();
while (iter.hasNext()) {

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.client.naming.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
@ -188,41 +189,30 @@ public class HostReactor {
return serviceInfo;
}
private ServiceInfo getSerivceInfo0(String serviceName, String clusters, String env) {
private ServiceInfo getSerivceInfo0(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters, env);
String key = ServiceInfo.getKey(serviceName, clusters);
return serviceInfoMap.get(key);
}
private ServiceInfo getSerivceInfo0(String serviceName, String clusters, String env, boolean allIPs) {
String key = ServiceInfo.getKey(serviceName, clusters, env);
return serviceInfoMap.get(key);
}
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
return getServiceInfo(serviceName, clusters, StringUtils.EMPTY);
}
public ServiceInfo getServiceInfo(final String serviceName, final String clusters, final String env) {
LogUtils.LOG.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters, env);
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getSerivceInfo0(serviceName, clusters, env);
ServiceInfo serviceObj = getSerivceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters, env);
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters, env);
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
@ -240,94 +230,35 @@ public class HostReactor {
}
}
scheduleUpdateIfAbsent(serviceName, clusters, env);
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String serviceName, String clusters, String env) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters, env)) != null) {
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters, env)) != null) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters, env));
futureMap.put(ServiceInfo.getKey(serviceName, clusters, env), future);
}
}
public void updateService4AllIPNow(String serviceName, String clusters, String env) {
updateService4AllIPNow(serviceName, clusters, env, -1L);
}
@SuppressFBWarnings("NN_NAKED_NOTIFY")
public void updateService4AllIPNow(String serviceName, String clusters, String env, long timeout) {
try {
Map<String, String> params = new HashMap<String, String>(8);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
ServiceInfo oldService = getSerivceInfo0(serviceName, clusters, env, true);
if (oldService != null) {
params.put("checksum", oldService.getChecksum());
}
String result = serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/api/srvAllIP", params);
if (StringUtils.isNotEmpty(result)) {
ServiceInfo serviceInfo = processServiceJSON(result);
serviceInfo.setAllIPs(true);
}
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
//else nothing has changed
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
@SuppressFBWarnings("NN_NAKED_NOTIFY")
public void updateServiceNow(String serviceName, String clusters, String env) {
ServiceInfo oldService = getSerivceInfo0(serviceName, clusters, env);
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getSerivceInfo0(serviceName, clusters);
try {
Map<String, String> params = new HashMap<String, String>(8);
params.put("serviceName", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("env", env);
params.put("clientIP", NetUtils.localIP());
StringBuilder stringBuilder = new StringBuilder();
for (String string : Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER) {
stringBuilder.append(string).append(",");
}
Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER.clear();
params.put("unconsistentDom", stringBuilder.toString());
String envSpliter = ",";
if (!StringUtils.isEmpty(env) && !env.contains(envSpliter)) {
params.put("useEnvId", "true");
}
if (oldService != null) {
params.put("checksum", oldService.getChecksum());
}
String result = serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params);
String result = serverProxy.queryList(serviceName, clusters, pushRecver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
//else nothing has changed
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
} finally {
@ -339,30 +270,9 @@ public class HostReactor {
}
}
public void refreshOnly(String serviceName, String clusters, String env) {
public void refreshOnly(String serviceName, String clusters) {
try {
Map<String, String> params = new HashMap<String, String>(16);
params.put("serviceName", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("unit", env);
params.put("clientIP", NetUtils.localIP());
String serviceSpliter = ",";
StringBuilder stringBuilder = new StringBuilder();
for (String string : Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER) {
stringBuilder.append(string).append(serviceSpliter);
}
Balancer.UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER.clear();
params.put("unconsistentDom", stringBuilder.toString());
String envSpliter = ",";
if (!env.contains(envSpliter)) {
params.put("useEnvId", "true");
}
serverProxy.reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params);
serverProxy.queryList(serviceName, clusters, pushRecver.getUDPPort(), false);
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
}
@ -372,32 +282,30 @@ public class HostReactor {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
private String env;
public UpdateTask(String serviceName, String clusters, String env) {
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
this.env = env;
}
@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters, env));
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters, env);
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters, env);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters, env));
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters, env);
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.api.selector.ExpressionSelector;
import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.utils.*;
import com.alibaba.nacos.common.util.HttpMethod;
import com.alibaba.nacos.common.util.UuidUtils;
@ -45,7 +46,7 @@ public class NamingProxy {
private static final int DEFAULT_SERVER_PORT = 8848;
private String namespace;
private String namespaceId;
private String endpoint;
@ -61,9 +62,9 @@ public class NamingProxy {
private ScheduledExecutorService executorService;
public NamingProxy(String namespace, String endpoint, String serverList) {
public NamingProxy(String namespaceId, String endpoint, String serverList) {
this.namespace = namespace;
this.namespaceId = namespaceId;
this.endpoint = endpoint;
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
@ -159,7 +160,7 @@ public class NamingProxy {
LogUtils.LOG.info("REGISTER-SERVICE", "registering service " + serviceName + " with instance:" + instance);
final Map<String, String> params = new HashMap<String, String>(8);
params.put("tenant", namespace);
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
@ -178,7 +179,7 @@ public class NamingProxy {
+ " with instance:" + ip + ":" + port + "@" + cluster);
final Map<String, String> params = new HashMap<String, String>(8);
params.put("tenant", namespace);
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("ip", ip);
params.put("port", String.valueOf(port));
params.put("serviceName", serviceName);
@ -187,17 +188,41 @@ public class NamingProxy {
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.DELETE);
}
public String queryList(String serviceName, String clusters, boolean healthyOnly) throws NacosException {
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put("tenant", namespace);
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("serviceName", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
public long sendBeat(BeatInfo beatInfo) {
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("serviceName", beatInfo.getServiceName());
try {
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/health", params, HttpMethod.POST);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
return jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
}
return 0L;
}
public boolean serverHealthy() {
try {
@ -218,6 +243,7 @@ public class NamingProxy {
Map<String, String> params = new HashMap<String, String>(4);
params.put("pageNo", String.valueOf(pageNo));
params.put("pageSize", String.valueOf(pageSize));
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
if (selector != null) {
switch (SelectorType.valueOf(selector.getType())) {
@ -243,27 +269,6 @@ public class NamingProxy {
return listView;
}
public String callAllServers(String api, Map<String, String> params) throws NacosException {
String result = "";
List<String> snapshot = serversFromEndpoint;
if (!CollectionUtils.isEmpty(serverList)) {
snapshot = serverList;
}
try {
result = reqAPI(api, params, snapshot);
} catch (Exception e) {
LogUtils.LOG.error("NA", "req api:" + api + " failed, servers: " + snapshot, e);
}
if (StringUtils.isNotEmpty(result)) {
return result;
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all sites(" + snapshot + ") tried");
}
public String reqAPI(String api, Map<String, String> params) throws NacosException {
@ -330,7 +335,7 @@ public class NamingProxy {
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(Constants.REQUEST_PARAM_TENANT_ID, getTenantId());
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
@ -367,11 +372,7 @@ public class NamingProxy {
}
private String getTenantId() {
if (UtilAndComs.DEFAULT_NAMESPACE_ID.equals(namespace)) {
return StringUtils.EMPTY;
}
return namespace;
public String getNamespaceId() {
return namespaceId;
}
}

View File

@ -34,7 +34,7 @@ public class UtilAndComs {
public static final String NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";
public static final String DEFAULT_NAMESPACE_ID = "default";
public static final String DEFAULT_NAMESPACE_ID = "public";
public static final int REQUEST_DOMAIN_RETRY_COUNT = 3;

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.acl;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.Domain;
import com.alibaba.nacos.naming.core.DomainsManager;
@ -56,6 +57,9 @@ public class AuthChecker {
}
public void doAuth(Map<String, String[]> params, HttpServletRequest req) throws Exception {
String namespaceId = WebUtils.optional(req, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.optional(req, "name", "");
if (StringUtils.isEmpty(dom)) {
dom = WebUtils.optional(req, "dom", "");
@ -71,7 +75,7 @@ public class AuthChecker {
// we consider switch is a kind of special domain
domObj = Switch.getDom();
} else {
domObj = domainsManager.getDomain(dom);
domObj = domainsManager.getDomain(namespaceId, dom);
}
if (domObj == null) {

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
@ -64,6 +65,8 @@ public class CatalogController {
@RequestMapping(value = "/serviceList")
public JSONObject serviceList(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
JSONObject result = new JSONObject();
int page = Integer.parseInt(WebUtils.required(request, "startPg"));
@ -71,7 +74,7 @@ public class CatalogController {
String keyword = WebUtils.optional(request, "keyword", StringUtils.EMPTY);
List<Domain> doms = new ArrayList<>();
int total = domainsManager.getPagedDom(page - 1, pageSize, keyword, doms);
int total = domainsManager.getPagedDom(namespaceId, page - 1, pageSize, keyword, doms);
if (CollectionUtils.isEmpty(doms)) {
result.put("serviceList", Collections.emptyList());
@ -109,8 +112,10 @@ public class CatalogController {
@RequestMapping(value = "/serviceDetail")
public ServiceDetailView serviceDetail(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, "serviceName");
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (domain == null) {
throw new NacosException(NacosException.NOT_FOUND, "serivce " + serviceName + " is not found!");
}
@ -164,12 +169,14 @@ public class CatalogController {
@RequestMapping(value = "/instanceList")
public JSONObject instanceList(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, "serviceName");
String clusterName = WebUtils.required(request, "clusterName");
int page = Integer.parseInt(WebUtils.required(request, "startPg"));
int pageSize = Integer.parseInt(WebUtils.required(request, "pgSize"));
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (domain == null) {
throw new NacosException(NacosException.NOT_FOUND, "serivce " + serviceName + " is not found!");
}
@ -218,10 +225,12 @@ public class CatalogController {
@RequestMapping(value = "/services", method = RequestMethod.GET)
public List<ServiceDetailInfo> listDetail(HttpServletRequest request) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
List<ServiceDetailInfo> serviceDetailInfoList = new ArrayList<>();
domainsManager
.getRaftDomMap()
.getDomMap(namespaceId)
.forEach(
(serviceName, domain) -> {

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.Cluster;
@ -48,6 +49,8 @@ public class ClusterController {
@RequestMapping(value = {"/update", "/add"}, method = RequestMethod.POST)
public String update(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String clusterName = WebUtils.required(request, "clusterName");
String serviceName = WebUtils.required(request, "serviceName");
String healthChecker = WebUtils.required(request, "healthChecker");
@ -55,7 +58,7 @@ public class ClusterController {
String checkPort = WebUtils.required(request, "checkPort");
String useInstancePort4Check = WebUtils.required(request, "useInstancePort4Check");
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (domain == null) {
throw new NacosException(NacosException.INVALID_PARAM, "service not found:" + serviceName);
}

View File

@ -16,10 +16,8 @@
package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.web.ApiCommands;
import com.alibaba.nacos.naming.web.OverrideParameterRequestWrapper;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@ -36,6 +34,6 @@ public class HealthController extends ApiCommands {
@RequestMapping(value = "", method = RequestMethod.POST)
public JSONObject update(HttpServletRequest request) throws Exception {
return clientBeat(OverrideParameterRequestWrapper.buildRequest(request, "dom", WebUtils.required(request, "serviceName")));
return clientBeat(request);
}
}

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
@ -50,10 +51,9 @@ public class InstanceController extends ApiCommands {
// set service info:
if (StringUtils.isNotEmpty(serviceJson)) {
JSONObject service = JSON.parseObject(serviceJson);
requestWrapper.addParameter("dom", service.getString("name"));
} else {
requestWrapper.addParameter("dom", WebUtils.required(request, "serviceName"));
requestWrapper.addParameter("serviceName", service.getString("name"));
}
return regService(requestWrapper);
}
@ -64,7 +64,7 @@ public class InstanceController extends ApiCommands {
@RequestMapping(value = {"/instance/update", "instance"}, method = RequestMethod.POST)
public String update(HttpServletRequest request) throws Exception {
return regService(OverrideParameterRequestWrapper.buildRequest(request, "dom", WebUtils.required(request, "serviceName")));
return regService(request);
}
@RequestMapping(value = {"/instances", "/instance/list"}, method = RequestMethod.GET)
@ -75,12 +75,14 @@ public class InstanceController extends ApiCommands {
@RequestMapping(value = "/instance", method = RequestMethod.GET)
public JSONObject queryDetail(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, "serviceName");
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
String cluster = WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.required(request, "ip");
int port = Integer.parseInt(WebUtils.required(request, "port"));
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (domain == null) {
throw new NacosException(NacosException.NOT_FOUND, "no dom " + serviceName + " found!");
}

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.DomainsManager;
@ -50,9 +51,13 @@ public class ServiceController {
@RequestMapping(value = "", method = RequestMethod.PUT)
public String create(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, "serviceName");
if (domainsManager.getDomain(serviceName) != null) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
if (domainsManager.getDomain(namespaceId, serviceName) != null) {
throw new IllegalArgumentException("specified service already exists, serviceName : " + serviceName);
}
@ -73,6 +78,7 @@ public class ServiceController {
domObj.setEnableClientBeat(HealthCheckMode.client.name().equals(healthCheckMode.toLowerCase()));
domObj.setMetadata(metadataMap);
domObj.setSelector(parseSelector(selector));
domObj.setNamespaceId(namespaceId);
// now valid the dom. if failed, exception will be thrown
domObj.setLastModifiedMillis(System.currentTimeMillis());
@ -87,9 +93,11 @@ public class ServiceController {
@RequestMapping(value = "", method = RequestMethod.DELETE)
public String remove(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, "serviceName");
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
VirtualClusterDomain service = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
VirtualClusterDomain service = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (service == null) {
throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName);
}
@ -98,7 +106,7 @@ public class ServiceController {
throw new IllegalArgumentException("specified service has instances, serviceName : " + serviceName);
}
domainsManager.easyRemoveDom(serviceName);
domainsManager.easyRemoveDom(namespaceId, serviceName);
return "ok";
}
@ -106,14 +114,18 @@ public class ServiceController {
@RequestMapping(value = "", method = RequestMethod.GET)
public JSONObject detail(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, "serviceName");
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (domain == null) {
throw new NacosException(NacosException.NOT_FOUND, "serivce " + serviceName + " is not found!");
}
JSONObject res = new JSONObject();
res.put("name", serviceName);
res.put("namespaceId", domain.getNamespaceId());
res.put("protectThreshold", domain.getProtectThreshold());
res.put("healthCheckMode", HealthCheckMode.none.name());
@ -138,9 +150,11 @@ public class ServiceController {
int pageNo = NumberUtils.toInt(WebUtils.required(request, "pageNo"));
int pageSize = NumberUtils.toInt(WebUtils.required(request, "pageSize"));
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String selectorString = WebUtils.optional(request, "selector", StringUtils.EMPTY);
List<String> doms = domainsManager.getAllDomNamesList();
List<String> doms = domainsManager.getAllDomNamesList(namespaceId);
if (StringUtils.isNotBlank(selectorString)) {
@ -159,10 +173,10 @@ public class ServiceController {
String[] factors = terms[0].split("\\.");
switch (factors[0]) {
case "INSTANCE":
doms = filterInstanceMetadata(doms, factors[factors.length - 1], terms[1].replace("'", ""));
doms = filterInstanceMetadata(namespaceId, doms, factors[factors.length - 1], terms[1].replace("'", ""));
break;
case "SERVICE":
doms = filterServiceMetadata(doms, factors[factors.length - 1], terms[1].replace("'", ""));
doms = filterServiceMetadata(namespaceId, doms, factors[factors.length - 1], terms[1].replace("'", ""));
break;
default:
break;
@ -196,13 +210,15 @@ public class ServiceController {
@RequestMapping(value = "", method = RequestMethod.POST)
public String update(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, "serviceName");
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String serviceName = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
float protectThreshold = NumberUtils.toFloat(WebUtils.required(request, "protectThreshold"));
String healthCheckMode = WebUtils.required(request, "healthCheckMode");
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
String selector = WebUtils.optional(request, "selector", StringUtils.EMPTY);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(serviceName);
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, serviceName);
if (domain == null) {
throw new NacosException(NacosException.INVALID_PARAM, "service " + serviceName + " not found!");
}
@ -238,11 +254,11 @@ public class ServiceController {
return "ok";
}
private List<String> filterInstanceMetadata(List<String> serivces, String key, String value) {
private List<String> filterInstanceMetadata(String namespaceId, List<String> serivces, String key, String value) {
List<String> filteredServices = new ArrayList<>();
for (String service : serivces) {
VirtualClusterDomain serviceObj = (VirtualClusterDomain) domainsManager.getDomain(service);
VirtualClusterDomain serviceObj = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, service);
if (serviceObj == null) {
continue;
}
@ -256,11 +272,11 @@ public class ServiceController {
return filteredServices;
}
private List<String> filterServiceMetadata(List<String> serivces, String key, String value) {
private List<String> filterServiceMetadata(String namespaceId, List<String> serivces, String key, String value) {
List<String> filteredServices = new ArrayList<>();
for (String service : serivces) {
VirtualClusterDomain serviceObj = (VirtualClusterDomain) domainsManager.getDomain(service);
VirtualClusterDomain serviceObj = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, service);
if (serviceObj == null) {
continue;
}

View File

@ -27,7 +27,6 @@ import com.alibaba.nacos.naming.raft.RaftCore;
import com.alibaba.nacos.naming.raft.RaftListener;
import com.alibaba.nacos.naming.raft.RaftPeer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@ -44,7 +43,10 @@ import java.util.concurrent.locks.ReentrantLock;
@Component
public class DomainsManager {
private Map<String, Domain> raftDomMap = new ConcurrentHashMap<>();
/**
* Map<namespace, Map<group::serviceName, Service>>
*/
private Map<String, Map<String, Domain>> serviceMap = new ConcurrentHashMap<>();
private LinkedBlockingDeque<DomainKey> toBeUpdatedDomsQueue = new LinkedBlockingDeque<>(1024 * 1024);
@ -69,7 +71,7 @@ public class DomainsManager {
* thread pool that processes getting domain detail from other server asynchronously
*/
private ExecutorService domainUpdateExecutor
= Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() {
= Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
@ -79,8 +81,8 @@ public class DomainsManager {
}
});
public Map<String, Domain> chooseDomMap() {
return raftDomMap;
public Map<String, Domain> chooseDomMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
private void initConfig() {
@ -109,13 +111,13 @@ public class DomainsManager {
}
public void addUpdatedDom2Queue(String domName, String serverIP, String checksum) {
public void addUpdatedDom2Queue(String namespaceId, String domName, String serverIP, String checksum) {
lock.lock();
try {
toBeUpdatedDomsQueue.offer(new DomainKey(domName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
toBeUpdatedDomsQueue.offer(new DomainKey(namespaceId, domName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
} catch (Exception e) {
toBeUpdatedDomsQueue.poll();
toBeUpdatedDomsQueue.add(new DomainKey(domName, serverIP, checksum));
toBeUpdatedDomsQueue.add(new DomainKey(namespaceId, domName, serverIP, checksum));
Loggers.SRV_LOG.error("DOMAIN-STATUS", "Failed to add domain to be updatd to queue.", e);
} finally {
lock.unlock();
@ -129,7 +131,6 @@ public class DomainsManager {
public void run() {
String domName = null;
String serverIP = null;
String checksum;
try {
while (true) {
@ -147,9 +148,8 @@ public class DomainsManager {
domName = domainKey.getDomName();
serverIP = domainKey.getServerIP();
checksum = domainKey.getChecksum();
domainUpdateExecutor.execute(new DomUpdater(domName, serverIP));
domainUpdateExecutor.execute(new DomUpdater(domainKey.getNamespaceId(), domName, serverIP));
}
} catch (Exception e) {
Loggers.EVT_LOG.error("UPDATE-DOMAIN", "Exception while update dom: " + domName + "from " + serverIP, e);
@ -158,10 +158,13 @@ public class DomainsManager {
}
private class DomUpdater implements Runnable {
String namespaceId;
String domName;
String serverIP;
public DomUpdater(String domName, String serverIP) {
public DomUpdater(String namespaceId, String domName, String serverIP) {
this.namespaceId = namespaceId;
this.domName = domName;
this.serverIP = serverIP;
}
@ -169,15 +172,15 @@ public class DomainsManager {
@Override
public void run() {
try {
updatedDom2(domName, serverIP);
updatedDom2(namespaceId, domName, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG.warn("DOMAIN-UPDATER", "Exception while update dom: " + domName + "from " + serverIP, e);
}
}
}
public void updatedDom2(String domName, String serverIP) {
Message msg = synchronizer.get(serverIP, domName);
public void updatedDom2(String namespaceId, String domName, String serverIP) {
Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, domName));
JSONObject dom = JSON.parseObject(msg.getData());
JSONArray ipList = dom.getJSONArray("ips");
@ -189,7 +192,7 @@ public class DomainsManager {
ipsMap.put(strings[0], strings[1]);
}
VirtualClusterDomain raftVirtualClusterDomain = (VirtualClusterDomain) raftDomMap.get(domName);
VirtualClusterDomain raftVirtualClusterDomain = (VirtualClusterDomain) getDomain(namespaceId, domName);
if (raftVirtualClusterDomain == null) {
return;
@ -202,12 +205,12 @@ public class DomainsManager {
if (valid != ipAddress.isValid()) {
ipAddress.setValid(valid);
Loggers.EVT_LOG.info("{" + domName + "} {SYNC} " +
"{IP-" + (ipAddress.isValid() ? "ENABLED" : "DISABLED") + "} " + ipAddress.getIp()
+ ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName());
"{IP-" + (ipAddress.isValid() ? "ENABLED" : "DISABLED") + "} " + ipAddress.getIp()
+ ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName());
}
}
PushService.domChanged(raftVirtualClusterDomain.getName());
PushService.domChanged(raftVirtualClusterDomain.getNamespaceId(), raftVirtualClusterDomain.getName());
StringBuilder stringBuilder = new StringBuilder();
List<IpAddress> allIps = raftVirtualClusterDomain.allIPs();
for (IpAddress ipAddress : allIps) {
@ -218,68 +221,72 @@ public class DomainsManager {
}
public Set<String> getAllDomNames() {
return new HashSet<String>(chooseDomMap().keySet());
public Set<String> getAllDomNames(String namespaceId) {
return serviceMap.get(namespaceId).keySet();
}
public List<String> getAllDomNamesList() {
return new ArrayList<>(chooseDomMap().keySet());
}
public Map<String, Set<String>> getAllDomNames() {
public void setAllDomNames(List<String> allDomNames) {
this.allDomNames = new HashSet<>(allDomNames);
}
public Set<String> getAllDomNamesCache() {
if (Switch.isAllDomNameCache()) {
if (CollectionUtils.isNotEmpty(allDomNames)) {
return allDomNames;
} else {
allDomNames = getAllDomNames();
}
} else {
return getAllDomNames();
Map<String, Set<String>> namesMap = new HashMap<>(16);
for (String namespaceId : serviceMap.keySet()) {
namesMap.put(namespaceId, serviceMap.get(namespaceId).keySet());
}
return allDomNames;
return namesMap;
}
private Set<String> allDomNames;
public List<String> getAllDomNamesList(String namespaceId) {
return new ArrayList<>(chooseDomMap(namespaceId).keySet());
}
public List<Domain> getResponsibleDoms() {
List<Domain> result = new ArrayList<>();
Map<String, Domain> domainMap = chooseDomMap();
for (Map.Entry<String, Domain> entry : domainMap.entrySet()) {
Domain domain = entry.getValue();
if (DistroMapper.responsible(entry.getKey())) {
result.add(domain);
public Map<String, Set<Domain>> getResponsibleDoms() {
Map<String, Set<Domain>> result = new HashMap<>(16);
for (String namespaceId : serviceMap.keySet()) {
result.put(namespaceId, new HashSet<>());
for (Map.Entry<String, Domain> entry : serviceMap.get(namespaceId).entrySet()) {
Domain domain = entry.getValue();
if (DistroMapper.responsible(entry.getKey())) {
result.get(namespaceId).add(domain);
}
}
}
return result;
}
public int getResponsibleDomCount() {
int domCount = 0;
for (String namespaceId : serviceMap.keySet()) {
for (Map.Entry<String, Domain> entry : serviceMap.get(namespaceId).entrySet()) {
if (DistroMapper.responsible(entry.getKey())) {
domCount ++;
}
}
}
return domCount;
}
public int getResponsibleIPCount() {
List<Domain> responsibleDoms = getResponsibleDoms();
Map<String, Set<Domain>> responsibleDoms = getResponsibleDoms();
int count = 0;
for (Domain domain : responsibleDoms) {
count += domain.allIPs().size();
for (String namespaceId : responsibleDoms.keySet()) {
for (Domain domain : responsibleDoms.get(namespaceId)) {
count += domain.allIPs().size();
}
}
return count;
}
public void easyRemoveDom(String domName) throws Exception {
public void easyRemoveDom(String namespaceId, String serviceName) throws Exception {
Domain dom = getDomain(namespaceId, serviceName);
Domain dom = raftDomMap.get(domName);
if (dom != null) {
RaftCore.signalDelete(UtilsAndCommons.getDomStoreKey(dom));
}
}
public void easyAddOrReplaceDom(Domain newDom) throws Exception {
VirtualClusterDomain virtualClusterDomain = null;
VirtualClusterDomain virtualClusterDomain;
if (newDom instanceof VirtualClusterDomain) {
virtualClusterDomain = (VirtualClusterDomain) newDom;
newDom = virtualClusterDomain;
@ -287,9 +294,9 @@ public class DomainsManager {
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom));
}
public void easyAddIP4Dom(String domName, List<IpAddress> ips, long timestamp, long term) throws Exception {
public void easyAddIP4Dom(String namespaceId, String domName, List<IpAddress> ips, long timestamp, long term) throws Exception {
VirtualClusterDomain dom = (VirtualClusterDomain) chooseDomMap().get(domName);
VirtualClusterDomain dom = (VirtualClusterDomain) chooseDomMap(namespaceId).get(domName);
if (dom == null) {
throw new IllegalArgumentException("dom doesn't exist: " + domName);
}
@ -323,7 +330,7 @@ public class DomainsManager {
cluster.setDom(dom);
dom.getClusterMap().put(ipAddress.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: " + ipAddress.getClusterName() + " not found, ip: " + ipAddress.toJSON()
+ ", will create new cluster with default configuration.");
+ ", will create new cluster with default configuration.");
}
ipAddressMap.put(ipAddress.getDatumKey(), ipAddress);
@ -331,7 +338,7 @@ public class DomainsManager {
if (ipAddressMap.size() <= 0) {
throw new IllegalArgumentException("ip list can not be empty, dom: " + dom.getName() + ", ip list: "
+ JSON.toJSONString(ipAddressMap.values()));
+ JSON.toJSONString(ipAddressMap.values()));
}
String key = UtilsAndCommons.getIPListStoreKey(dom);
@ -378,15 +385,15 @@ public class DomainsManager {
return ipAddresses;
}
public void easyRemvIP4Dom(String domName, List<IpAddress> ips) throws Exception {
Lock lock = dom2LockMap.get(domName);
public void easyRemvIP4Dom(String namespaceId, String domName, List<IpAddress> ips) throws Exception {
Lock lock = dom2LockMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, domName));
if (lock == null) {
throw new IllegalStateException("no lock for " + domName + ", operation is disabled now.");
}
try {
lock.lock();
Domain dom = chooseDomMap().get(domName);
Domain dom = chooseDomMap(namespaceId).get(domName);
if (dom == null) {
throw new IllegalArgumentException("domain doesn't exist: " + domName);
}
@ -419,13 +426,24 @@ public class DomainsManager {
}
}
public Domain getDomain(String domName) {
return chooseDomMap().get(domName);
public Domain getDomain(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return serviceMap.get(namespaceId).get(serviceName);
}
public List<Domain> searchDomains(String regex) {
public void putDomain(VirtualClusterDomain domain) {
if (!serviceMap.containsKey(domain.getNamespaceId())) {
serviceMap.put(domain.getNamespaceId(), new ConcurrentHashMap<>(16));
}
serviceMap.get(domain.getNamespaceId()).put(domain.getName(), domain);
}
public List<Domain> searchDomains(String namespaceId, String regex) {
List<Domain> result = new ArrayList<Domain>();
for (Map.Entry<String, Domain> entry : chooseDomMap().entrySet()) {
for (Map.Entry<String, Domain> entry : chooseDomMap(namespaceId).entrySet()) {
Domain dom = entry.getValue();
String key = dom.getName() + ":" + ArrayUtils.toString(dom.getOwners());
@ -438,32 +456,34 @@ public class DomainsManager {
}
public int getDomCount() {
return chooseDomMap().size();
int domCount = 0;
for (String namespaceId : serviceMap.keySet()) {
domCount += serviceMap.get(namespaceId).size();
}
return domCount;
}
public int getIPCount() {
public int getInstanceCount() {
int total = 0;
List<String> doms = new ArrayList<String>(getAllDomNames());
for (String dom : doms) {
Domain domain = getDomain(dom);
total += (domain.allIPs().size());
for (String namespaceId : serviceMap.keySet()) {
for (Domain domain : serviceMap.get(namespaceId).values()) {
total += domain.allIPs().size();
}
}
return total;
}
public Map<String, Domain> getRaftDomMap() {
return raftDomMap;
public Map<String, Domain> getDomMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
public int getPagedDom(int startPage, int pageSize, String keyword, List<Domain> domainList) {
public int getPagedDom(String namespaceId, int startPage, int pageSize, String keyword, List<Domain> domainList) {
List<Domain> matchList;
if (StringUtils.isNotBlank(keyword)) {
matchList = searchDomains(".*" + keyword + ".*");
matchList = searchDomains(namespaceId, ".*" + keyword + ".*");
} else {
matchList = new ArrayList<Domain>(chooseDomMap().values());
matchList = new ArrayList<Domain>(chooseDomMap(namespaceId).values());
}
if (pageSize >= matchList.size()) {
@ -487,8 +507,14 @@ public class DomainsManager {
}
public static class DomainChecksum {
public String namespaceId;
public Map<String, String> domName2Checksum = new HashMap<String, String>();
public DomainChecksum(String namespaceId) {
this.namespaceId = namespaceId;
}
public void addItem(String domName, String checksum) {
if (StringUtils.isEmpty(domName) || StringUtils.isEmpty(checksum)) {
Loggers.SRV_LOG.warn("DOMAIN-CHECKSUM", "domName or checksum is empty,domName: " + domName + " checksum: " + checksum);
@ -505,46 +531,50 @@ public class DomainsManager {
public void run() {
try {
DomainChecksum checksum = new DomainChecksum();
List<String> allDomainNames = new ArrayList<String>(getAllDomNames());
Map<String, Set<String>> allDomainNames = getAllDomNames();
if (allDomainNames.size() <= 0) {
//ignore
return;
}
for (String domName : allDomainNames) {
if (!DistroMapper.responsible(domName)) {
continue;
for (String namespaceId : allDomainNames.keySet()) {
DomainChecksum checksum = new DomainChecksum(namespaceId);
for (String domName : allDomainNames.get(namespaceId)) {
if (!DistroMapper.responsible(domName)) {
continue;
}
Domain domain = getDomain(namespaceId, domName);
if (domain == null || domain instanceof SwitchDomain) {
continue;
}
domain.recalculateChecksum();
checksum.addItem(domName, domain.getChecksum());
}
Domain domain = getDomain(domName);
Message msg = new Message();
if (domain == null || domain instanceof SwitchDomain) {
continue;
msg.setData(JSON.toJSONString(checksum));
List<String> sameSiteServers = NamingProxy.getSameSiteServers().get("sameSite");
if (sameSiteServers == null || sameSiteServers.size() <= 0 || !NamingProxy.getServers().contains(NetUtils.localServer())) {
return;
}
domain.recalculateChecksum();
checksum.addItem(domName, domain.getChecksum());
}
Message msg = new Message();
msg.setData(JSON.toJSONString(checksum));
List<String> sameSiteServers = NamingProxy.getSameSiteServers().get("sameSite");
if (sameSiteServers == null || sameSiteServers.size() <= 0 || !NamingProxy.getServers().contains(NetUtils.localServer())) {
return;
}
for (String server : sameSiteServers) {
if (server.equals(NetUtils.localServer())) {
continue;
for (String server : sameSiteServers) {
if (server.equals(NetUtils.localServer())) {
continue;
}
synchronizer.send(server, msg);
}
synchronizer.send(server, msg);
}
} catch (Exception e) {
Loggers.SRV_LOG.error("DOMAIN-STATUS", "Exception while sending domain status: ", e);
@ -602,33 +632,26 @@ public class DomainsManager {
throw new IllegalStateException("dom parsing failed, json: " + value);
}
if (StringUtils.isBlank(dom.getNamespaceId())) {
dom.setNamespaceId(UtilsAndCommons.getDefaultNamespaceId());
}
Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key:" + key + ", value:" + value);
Domain oldDom = raftDomMap.get(dom.getName());
Domain oldDom = getDomain(dom.getNamespaceId(), dom.getName());
if (oldDom != null) {
oldDom.update(dom);
} else {
if (!dom2LockMap.containsKey(dom.getName())) {
dom2LockMap.put(dom.getName(), new ReentrantLock());
}
addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(dom.getNamespaceId(), dom.getName()));
raftDomMap.put(dom.getName(), dom);
putDomain(dom);
dom.init();
Loggers.SRV_LOG.info("[NEW-DOM-raft] " + dom.toJSON());
}
Lock lock = dom2LockMap.get(dom.getName());
Condition condition = dom2ConditionMap.get(dom.getName());
try {
lock.lock();
condition.signalAll();
} catch (Exception ignore) {
} finally {
lock.unlock();
}
wakeUp(UtilsAndCommons.assembleFullServiceName(dom.getNamespaceId(), dom.getName()));
} catch (Throwable e) {
Loggers.SRV_LOG.error("VIPSRV-DOM", "error while processing dom update", e);
@ -637,8 +660,10 @@ public class DomainsManager {
@Override
public void onDelete(String key, String value) throws Exception {
String name = StringUtils.removeStart(key, UtilsAndCommons.DOMAINS_DATA_ID + ".");
Domain dom = raftDomMap.remove(name);
String domKey = StringUtils.removeStart(key, UtilsAndCommons.DOMAINS_DATA_ID + ".");
String namespace = domKey.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[0];
String name = domKey.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[1];
Domain dom = chooseDomMap(namespace).remove(name);
Loggers.RAFT.info("[RAFT-NOTIFIER] datum is deleted, key:" + key + ", value:" + value);
if (dom != null) {
@ -651,19 +676,38 @@ public class DomainsManager {
}
public Lock addLock(String domName) {
public void wakeUp(String key) {
Lock lock = dom2LockMap.get(key);
Condition condition = dom2ConditionMap.get(key);
try {
lock.lock();
condition.signalAll();
} catch (Exception ignore) {
} finally {
lock.unlock();
}
}
public Lock addLockIfAbsent(String key) {
if (dom2LockMap.containsKey(key)) {
return dom2LockMap.get(key);
}
Lock lock = new ReentrantLock();
dom2LockMap.put(domName, lock);
dom2LockMap.put(key, lock);
return lock;
}
public Condition addCondtion(String domName) {
Condition condition = dom2LockMap.get(domName).newCondition();
dom2ConditionMap.put(domName, condition);
public Condition addCondtion(String key) {
Condition condition = dom2LockMap.get(key).newCondition();
dom2ConditionMap.put(key, condition);
return condition;
}
private static class DomainKey {
private String namespaceId;
private String domName;
private String serverIP;
@ -679,9 +723,14 @@ public class DomainsManager {
return domName;
}
public String getNamespaceId() {
return namespaceId;
}
private String checksum;
public DomainKey(String domName, String serverIP, String checksum) {
public DomainKey(String namespaceId, String domName, String serverIP, String checksum) {
this.namespaceId = namespaceId;
this.domName = domName;
this.serverIP = serverIP;
this.checksum = checksum;

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
@ -64,6 +65,7 @@ public class VirtualClusterDomain implements Domain, RaftListener {
private Boolean enabled = true;
private Boolean enableClientBeat = false;
private Selector selector = new NoneSelector();
private String namespaceId;
/**
* IP will be deleted if it has not send beat for some time, default timeout is half an hour .
@ -158,12 +160,12 @@ public class VirtualClusterDomain implements Domain, RaftListener {
@Override
public boolean interests(String key) {
return StringUtils.equals(key, UtilsAndCommons.IPADDRESS_DATA_ID_PRE + name);
return StringUtils.equals(key, UtilsAndCommons.IPADDRESS_DATA_ID_PRE + namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + name);
}
@Override
public boolean matchUnlistenKey(String key) {
return StringUtils.equals(key, UtilsAndCommons.IPADDRESS_DATA_ID_PRE + name);
return StringUtils.equals(key, UtilsAndCommons.IPADDRESS_DATA_ID_PRE + namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + name);
}
@Override
@ -244,7 +246,7 @@ public class VirtualClusterDomain implements Domain, RaftListener {
clusterMap.get(entry.getKey()).updateIPs(entryIPs);
}
setLastModifiedMillis(System.currentTimeMillis());
PushService.domChanged(name);
PushService.domChanged(namespaceId, name);
StringBuilder stringBuilder = new StringBuilder();
for (IpAddress ipAddress : allIPs()) {
@ -452,6 +454,14 @@ public class VirtualClusterDomain implements Domain, RaftListener {
this.clusterMap = clusterMap;
}
public String getNamespaceId() {
return namespaceId;
}
public void setNamespaceId(String namespaceId) {
this.namespaceId = namespaceId;
}
@Override
public void update(Domain dom) {
if (!(dom instanceof VirtualClusterDomain)) {

View File

@ -212,7 +212,7 @@ public abstract class AbstractHealthCheckProcessor {
VirtualClusterDomain vDom = (VirtualClusterDomain) cluster.getDom();
vDom.setLastModifiedMillis(System.currentTimeMillis());
PushService.domChanged(vDom.getName());
PushService.domChanged(vDom.getNamespaceId(), vDom.getName());
addResult(new HealthCheckResult(vDom.getName(), ip));
Loggers.EVT_LOG.info("{" + cluster.getDom().getName() + "} {POS} {IP-ENABLED} valid: "
@ -254,7 +254,7 @@ public abstract class AbstractHealthCheckProcessor {
vDom.setLastModifiedMillis(System.currentTimeMillis());
addResult(new HealthCheckResult(vDom.getName(), ip));
PushService.domChanged(vDom.getName());
PushService.domChanged(vDom.getNamespaceId(), vDom.getName());
Loggers.EVT_LOG.info("{" + cluster.getDom().getName() + "} {POS} {IP-DISABLED} invalid: "
+ ip.getIp() + ":" + ip.getPort() + "@" + cluster.getName()
@ -291,7 +291,7 @@ public abstract class AbstractHealthCheckProcessor {
VirtualClusterDomain vDom = (VirtualClusterDomain) cluster.getDom();
vDom.setLastModifiedMillis(System.currentTimeMillis());
PushService.domChanged(vDom.getName());
PushService.domChanged(vDom.getNamespaceId(), vDom.getName());
addResult(new HealthCheckResult(vDom.getName(), ip));
Loggers.EVT_LOG.info("{" + cluster.getDom().getName() + "} {POS} {IP-DISABLED} invalid-now: "

View File

@ -60,7 +60,7 @@ public class ClientBeatCheckTask implements Runnable {
+ ipAddress.getIp() + ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName()
+ ", region: " + DistroMapper.LOCALHOST_SITE + ", msg: " + "client timeout after "
+ ClientBeatProcessor.CLIENT_BEAT_TIMEOUT + ", last beat: " + ipAddress.getLastBeat());
PushService.domChanged(domain.getName());
PushService.domChanged(domain.getNamespaceId(), domain.getName());
}
}
}

View File

@ -82,7 +82,7 @@ public class ClientBeatProcessor implements Runnable {
Loggers.EVT_LOG.info("{" + cluster.getDom().getName() + "} {POS} {IP-ENABLED} valid: "
+ ip+ ":" + port+ "@" + cluster.getName()
+ ", region: " + DistroMapper.LOCALHOST_SITE + ", msg: " + "client beat ok");
PushService.domChanged(domain.getName());
PushService.domChanged(virtualClusterDomain.getNamespaceId(), domain.getName());
}
}
}

View File

@ -19,6 +19,9 @@ package com.alibaba.nacos.naming.misc;
* @author nacos
*/
public class Message {
private String data;
public String getData() {
return data;
}
@ -26,6 +29,4 @@ public class Message {
public void setData(String data) {
this.data = data;
}
private String data;
}

View File

@ -20,8 +20,10 @@ import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
import com.alibaba.nacos.naming.core.Domain;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.exception.NacosException;
import com.alibaba.nacos.naming.healthcheck.JsonAdapter;
import com.alibaba.nacos.naming.selector.Selector;
@ -100,13 +102,11 @@ public class UtilsAndCommons {
public static final String API_SET_ALL_WEIGHTS = "/api/setWeight4AllIPs";
public static final String API_DOM_SERVE_STATUS = "/api/domServeStatus";
public static final String API_IP_FOR_DOM = "/api/ip4Dom";
public static final String API_DOM = "/api/dom";
public static final String SERVICE_TENANT_CONNECTOR = "::";
public static final String SERVICE_GROUP_CONNECTOR = "##";
public static final String INSTANCE_LIST_PERSISTED_PROPERTY_KEY = "nacos.instanceListPersisted";
@ -212,10 +212,18 @@ public class UtilsAndCommons {
public static String getIPListStoreKey(Domain dom) {
if (dom instanceof VirtualClusterDomain) {
return UtilsAndCommons.IPADDRESS_DATA_ID_PRE + ((VirtualClusterDomain) dom).getNamespaceId() +
UtilsAndCommons.SERVICE_GROUP_CONNECTOR + dom.getName();
}
return UtilsAndCommons.IPADDRESS_DATA_ID_PRE + dom.getName();
}
public static String getDomStoreKey(Domain dom) {
if (dom instanceof VirtualClusterDomain) {
return UtilsAndCommons.DOMAINS_DATA_ID + "." + ((VirtualClusterDomain) dom).getNamespaceId() +
UtilsAndCommons.SERVICE_GROUP_CONNECTOR + dom.getName();
}
return UtilsAndCommons.DOMAINS_DATA_ID + "." + dom.getName();
}
@ -244,4 +252,12 @@ public class UtilsAndCommons {
return metadataMap;
}
public static String getDefaultNamespaceId() {
return "public";
}
public static String assembleFullServiceName(String namespaceId, String serviceName) {
return namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + serviceName;
}
}

View File

@ -21,7 +21,6 @@ import com.alibaba.nacos.naming.misc.Switch;
import com.alibaba.nacos.naming.push.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -74,30 +73,16 @@ public class PerformanceLoggerThread {
PerformanceLogTask task = new PerformanceLogTask();
executor.scheduleWithFixedDelay(task, 30, PERIOD, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(new HealthCheckSwitchTask(), 30, HEALTH_CHECK_PERIOD, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(new AllDomNamesTask(), 60, 60, TimeUnit.SECONDS);
}
class AllDomNamesTask implements Runnable {
@Override
public void run() {
try {
domainsManager.setAllDomNames(new ArrayList<String>(domainsManager.getAllDomNames()));
Loggers.PERFORMANCE_LOG.debug("refresh all dom names: " + domainsManager.getAllDomNamesCache().size());
} catch (Exception e) {
e.printStackTrace();
}
}
}
class PerformanceLogTask implements Runnable {
@Override
public void run() {
try {
int domCount = domainsManager.getDomCount();
int ipCount = domainsManager.getIPCount();
int ipCount = domainsManager.getInstanceCount();
long maxPushMaxCost = getMaxPushCost();
long avgPushCost = getAvgPushCost();
Loggers.PERFORMANCE_LOG.info("PERFORMANCE:" + "|" + domCount + "|" + ipCount + "|" + maxPushMaxCost + "|" + avgPushCost);

View File

@ -41,8 +41,8 @@ import java.util.zip.GZIPOutputStream;
public class PushService {
public static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);
private static final int MAX_RETRY_TIMES = 1;
private static BlockingQueue<String> QUEUE = new LinkedBlockingDeque<String>();
private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap
= new ConcurrentHashMap<String, Receiver.AckEntry>();
@ -88,19 +88,10 @@ public class PushService {
try {
udpSocket = new DatagramSocket();
Sender sender;
Receiver receiver;
sender = new Sender();
Thread outThread;
Thread inThread;
outThread = new Thread(sender);
outThread.setDaemon(true);
outThread.setName("com.alibaba.nacos.naming.push.sender");
outThread.start();
receiver = new Receiver();
inThread = new Thread(receiver);
@ -128,7 +119,8 @@ public class PushService {
return totalPush;
}
public static void addClient(String dom,
public static void addClient(String namespaceId,
String dom,
String clusters,
String agent,
InetSocketAddress socketAddr,
@ -136,7 +128,8 @@ public class PushService {
String tenant,
String app) {
PushClient client = new PushService.PushClient(dom,
PushClient client = new PushService.PushClient(namespaceId,
dom,
clusters,
agent,
socketAddr,
@ -148,10 +141,12 @@ public class PushService {
public static void addClient(PushClient client) {
// client is stored by key 'dom' because notify event is driven by dom change
ConcurrentMap<String, PushClient> clients = clientMap.get(client.getDom());
String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getDom());
ConcurrentMap<String, PushClient> clients =
clientMap.get(serviceKey);
if (clients == null) {
clientMap.putIfAbsent(client.getDom(), new ConcurrentHashMap<String, PushClient>(1024));
clients = clientMap.get(client.getDom());
clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<String, PushClient>(1024));
clients = clientMap.get(serviceKey);
}
PushClient oldClient = clients.get(client.toString());
@ -213,8 +208,8 @@ public class PushService {
return dom + UtilsAndCommons.CACHE_KEY_SPLITER + agent;
}
public static void domChanged(final String dom) {
if (futureMap.containsKey(dom)) {
public static void domChanged(final String namespaceId, final String dom) {
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, dom))) {
return;
}
Future future = udpSender.schedule(new Runnable() {
@ -222,7 +217,7 @@ public class PushService {
public void run() {
try {
Loggers.PUSH.info(dom + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap.get(dom);
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, dom));
if (MapUtils.isEmpty(clients)) {
return;
}
@ -269,13 +264,13 @@ public class PushService {
Loggers.PUSH.error("VIPSRV-PUSH", "failed to push dom: " + dom + " to cleint", e);
} finally {
futureMap.remove(dom);
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, dom));
}
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(dom, future);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, dom), future);
}
public static boolean canEnablePush(String agent) {
@ -311,6 +306,7 @@ public class PushService {
}
public static class PushClient {
private String namespaceId;
private String dom;
private String clusters;
private String agent;
@ -330,25 +326,15 @@ public class PushService {
public long lastRefTime = System.currentTimeMillis();
public PushClient(String dom
, String clusters
, String agent
, InetSocketAddress socketAddr
, DataSource dataSource) {
this.dom = dom;
this.clusters = clusters;
this.agent = agent;
this.socketAddr = socketAddr;
this.dataSource = dataSource;
}
public PushClient(String dom,
public PushClient(String namespaceId,
String dom,
String clusters,
String agent,
InetSocketAddress socketAddr,
DataSource dataSource,
String tenant,
String app) {
this.namespaceId = namespaceId;
this.dom = dom;
this.clusters = clusters;
this.agent = agent;
@ -415,6 +401,14 @@ public class PushService {
this.clusters = clusters;
}
public String getNamespaceId() {
return namespaceId;
}
public void setNamespaceId(String namespaceId) {
this.namespaceId = namespaceId;
}
public String getDom() {
return dom;
}
@ -544,48 +538,6 @@ public class PushService {
}
}
private static class Sender implements Runnable {
@Override
public void run() {
while (true) {
try {
String dom;
try {
dom = QUEUE.take();
} catch (InterruptedException e) {
continue; //ignore
}
if (System.currentTimeMillis() - lastPushMillisMap.get(dom) < 1000) {
QUEUE.add(dom);
continue;
}
lastPushMillisMap.put(dom, System.currentTimeMillis());
ConcurrentMap<String, PushClient> clients = clientMap.get(dom);
if (MapUtils.isEmpty(clients)) {
continue;
}
for (PushClient client : clients.values()) {
if (client.zombie()) {
clients.remove(client.toString());
continue;
}
Loggers.PUSH.debug("push dom: " + dom + " to cleint");
Receiver.AckEntry ackEntry = prepareAckEntry(client, prepareHostsData(client), System.nanoTime());
Loggers.PUSH.info("sender", "dom: " + client.getDom() + " changed, schedule push for: "
+ client.getAddrStr() + ", agent: " + client.getAgent() + ", key: " + ackEntry.key);
udpPush(ackEntry);
}
} catch (Throwable t) {
Loggers.PUSH.error("VIPSRV-PUSH", "failed, caused by: ", t);
}
}
}
}
private static String getACKKey(String host, int port, long lastRefTime) {
return StringUtils.strip(host) + "," + port + "," + lastRefTime;
}

View File

@ -15,14 +15,16 @@
*/
package com.alibaba.nacos.naming.raft;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Properties;
@ -53,36 +55,16 @@ public class RaftStore {
CACHE_DIR = BASE_DIR + File.separator + "data";
}
public synchronized static void load() throws Exception{
public synchronized static void load() throws Exception {
long start = System.currentTimeMillis();
// load data
for (File cache : listCaches()) {
if (!cache.isFile()) {
Loggers.RAFT.warn("warning: encountered directory in cache dir: " + cache.getAbsolutePath());
}
ByteBuffer buffer;
FileChannel fc = null;
try {
fc = new FileInputStream(cache).getChannel();
buffer = ByteBuffer.allocate((int) cache.length());
fc.read(buffer);
String json = new String(buffer.array(), "UTF-8");
if (StringUtils.isBlank(json)) {
continue;
}
Datum datum = JSON.parseObject(json, Datum.class);
RaftCore.addDatum(datum);
} catch (Exception e) {
Loggers.RAFT.warn("waning: failed to deserialize key: " + cache.getName());
throw e;
} finally {
if (fc != null) {
fc.close();
if (cache.isDirectory() && cache.listFiles() != null) {
for (File datumFile : cache.listFiles()) {
readDatum(datumFile);
}
continue;
}
readDatum(cache);
}
// load meta
@ -99,7 +81,7 @@ public class RaftStore {
Loggers.RAFT.info("finish loading all datums, size: " + RaftCore.datumSize() + " cost " + (System.currentTimeMillis() - start) + "ms.");
}
public synchronized static void load(String key) throws Exception{
public synchronized static void load(String key) throws Exception {
long start = System.currentTimeMillis();
// load data
for (File cache : listCaches()) {
@ -110,36 +92,60 @@ public class RaftStore {
if (!StringUtils.equals(decodeFileName(cache.getName()), key)) {
continue;
}
ByteBuffer buffer;
FileChannel fc = null;
try {
fc = new FileInputStream(cache).getChannel();
buffer = ByteBuffer.allocate((int) cache.length());
fc.read(buffer);
String json = new String(buffer.array(), "UTF-8");
if (StringUtils.isBlank(json)) {
continue;
}
Datum datum = JSON.parseObject(json, Datum.class);
RaftCore.addDatum(datum);
} catch (Exception e) {
Loggers.RAFT.warn("waning: failed to deserialize key: " + cache.getName());
throw e;
} finally {
if (fc != null) {
fc.close();
}
}
readDatum(cache);
}
Loggers.RAFT.info("finish loading datum, key: " + key + " cost " + (System.currentTimeMillis() - start) + "ms.");
Loggers.RAFT.info("finish loading datum, key: " + key + " cost " + (System.currentTimeMillis() - start) + "ms.");
}
public synchronized static void readDatum(File file) throws IOException {
ByteBuffer buffer;
FileChannel fc = null;
try {
fc = new FileInputStream(file).getChannel();
buffer = ByteBuffer.allocate((int) file.length());
fc.read(buffer);
String json = new String(buffer.array(), "UTF-8");
if (StringUtils.isBlank(json)) {
return;
}
Datum datum = JSON.parseObject(json, Datum.class);
RaftCore.addDatum(datum);
} catch (Exception e) {
Loggers.RAFT.warn("waning: failed to deserialize key: " + file.getName());
throw e;
} finally {
if (fc != null) {
fc.close();
}
}
}
public synchronized static void write(final Datum datum) throws Exception {
File cacheFile = new File(CACHE_DIR + File.separator + encodeFileName(datum.key));
String namespaceId = null;
if (datum.key.contains(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)) {
String[] segments = datum.key.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[0].split("\\.");
namespaceId = segments[segments.length - 1];
String newKey;
segments = datum.key.split(namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR);
newKey = segments[0] + segments[1];
datum.key = newKey;
}
File cacheFile;
File oldCacheFile = null;
if (StringUtils.isNotBlank(namespaceId)) {
cacheFile = new File(CACHE_DIR + File.separator + namespaceId + File.separator + encodeFileName(datum.key));
} else {
oldCacheFile = new File(CACHE_DIR + File.separator + encodeFileName(datum.key));
cacheFile = new File(CACHE_DIR + File.separator + UtilsAndCommons.getDefaultNamespaceId() + File.separator + encodeFileName(datum.key));
}
if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
throw new IllegalStateException("can not make cache file: " + cacheFile.getName());
}
@ -157,6 +163,10 @@ public class RaftStore {
}
}
if (oldCacheFile != null) {
oldCacheFile.delete();
}
}
private static File[] listCaches() throws Exception {

View File

@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
import com.alibaba.nacos.common.util.Md5Utils;
import com.alibaba.nacos.common.util.SystemUtils;
@ -43,7 +44,6 @@ import com.ning.http.client.Response;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.catalina.util.ParameterMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -103,9 +103,9 @@ public class ApiCommands {
JSONObject result = new JSONObject();
try {
result = ApiCommands.this.doSrvIPXT(client.getDom(), client.getAgent(), client.getClusters(),
inetAddress.getHostAddress(), 0, StringUtils.EMPTY, StringUtils.EMPTY, false,
StringUtils.EMPTY, StringUtils.EMPTY, false);
result = ApiCommands.this.doSrvIPXT(client.getNamespaceId(), client.getDom(), client.getAgent(),
client.getClusters(), inetAddress.getHostAddress(), 0, StringUtils.EMPTY,
false, StringUtils.EMPTY, StringUtils.EMPTY, false);
} catch (Exception e) {
Loggers.SRV_LOG.warn("PUSH-SERVICE: dom is not modified", e);
}
@ -121,14 +121,11 @@ public class ApiCommands {
@RequestMapping("/dom")
public JSONObject dom(HttpServletRequest request) throws NacosException {
// SDK before version 2.0,0 use 'name' instead of 'dom' here
String name = WebUtils.optional(request, "name", StringUtils.EMPTY);
if (StringUtils.isEmpty(name)) {
name = WebUtils.required(request, "dom");
}
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String name = WebUtils.required(request, "dom");
Loggers.SRV_LOG.info("[DOM] request dom:" + name);
Domain dom = domainsManager.getDomain(name);
Domain dom = domainsManager.getDomain(namespaceId, name);
if (dom == null) {
throw new NacosException(NacosException.NOT_FOUND, "Dom doesn't exist");
}
@ -147,10 +144,13 @@ public class ApiCommands {
@RequestMapping("/rt4Dom")
public JSONObject rt4Dom(HttpServletRequest request) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain domObj
= (VirtualClusterDomain) domainsManager.getDomain(dom);
= (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (domObj == null) {
throw new IllegalArgumentException("request dom doesn't exist");
}
@ -176,9 +176,12 @@ public class ApiCommands {
@RequestMapping("/ip4Dom2")
public JSONObject ip4Dom2(HttpServletRequest request) throws NacosException {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String domName = WebUtils.required(request, "dom");
VirtualClusterDomain dom = (VirtualClusterDomain) domainsManager.getDomain(domName);
VirtualClusterDomain dom = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, domName);
if (dom == null) {
throw new NacosException(NacosException.NOT_FOUND, "dom: " + domName + " not found.");
@ -202,11 +205,13 @@ public class ApiCommands {
JSONObject result = new JSONObject();
try {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String domName = WebUtils.required(request, "dom");
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String agent = WebUtils.optional(request, "header:Client-Version", StringUtils.EMPTY);
VirtualClusterDomain dom = (VirtualClusterDomain) domainsManager.getDomain(domName);
VirtualClusterDomain dom = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, domName);
if (dom == null) {
throw new NacosException(NacosException.NOT_FOUND, "dom: " + domName + " not found!");
@ -264,8 +269,10 @@ public class ApiCommands {
@RequestMapping("/regDom")
public String regDom(HttpServletRequest request) throws Exception {
String dom = WebUtils.required(request, "dom");
if (domainsManager.getDomain(dom) != null) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
if (domainsManager.getDomain(namespaceId, dom) != null) {
throw new IllegalArgumentException("specified dom already exists, dom : " + dom);
}
@ -277,12 +284,14 @@ public class ApiCommands {
@RequestMapping("/clientBeat")
public JSONObject clientBeat(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String beat = WebUtils.required(request, "beat");
RsInfo clientBeat = JSON.parseObject(beat, RsInfo.class);
if (StringUtils.isBlank(clientBeat.getCluster())) {
clientBeat.setCluster(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
String dom = WebUtils.required(request, "dom");
String dom = WebUtils.required(request, "serviceName");
String app;
app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String clusterName = clientBeat.getCluster();
@ -293,7 +302,7 @@ public class ApiCommands {
Loggers.DEBUG_LOG.debug("[CLIENT-BEAT] full arguments: beat: " + clientBeat + ", serviceName:" + dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
Map<String, String[]> stringMap = new HashMap<>(16);
stringMap.put("dom", Arrays.asList(dom).toArray(new String[1]));
stringMap.put("enableClientBeat", Arrays.asList("true").toArray(new String[1]));
@ -307,7 +316,7 @@ public class ApiCommands {
Loggers.SRV_LOG.warn("dom not found, register it, dom:" + dom);
}
virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
String ip = clientBeat.getIp();
int port = clientBeat.getPort();
@ -347,16 +356,14 @@ public class ApiCommands {
}
String url = "http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/clientBeat";
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/clientBeat";
HttpClient.HttpResult httpResult = HttpClient.httpGet(url, null, proxyParams);
if (httpResult.code != HttpURLConnection.HTTP_OK) {
throw new IllegalArgumentException("failed to proxy client beat to" + server + ", beat: " + beat);
}
} else {
if (virtualClusterDomain != null) {
virtualClusterDomain.processClientBeat(clientBeat);
}
virtualClusterDomain.processClientBeat(clientBeat);
}
JSONObject result = new JSONObject();
@ -369,7 +376,9 @@ public class ApiCommands {
private String addOrReplaceDom(HttpServletRequest request) throws Exception {
String dom = WebUtils.required(request, "dom");
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
String owners = WebUtils.optional(request, "owners", StringUtils.EMPTY);
String token = WebUtils.optional(request, "token", Md5Utils.getMD5(dom, "utf-8"));
@ -391,6 +400,7 @@ public class ApiCommands {
VirtualClusterDomain domObj = new VirtualClusterDomain();
domObj.setName(dom);
domObj.setNamespaceId(namespaceId);
domObj.setToken(token);
domObj.setOwners(Arrays.asList(owners.split(",")));
domObj.setProtectThreshold(protectThreshold);
@ -505,12 +515,11 @@ public class ApiCommands {
@RequestMapping("/deRegService")
public String deRegService(HttpServletRequest request) throws Exception {
IpAddress ipAddress = getIPAddress(request);
String dom = WebUtils.optional(request, "serviceName", StringUtils.EMPTY);
if (StringUtils.isEmpty(dom)) {
dom = WebUtils.required(request, "dom");
}
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, Constants.REQUEST_PARAM_SERVICE_NAME);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (virtualClusterDomain == null) {
return "ok";
}
@ -529,13 +538,14 @@ public class ApiCommands {
@RequestMapping("/regService")
public String regService(HttpServletRequest request) throws Exception {
String dom = WebUtils.required(request, "dom");
String dom = WebUtils.required(request, "serviceName");
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
String app = WebUtils.optional(request, "app", "DEFAULT");
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID, UtilsAndCommons.getDefaultNamespaceId());
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
IpAddress ipAddress = getIPAddress(request);
ipAddress.setApp(app);
@ -550,8 +560,8 @@ public class ApiCommands {
if (virtualClusterDomain == null) {
Lock lock = domainsManager.addLock(dom);
Condition condition = domainsManager.addCondtion(dom);
Lock lock = domainsManager.addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(namespaceId, dom));
Condition condition = domainsManager.addCondtion(UtilsAndCommons.assembleFullServiceName(namespaceId, dom));
UtilsAndCommons.RAFT_PUBLISH_EXECUTOR.execute(new Runnable() {
@Override
public void run() {
@ -569,7 +579,7 @@ public class ApiCommands {
lock.unlock();
}
virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
}
if (virtualClusterDomain != null) {
@ -597,9 +607,10 @@ public class ApiCommands {
@NeedAuth
@RequestMapping("/updateDom")
public String updateDom(HttpServletRequest request) throws Exception {
// dom
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String name = WebUtils.required(request, "dom");
VirtualClusterDomain dom = (VirtualClusterDomain) domainsManager.getDomain(name);
VirtualClusterDomain dom = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, name);
if (dom == null) {
throw new IllegalStateException("dom not found");
}
@ -633,7 +644,7 @@ public class ApiCommands {
String setSiteGroupForce = WebUtils.optional(request, "setSiteGroupForce", StringUtils.EMPTY);
if (!StringUtils.isEmpty(sitegroup) || !StringUtils.isEmpty(setSiteGroupForce)) {
Cluster cluster
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
if (cluster == null) {
throw new IllegalStateException("cluster not found");
}
@ -644,7 +655,7 @@ public class ApiCommands {
String cktype = WebUtils.optional(request, "cktype", StringUtils.EMPTY);
if (!StringUtils.isEmpty(cktype)) {
Cluster cluster
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
if (cluster == null) {
throw new IllegalStateException("cluster not found");
}
@ -673,7 +684,7 @@ public class ApiCommands {
String defIPPort = WebUtils.optional(request, "defIPPort", StringUtils.EMPTY);
if (!StringUtils.isEmpty(defIPPort)) {
Cluster cluster
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
if (cluster == null) {
throw new IllegalStateException("cluster not found");
}
@ -684,7 +695,7 @@ public class ApiCommands {
String submask = WebUtils.optional(request, "submask", StringUtils.EMPTY);
if (!StringUtils.isEmpty(submask)) {
Cluster cluster
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
if (cluster == null) {
throw new IllegalStateException("cluster not found");
}
@ -695,7 +706,7 @@ public class ApiCommands {
String ipPort4Check = WebUtils.optional(request, "ipPort4Check", StringUtils.EMPTY);
if (!StringUtils.isEmpty(ipPort4Check)) {
Cluster cluster
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
if (cluster == null) {
throw new IllegalStateException("cluster not found");
}
@ -706,7 +717,7 @@ public class ApiCommands {
String defCkPort = WebUtils.optional(request, "defCkPort", StringUtils.EMPTY);
if (!StringUtils.isEmpty(defCkPort)) {
Cluster cluster
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
if (cluster == null) {
throw new IllegalStateException("cluster not found");
}
@ -758,8 +769,8 @@ public class ApiCommands {
@RequestMapping("/hello")
public JSONObject hello(HttpServletRequest request) {
JSONObject result = new JSONObject();
result.put("msg", "Hello! I am Nacos-Naming and healthy! total services: raft " + domainsManager.getRaftDomMap().size()
+ ", local port:" + RunningConfig.getServerPort());
result.put("msg", "Hello! I am Nacos-Naming and healthy! total services: raft " + domainsManager.getDomCount()
+ ", local port:" + RunningConfig.getServerPort());
return result;
}
@ -767,12 +778,15 @@ public class ApiCommands {
@NeedAuth
@RequestMapping("/remvDom")
public String remvDom(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
if (domainsManager.getDomain(dom) == null) {
if (domainsManager.getDomain(namespaceId, dom) == null) {
throw new IllegalStateException("specified domain doesn't exists.");
}
domainsManager.easyRemoveDom(dom);
domainsManager.easyRemoveDom(namespaceId, dom);
return "ok";
}
@ -782,18 +796,21 @@ public class ApiCommands {
String ip = WebUtils.required(request, "ip");
Set<String> doms = new HashSet<String>();
for (String dom : domainsManager.getAllDomNames()) {
Domain domObj = domainsManager.getDomain(dom);
Map<String, Set<String>> domMap = domainsManager.getAllDomNames();
List<IpAddress> ipObjs = domObj.allIPs();
for (IpAddress ipObj : ipObjs) {
if (ip.contains(":")) {
if (StringUtils.equals(ipObj.getIp() + ":" + ipObj.getPort(), ip)) {
doms.add(domObj.getName());
}
} else {
if (StringUtils.equals(ipObj.getIp(), ip)) {
doms.add(domObj.getName());
for (String namespaceId : domMap.keySet()) {
for (String dom : domMap.get(namespaceId)) {
Domain domObj = domainsManager.getDomain(namespaceId, dom);
List<IpAddress> ipObjs = domObj.allIPs();
for (IpAddress ipObj : ipObjs) {
if (ip.contains(":")) {
if (StringUtils.equals(ipObj.getIp() + ":" + ipObj.getPort(), ip)) {
doms.add(namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + domObj.getName());
}
} else {
if (StringUtils.equals(ipObj.getIp(), ip)) {
doms.add(namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + domObj.getName());
}
}
}
}
@ -818,22 +835,24 @@ public class ApiCommands {
if (!RaftCore.isLeader(clientIP)) {
Loggers.RAFT.warn("peer(" + JSON.toJSONString(clientIP) + ") tried to publish " +
"data but wasn't leader, leader: " + JSON.toJSONString(RaftCore.getLeader()));
"data but wasn't leader, leader: " + JSON.toJSONString(RaftCore.getLeader()));
throw new IllegalStateException("peer(" + clientIP + ") tried to publish " +
"data but wasn't leader");
"data but wasn't leader");
}
if (term < RaftCore.getPeerSet().local().term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: "
+ JSON.toJSONString(clientIP) + ", cur-term: " + JSON.toJSONString(RaftCore.getPeerSet().local()));
+ JSON.toJSONString(clientIP) + ", cur-term: " + JSON.toJSONString(RaftCore.getPeerSet().local()));
throw new IllegalStateException("out of date publish, pub-term:"
+ term + ", cur-term: " + RaftCore.getPeerSet().local().term.get());
+ term + ", cur-term: " + RaftCore.getPeerSet().local().term.get());
}
RaftCore.getPeerSet().local().resetLeaderDue();
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
final String dom = WebUtils.required(request, "dom");
if (domainsManager.getDomain(dom) == null) {
if (domainsManager.getDomain(namespaceId, dom) == null) {
throw new IllegalStateException("dom doesn't exist: " + dom);
}
@ -862,14 +881,14 @@ public class ApiCommands {
if (updateOnly) {
//make sure every IP is in the dom, otherwise refuse update
List<IpAddress> oldIPs = domainsManager.getDomain(dom).allIPs();
List<IpAddress> oldIPs = domainsManager.getDomain(namespaceId, dom).allIPs();
Collection diff = CollectionUtils.subtract(newIPs, oldIPs);
if (diff.size() != 0) {
throw new IllegalArgumentException("these IPs are not present: " + Arrays.toString(diff.toArray())
+ ", if you want to add them, remove updateOnly flag");
+ ", if you want to add them, remove updateOnly flag");
}
}
domainsManager.easyAddIP4Dom(dom, newIPs, timestamp, term);
domainsManager.easyAddIP4Dom(namespaceId, dom, newIPs, timestamp, term);
return "ok";
}
@ -881,6 +900,9 @@ public class ApiCommands {
throw new AccessControlException("Adding IP for dom is forbidden now.");
}
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
Map<String, String> proxyParams = new HashMap<>(16);
for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) {
proxyParams.put(entry.getKey(), entry.getValue()[0]);
@ -918,7 +940,7 @@ public class ApiCommands {
}
String url = "http://" + server
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/addIP4Dom";
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/addIP4Dom";
HttpClient.HttpResult result1 = HttpClient.httpPost(url, null, proxyParams);
if (result1.code != HttpURLConnection.HTTP_OK) {
@ -930,7 +952,7 @@ public class ApiCommands {
}
final String dom = WebUtils.required(request, "dom");
if (domainsManager.getDomain(dom) == null) {
if (domainsManager.getDomain(namespaceId, dom) == null) {
throw new IllegalStateException("dom doesn't exist: " + dom);
}
@ -942,20 +964,20 @@ public class ApiCommands {
if (updateOnly) {
//make sure every IP is in the dom, otherwise refuse update
List<IpAddress> oldIPs = domainsManager.getDomain(dom).allIPs();
List<IpAddress> oldIPs = domainsManager.getDomain(namespaceId, dom).allIPs();
Collection diff = CollectionUtils.subtract(newIPs, oldIPs);
if (diff.size() != 0) {
throw new IllegalArgumentException("these IPs are not present: " + Arrays.toString(diff.toArray())
+ ", if you want to add them, remove updateOnly flag");
+ ", if you want to add them, remove updateOnly flag");
}
}
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom));
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(namespaceId, dom));
Datum datum = RaftCore.getDatum(key);
if (datum == null) {
try {
domainsManager.getDom2LockMap().get(dom).lock();
domainsManager.getDom2LockMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, dom)).lock();
datum = RaftCore.getDatum(key);
if (datum == null) {
datum = new Datum();
@ -963,7 +985,7 @@ public class ApiCommands {
RaftCore.addDatum(datum);
}
} finally {
domainsManager.getDom2LockMap().get(dom).unlock();
domainsManager.getDom2LockMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, dom)).unlock();
}
}
@ -971,7 +993,7 @@ public class ApiCommands {
if (RaftCore.isLeader()) {
try {
domainsManager.getDom2LockMap().get(dom).lock();
domainsManager.getDom2LockMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, dom)).lock();
proxyParams.put("clientIP", NetUtils.localServer());
proxyParams.put("notify", "true");
@ -991,7 +1013,7 @@ public class ApiCommands {
}
String url = "http://" + server
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/onAddIP4Dom";
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/onAddIP4Dom";
try {
HttpClient.asyncHttpPost(url, null, proxyParams, new AsyncCompletionHandler() {
@ -999,8 +1021,8 @@ public class ApiCommands {
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG.warn("failed to add ip for dom: " + dom
+ ",ipList = " + ipList + ",code: " + response.getStatusCode()
+ ", caused " + response.getResponseBody() + ", server: " + peer.ip);
+ ",ipList = " + ipList + ",code: " + response.getStatusCode()
+ ", caused " + response.getResponseBody() + ", server: " + peer.ip);
return 1;
}
return 0;
@ -1014,10 +1036,10 @@ public class ApiCommands {
}
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-ADD}" + " new: "
+ Arrays.toString(ipList.toArray()) + " operatorIP: "
+ WebUtils.optional(request, "clientIP", "unknown"));
+ Arrays.toString(ipList.toArray()) + " operatorIP: "
+ WebUtils.optional(request, "clientIP", "unknown"));
} finally {
domainsManager.getDom2LockMap().get(dom).unlock();
domainsManager.getDom2LockMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, dom)).unlock();
}
}
@ -1030,71 +1052,11 @@ public class ApiCommands {
return doAddIP4Dom(request);
}
@RequestMapping("/srvAllIP")
public JSONObject srvAllIP(HttpServletRequest request) throws Exception {
public JSONObject doSrvIPXT(String namespaceId, String dom, String agent, String clusters, String clientIP, int udpPort,
String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
JSONObject result = new JSONObject();
if (DistroMapper.getLocalhostIP().equals(UtilsAndCommons.LOCAL_HOST_IP)) {
throw new Exception("invalid localhost ip: " + DistroMapper.getLocalhostIP());
}
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain domObj = (VirtualClusterDomain) domainsManager.getDomain(dom);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
if (domObj == null) {
throw new NacosException(NacosException.NOT_FOUND, "dom not found: " + dom);
}
checkIfDisabled(domObj);
long cacheMillis = Switch.getCacheMillis(dom);
List<IpAddress> srvedIPs;
if (StringUtils.isEmpty(clusters)) {
srvedIPs = domObj.allIPs();
} else {
srvedIPs = domObj.allIPs(Arrays.asList(clusters.split(",")));
}
JSONArray ipArray = new JSONArray();
for (IpAddress ip : srvedIPs) {
JSONObject ipObj = new JSONObject();
ipObj.put("ip", ip.getIp());
ipObj.put("port", ip.getPort());
ipObj.put("valid", ip.isValid());
ipObj.put("weight", ip.getWeight());
ipObj.put("doubleWeight", ip.getWeight());
ipObj.put("instanceId", ip.getInstanceId());
ipObj.put("metadata", ip.getMetadata());
ipArray.add(ipObj);
}
result.put("hosts", ipArray);
result.put("dom", dom);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", domObj.getChecksum());
result.put("allIPs", "true");
return result;
}
public JSONObject doSrvIPXT(String dom, String agent, String clusters, String clientIP, int udpPort,
String error, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
JSONObject result = new JSONObject();
VirtualClusterDomain domObj = (VirtualClusterDomain) domainsManager.getDomain(dom);
if (!StringUtils.isEmpty(error)) {
Loggers.ROLE_LOG.info("ENV-NOT-CONSISTENT", error);
}
VirtualClusterDomain domObj = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (domObj == null) {
throw new NacosException(NacosException.NOT_FOUND, "dom not found: " + dom);
@ -1107,7 +1069,7 @@ public class ApiCommands {
// now try to enable the push
try {
if (udpPort > 0 && PushService.canEnablePush(agent)) {
PushService.addClient(dom,
PushService.addClient(namespaceId, dom,
clusters,
agent,
new InetSocketAddress(clientIP, udpPort),
@ -1217,15 +1179,15 @@ public class ApiCommands {
throw new Exception("invalid localhost ip: " + DistroMapper.getLocalhostIP());
}
String dom = WebUtils.required(request, "dom");
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
VirtualClusterDomain domObj = (VirtualClusterDomain) domainsManager.getDomain(dom);
String dom = WebUtils.required(request, "dom");
String agent = request.getHeader("Client-Version");
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
String error = WebUtils.optional(request, "unconsistentDom", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
@ -1234,13 +1196,16 @@ public class ApiCommands {
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
return doSrvIPXT(dom, agent, clusters, clientIP, udpPort, error, env, isCheck, app, tenant, healthyOnly);
return doSrvIPXT(namespaceId, dom, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);
}
@NeedAuth
@RequestMapping("/remvIP4Dom")
public String remvIP4Dom(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
String ipListString = WebUtils.required(request, "ipList");
@ -1264,11 +1229,11 @@ public class ApiCommands {
}
}
domainsManager.easyRemvIP4Dom(dom, ipObjList);
domainsManager.easyRemvIP4Dom(namespaceId, dom, ipObjList);
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-REMV}" + " dead: "
+ Arrays.toString(ipList.toArray()) + " operator: "
+ WebUtils.optional(request, "clientIP", "unknown"));
+ Arrays.toString(ipList.toArray()) + " operator: "
+ WebUtils.optional(request, "clientIP", "unknown"));
return "ok";
}
@ -1349,21 +1314,21 @@ public class ApiCommands {
SwitchDomain dom = JSON.parseObject(WebUtils.required(request, "json"), SwitchDomain.class);
dom.setEnableStandalone(Switch.isEnableStandalone());
if (dom.httpHealthParams.getMin() < SwitchDomain.HttpHealthParams.MIN_MIN
|| dom.tcpHealthParams.getMin() < SwitchDomain.HttpHealthParams.MIN_MIN) {
|| dom.tcpHealthParams.getMin() < SwitchDomain.HttpHealthParams.MIN_MIN) {
throw new IllegalArgumentException("min check time for http or tcp is too small(<500)");
}
if (dom.httpHealthParams.getMax() < SwitchDomain.HttpHealthParams.MIN_MAX
|| dom.tcpHealthParams.getMax() < SwitchDomain.HttpHealthParams.MIN_MAX) {
|| dom.tcpHealthParams.getMax() < SwitchDomain.HttpHealthParams.MIN_MAX) {
throw new IllegalArgumentException("max check time for http or tcp is too small(<3000)");
}
if (dom.httpHealthParams.getFactor() < 0
|| dom.httpHealthParams.getFactor() > 1
|| dom.tcpHealthParams.getFactor() < 0
|| dom.tcpHealthParams.getFactor() > 1) {
|| dom.httpHealthParams.getFactor() > 1
|| dom.tcpHealthParams.getFactor() < 0
|| dom.tcpHealthParams.getFactor() > 1) {
throw new IllegalArgumentException("malformed factor");
}
@ -1696,25 +1661,6 @@ public class ApiCommands {
}
@RequestMapping("/checkStatus")
public JSONObject checkStatus(HttpServletRequest request) {
JSONObject result = new JSONObject();
result.put("healthCheckEnabled", Switch.isHealthCheckEnabled());
result.put("allDoms", domainsManager.getAllDomNames());
List<String> doms = new ArrayList<String>();
for (String dom : domainsManager.getAllDomNames()) {
if (DistroMapper.responsible(dom)) {
doms.add(dom);
}
}
result.put("respDoms", doms);
return result;
}
public void checkIfDisabled(VirtualClusterDomain domObj) throws Exception {
if (!domObj.getEnabled()) {
throw new Exception("domain is disabled now.");
@ -1723,7 +1669,6 @@ public class ApiCommands {
@RequestMapping("/switches")
public JSONObject switches(HttpServletRequest request) {
return JSON.parseObject(Switch.getDom().toJSON());
}
@ -1736,7 +1681,7 @@ public class ApiCommands {
properties.load(is);
try (InputStreamReader releaseNode =
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
Properties properties1 = new Properties();
properties1.load(releaseNode);
@ -1752,7 +1697,7 @@ public class ApiCommands {
JSONObject result = new JSONObject();
try (InputStreamReader releaseNode =
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
Properties properties1 = new Properties();
properties1.load(releaseNode);
@ -1769,18 +1714,16 @@ public class ApiCommands {
public JSONObject allDomNames(HttpServletRequest request) throws Exception {
boolean responsibleOnly = Boolean.parseBoolean(WebUtils.optional(request, "responsibleOnly", "false"));
boolean withOwner = Boolean.parseBoolean((WebUtils.optional(request, "withOwner", "false")));
List<String> doms = new ArrayList<String>();
Set<String> domSet;
Map<String, Set<String>> doms = new HashMap<>(16);
domSet = domainsManager.getAllDomNames();
for (String dom : domSet) {
if (DistroMapper.responsible(dom) || !responsibleOnly) {
if (withOwner) {
doms.add(dom + ":" + ArrayUtils.toString(domainsManager.getDomain(dom).getOwners()));
} else {
doms.add(dom);
Map<String, Set<String>> domMap = domainsManager.getAllDomNames();
for (String namespaceId : domMap.keySet()) {
doms.put(namespaceId, new HashSet<>());
for (String dom : domMap.get(namespaceId)) {
if (DistroMapper.responsible(dom) || !responsibleOnly) {
doms.get(namespaceId).add(dom);
}
}
}
@ -1797,10 +1740,12 @@ public class ApiCommands {
public JSONObject searchDom(HttpServletRequest request) {
JSONObject result = new JSONObject();
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String expr = WebUtils.required(request, "expr");
List<Domain> doms
= domainsManager.searchDomains(".*" + expr + ".*");
= domainsManager.searchDomains(namespaceId, ".*" + expr + ".*");
if (CollectionUtils.isEmpty(doms)) {
result.put("doms", Collections.emptyList());
@ -1817,52 +1762,6 @@ public class ApiCommands {
return result;
}
@RequestMapping("/getWeightsByIP")
public JSONObject getWeightsByIP(HttpServletRequest request) {
String ip = WebUtils.required(request, "ip");
Map<String, List<IpAddress>> dom2IPList = new HashMap<String, List<IpAddress>>(1024);
for (String dom : domainsManager.getAllDomNames()) {
Domain domObj = domainsManager.getDomain(dom);
List<IpAddress> ipObjs = domObj.allIPs();
for (IpAddress ipObj : ipObjs) {
if (StringUtils.startsWith(ipObj.getIp() + ":" + ipObj.getPort(), ip)) {
List<IpAddress> list = dom2IPList.get(domObj.getName());
if (CollectionUtils.isEmpty(list)) {
list = new ArrayList<>();
dom2IPList.put(domObj.getName(), list);
}
list.add(ipObj);
}
}
}
JSONObject result = new JSONObject();
JSONArray ipArray = new JSONArray();
for (Map.Entry<String, List<IpAddress>> entry : dom2IPList.entrySet()) {
for (IpAddress ipAddress : entry.getValue()) {
JSONObject packet = new JSONObject();
packet.put("dom", entry.getKey());
packet.put("ip", ipAddress.getIp());
packet.put("weight", ipAddress.getWeight());
packet.put("port", ipAddress.getPort());
packet.put("cluster", ipAddress.getClusterName());
ipArray.add(packet);
}
}
result.put("ips", ipArray);
result.put("code", 200);
result.put("successful", "success");
return result;
}
private Cluster getClusterFromJson(String json) {
JSONObject object = JSON.parseObject(json);
@ -1887,10 +1786,12 @@ public class ApiCommands {
public String doAddCluster4Dom(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
String json = WebUtils.optional(request, "clusterJson", StringUtils.EMPTY);
VirtualClusterDomain domObj = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain domObj = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (domObj == null) {
throw new IllegalArgumentException("dom not found: " + dom);
@ -2010,9 +1911,9 @@ public class ApiCommands {
JSONObject result = new JSONObject();
int domCount = domainsManager.getDomCount();
int ipCount = domainsManager.getIPCount();
int ipCount = domainsManager.getInstanceCount();
int responsibleDomCount = domainsManager.getResponsibleDoms().size();
int responsibleDomCount = domainsManager.getResponsibleDomCount();
int responsibleIPCount = domainsManager.getResponsibleIPCount();
result.put("domCount", domCount);
@ -2114,8 +2015,11 @@ public class ApiCommands {
@RequestMapping("/reCalculateCheckSum4Dom")
public JSONObject reCalculateCheckSum4Dom(HttpServletRequest request) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (virtualClusterDomain == null) {
throw new IllegalArgumentException("dom not found");
@ -2130,26 +2034,12 @@ public class ApiCommands {
return result;
}
@RequestMapping("/getDomString4MD5")
public JSONObject getDomString4MD5(HttpServletRequest request) throws NacosException {
JSONObject result = new JSONObject();
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
if (virtualClusterDomain == null) {
throw new NacosException(NacosException.NOT_FOUND, "dom not found");
}
result.put("domString", virtualClusterDomain.getDomString());
return result;
}
@RequestMapping("/getResponsibleServer4Dom")
public JSONObject getResponsibleServer4Dom(HttpServletRequest request) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (virtualClusterDomain == null) {
throw new IllegalArgumentException("dom not found");
@ -2173,8 +2063,10 @@ public class ApiCommands {
@RequestMapping("/responsible")
public JSONObject responsible(HttpServletRequest request) {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (virtualClusterDomain == null) {
throw new IllegalArgumentException("dom not found");
@ -2187,50 +2079,6 @@ public class ApiCommands {
return result;
}
@RequestMapping("/domServeStatus")
public JSONObject domServeStatus(HttpServletRequest request) {
JSONObject result = new JSONObject();
//all ips, sites, disabled site, checkserver, appName
String dom = WebUtils.required(request, "dom");
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
Map<String, Object> data = new HashMap<>(2);
if (virtualClusterDomain == null) {
result.put("success", false);
result.put("data", data);
result.put("errMsg", "dom does not exisit.");
return result;
}
List<IpAddress> ipAddresses = virtualClusterDomain.allIPs();
List<Map<String, Object>> allIPs = new ArrayList<>();
for (IpAddress ip : ipAddresses) {
Map<String, Object> ipPac = new HashMap<>(16);
ipPac.put("ip", ip.getIp());
ipPac.put("valid", ip.isValid());
ipPac.put("port", ip.getPort());
ipPac.put("marked", ip.isMarked());
ipPac.put("cluster", ip.getClusterName());
ipPac.put("weight", ip.getWeight());
allIPs.add(ipPac);
}
List<String> checkServers = Arrays.asList(DistroMapper.mapSrv(dom));
data.put("ips", allIPs);
data.put("checkers", checkServers);
result.put("data", data);
result.put("success", true);
result.put("errMsg", StringUtils.EMPTY);
return result;
}
@RequestMapping("/domStatus")
public String domStatus(HttpServletRequest request) {
//format: dom1@@checksum@@@dom2@@checksum
@ -2254,7 +2102,7 @@ public class ApiCommands {
}
String dom = entry.getKey();
String checksum = entry.getValue();
Domain domain = domainsManager.getDomain(dom);
Domain domain = domainsManager.getDomain(checksums.namespaceId, dom);
if (domain == null) {
continue;
@ -2264,7 +2112,7 @@ public class ApiCommands {
if (!checksum.equals(domain.getChecksum())) {
Loggers.SRV_LOG.debug("checksum of " + dom + " is not consistent, remote: " + serverIP + ",checksum: " + checksum + ", local: " + domain.getChecksum());
domainsManager.addUpdatedDom2Queue(dom, serverIP, checksum);
domainsManager.addUpdatedDom2Queue(checksums.namespaceId, dom, serverIP, checksum);
}
}
} catch (Exception e) {
@ -2284,7 +2132,7 @@ public class ApiCommands {
String state = WebUtils.optional(request, "state", StringUtils.EMPTY);
Loggers.SRV_LOG.info("[CONTAINER_NOTFY] received notify event, type:" + type + ", domain:" + domain +
", ip:" + ip + ", port:" + port + ", state:" + state);
", ip:" + ip + ", port:" + port + ", state:" + state);
return "ok";
}

View File

@ -116,11 +116,6 @@ public class DistroFilter implements Filter {
}
public boolean canDistro(String urlString) {
if (urlString.startsWith(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.API_DOM_SERVE_STATUS)) {
return false;
}
return urlString.startsWith(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.API_IP_FOR_DOM) ||
urlString.startsWith(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.API_DOM);
}

View File

@ -31,7 +31,7 @@ import java.io.IOException;
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @since 0.8.0
*/
public class TenantFilter implements Filter {
public class NamespaceFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
@ -40,24 +40,7 @@ public class TenantFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) request;
HttpServletResponse resp = (HttpServletResponse) response;
String tenantId = WebUtils.optional(req, Constants.REQUEST_PARAM_TENANT_ID, StringUtils.EMPTY);
String serviceName = WebUtils.optional(req, Constants.REQUEST_PARAM_SERVICE_NAME, StringUtils.EMPTY);
if (StringUtils.isBlank(tenantId) || StringUtils.isBlank(serviceName)) {
chain.doFilter(req, resp);
return;
}
OverrideParameterRequestWrapper requestWrapper = new OverrideParameterRequestWrapper(req);
requestWrapper.addParameter(Constants.REQUEST_PARAM_SERVICE_NAME,
serviceName + UtilsAndCommons.SERVICE_TENANT_CONNECTOR + tenantId);
chain.doFilter(requestWrapper, resp);
chain.doFilter(request, response);
}
@Override

View File

@ -55,9 +55,9 @@ public class NamingConfig {
public FilterRegistrationBean tenantFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
registration.setFilter(tenantFilter());
registration.setFilter(namespaceFilter());
registration.addUrlPatterns("/v1/ns/instance/*", "/v1/ns/service/*", "/v1/ns/cluster/*", "/v1/ns/health/*");
registration.setName("tenantFilter");
registration.setName("namespaceFilter");
registration.setOrder(4);
return registration;
@ -74,7 +74,7 @@ public class NamingConfig {
}
@Bean
public Filter tenantFilter() {
return new TenantFilter();
public Filter namespaceFilter() {
return new NamespaceFilter();
}
}

View File

@ -174,7 +174,7 @@ public class RaftCommands {
response.setHeader("Content-Encode", "gzip");
JSONObject result = new JSONObject();
result.put("doms", domainsManager.getRaftDomMap().size());
result.put("doms", domainsManager.getDomCount());
result.put("peers", RaftCore.getPeers());
return result;

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.IpAddress;
@ -85,9 +86,11 @@ public class InstanceControllerTest extends BaseTest {
ipList.add(ipAddress);
domain.updateIPs(ipList);
Mockito.when(domainsManager.getDomain("nacos.test.1")).thenReturn(domain);
Mockito.when(domainsManager.getDomain(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1")).thenReturn(domain);
Mockito.when(domainsManager.addLock("nacos.test.1")).thenReturn(new ReentrantLock());
Mockito.when(domainsManager.addLockIfAbsent(
UtilsAndCommons.assembleFullServiceName(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1")))
.thenReturn(new ReentrantLock());
MockHttpServletRequestBuilder builder =
MockMvcRequestBuilders.put("/naming/instance")
@ -132,7 +135,7 @@ public class InstanceControllerTest extends BaseTest {
ipList.add(ipAddress);
domain.updateIPs(ipList);
Mockito.when(domainsManager.getDomain("nacos.test.1")).thenReturn(domain);
Mockito.when(domainsManager.getDomain(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1")).thenReturn(domain);
MockHttpServletRequestBuilder builder =
MockMvcRequestBuilders.get("/v1/ns/instances")

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -46,7 +47,7 @@ public class DomainsManagerTest extends BaseTest {
@Test
public void easyRemoveDom() throws Exception {
domainsManager.easyRemoveDom("nacos.test.1");
domainsManager.easyRemoveDom(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1");
}
@Test
@ -55,14 +56,14 @@ public class DomainsManagerTest extends BaseTest {
VirtualClusterDomain domain = new VirtualClusterDomain();
domain.setName("nacos.test.1");
domainsManager.chooseDomMap().put("nacos.test.1", domain);
domainsManager.chooseDomMap(UtilsAndCommons.getDefaultNamespaceId()).put("nacos.test.1", domain);
IpAddress ipAddress = new IpAddress();
ipAddress.setIp("1.1.1.1");
List<IpAddress> ipList = new ArrayList<IpAddress>();
ipList.add(ipAddress);
domainsManager.addLock("nacos.test.1");
domainsManager.easyRemvIP4Dom("nacos.test.1", ipList);
domainsManager.addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1"));
domainsManager.easyRemvIP4Dom(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1", ipList);
}
@Test
@ -70,9 +71,9 @@ public class DomainsManagerTest extends BaseTest {
VirtualClusterDomain domain = new VirtualClusterDomain();
domain.setName("nacos.test.1");
domainsManager.chooseDomMap().put("nacos.test.1", domain);
domainsManager.chooseDomMap(UtilsAndCommons.getDefaultNamespaceId()).put("nacos.test.1", domain);
List<Domain> list = domainsManager.searchDomains("nacos.test.*");
List<Domain> list = domainsManager.searchDomains(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.*");
Assert.assertNotNull(list);
Assert.assertEquals(1, list.size());
Assert.assertEquals("nacos.test.1", list.get(0).getName());

View File

@ -49,7 +49,7 @@ import java.util.ArrayList;
import java.util.List;
/**
*
*
* @author en.xuze@alipay.com
* @version $Id: APICommandsTest.java, v 0.1 2018年5月14日 下午4:31:13 en.xuze@alipay.com Exp $
*/
@ -85,7 +85,7 @@ public class APICommandsTest {
public void dom() throws Exception {
VirtualClusterDomain domain = new VirtualClusterDomain();
domain.setName("nacos.domain.1");
Mockito.when(domainsManager.getDomain("nacos.domain.1")).thenReturn(domain);
Mockito.when(domainsManager.getDomain(UtilsAndCommons.getDefaultNamespaceId(), "nacos.domain.1")).thenReturn(domain);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get("/naming/api/dom")
.param("dom", "nacos.domain.1");
@ -113,7 +113,7 @@ public class APICommandsTest {
domain.onChange("iplist", JSON.toJSONString(list));
Mockito.when(domainsManager.getDomain("nacos.domain.1")).thenReturn(domain);
Mockito.when(domainsManager.getDomain(UtilsAndCommons.getDefaultNamespaceId(), "nacos.domain.1")).thenReturn(domain);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get("/naming/api/ip4Dom")
.param("dom", "nacos.domain.1");

View File

@ -0,0 +1,123 @@
package com.alibaba.nacos.test.naming;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.NamingApp;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.test.naming.NamingBase.*;
/**
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = NamingApp.class, properties = {"server.servlet.context-path=/nacos"},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MultipleTenantTest {
private NamingService naming;
private NamingService naming1;
private NamingService naming2;
@LocalServerPort
private int port;
private volatile List<Instance> instances = Collections.emptyList();
@Before
public void init() throws Exception {
TimeUnit.SECONDS.sleep(10);
naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port);
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, "namespace-1");
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1" + ":" + port);
naming1 = NamingFactory.createNamingService(properties);
properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, "namespace-2");
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1" + ":" + port);
naming2 = NamingFactory.createNamingService(properties);
}
@Test
public void registerInstance() throws Exception {
String serviceName = randomDomainName();
System.out.println(serviceName);
naming1.registerInstance(serviceName, "11.11.11.11", 80);
naming2.registerInstance(serviceName, "22.22.22.22", 80);
naming.registerInstance(serviceName, "33.33.33.33", 8888);
naming.registerInstance(serviceName, "44.44.44.44", 8888);
TimeUnit.SECONDS.sleep(8L);
List<Instance> instances = naming1.getAllInstances(serviceName);
Assert.assertEquals(1, instances.size());
Assert.assertEquals("11.11.11.11", instances.get(0).getIp());
Assert.assertEquals(80, instances.get(0).getPort());
instances = naming2.getAllInstances(serviceName);
Assert.assertEquals(1, instances.size());
Assert.assertEquals("22.22.22.22", instances.get(0).getIp());
Assert.assertEquals(80, instances.get(0).getPort());
instances = naming.getAllInstances(serviceName);
Assert.assertEquals(2, instances.size());
}
@Test
public void subscribeAdd() throws Exception {
String serviceName = randomDomainName();
naming1.subscribe(serviceName, new EventListener() {
@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
instances = ((NamingEvent) event).getInstances();
}
});
naming1.registerInstance(serviceName, "11.11.11.11", TEST_PORT, "c1");
naming1.registerInstance(serviceName, "22.22.22.22", TEST_PORT, "c1");
naming2.registerInstance(serviceName, "33.33.33.33", TEST_PORT, "c1");
naming2.registerInstance(serviceName, "44.44.44.44", TEST_PORT, "c1");
while (instances.size() != 2) {
Thread.sleep(1000L);
}
Set<String> ips = new HashSet<String>();
ips.add(instances.get(0).getIp());
ips.add(instances.get(1).getIp());
Assert.assertTrue(ips.contains("11.11.11.11"));
Assert.assertTrue(ips.contains("22.22.22.22"));
Assert.assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName)));
}
}

View File

@ -18,7 +18,9 @@ package com.alibaba.nacos.test.naming;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.naming.NamingApp;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -276,26 +278,6 @@ public class RestAPI_ITCase {
}
@Test
public void srvAllIP() throws Exception {
ResponseEntity<String> response = request("/nacos/v1/ns/api/srvAllIP",
Params.newParams()
.appendParam("dom", NamingBase.TEST_DOM_1)
.done(), String.class);
assertTrue(response.getStatusCode().is2xxSuccessful());
JSONObject json = JSON.parseObject(response.getBody());
Assert.assertEquals(NamingBase.TEST_DOM_1, json.getString("dom"));
JSONArray hosts = json.getJSONArray("hosts");
Assert.assertNotNull(hosts);
Assert.assertEquals(1, hosts.size());
Assert.assertEquals(NamingBase.TEST_IP_4_DOM_1, hosts.getJSONObject(0).getString("ip"));
Assert.assertEquals(NamingBase.TEST_PORT_4_DOM_1, hosts.getJSONObject(0).getString("port"));
}
@Test
public void remvIP4Dom() throws Exception {
@ -418,17 +400,6 @@ public class RestAPI_ITCase {
Assert.assertEquals("3000", switches.getString("defaultCacheMillis"));
}
@Test
public void checkStatus() throws Exception {
ResponseEntity<String> response = request("/nacos/v1/ns/api/checkStatus",
Params.newParams().done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
}
@Test
public void allDomNames() throws Exception {
@ -536,18 +507,7 @@ public class RestAPI_ITCase {
ResponseEntity<String> response = request("/nacos/v1/ns/api/reCalculateCheckSum4Dom",
Params.newParams()
.appendParam("dom", NamingBase.TEST_DOM_1)
.done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
}
@Test
public void getDomString4MD5() throws Exception {
ResponseEntity<String> response = request("/nacos/v1/ns/api/getDomString4MD5",
Params.newParams()
.appendParam(Constants.REQUEST_PARAM_NAMESPACE_ID, UtilsAndCommons.getDefaultNamespaceId())
.appendParam("dom", NamingBase.TEST_DOM_1)
.done(),
String.class);
@ -567,22 +527,6 @@ public class RestAPI_ITCase {
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
}
@Test
public void domServeStatus() throws Exception {
ResponseEntity<String> response = request("/nacos/v1/ns/api/domServeStatus",
Params.newParams()
.appendParam("dom", NamingBase.TEST_DOM_1)
.done(),
String.class);
Assert.assertTrue(response.getStatusCode().is2xxSuccessful());
JSONObject json = JSON.parseObject(response.getBody());
Assert.assertTrue(json.getBooleanValue("success"));
Assert.assertTrue(json.getJSONObject("data").getJSONArray("ips").size() > 0);
}
private <T> ResponseEntity<T> request(String path, MultiValueMap<String, String> params, Class<T> clazz) {
HttpHeaders headers = new HttpHeaders();