#745 Support service creation in AP mode
This commit is contained in:
parent
78bf52f2e4
commit
81dd9fbc6b
@ -228,12 +228,13 @@ public class NamingProxy {
|
||||
public boolean serverHealthy() {
|
||||
|
||||
try {
|
||||
reqAPI(UtilAndComs.NACOS_URL_BASE + "/operator/metrics", new HashMap<String, String>(2));
|
||||
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/operator/metrics", new HashMap<String, String>(2));
|
||||
JSONObject json = JSON.parseObject(result);
|
||||
String serverStatus = json.getString("status");
|
||||
return "UP".equals(serverStatus);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public ListView<String> getServiceList(int pageNo, int pageSize) throws NacosException {
|
||||
|
@ -68,3 +68,5 @@ server.tomcat.basedir=
|
||||
nacos.naming.partition.taskDispatchThreadCount=10
|
||||
nacos.naming.partition.taskDispatchPeriod=200
|
||||
nacos.naming.partition.batchSyncKeyCount=1000
|
||||
nacos.naming.partition.initDataRatio=0.9
|
||||
nacos.naming.partition.syncRetryDelay=5000
|
||||
|
@ -42,3 +42,5 @@ nacos.security.ignore.urls=/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/
|
||||
nacos.naming.partition.taskDispatchThreadCount=10
|
||||
nacos.naming.partition.taskDispatchPeriod=200
|
||||
nacos.naming.partition.batchSyncKeyCount=1000
|
||||
nacos.naming.partition.initDataRatio=0.9
|
||||
nacos.naming.partition.syncRetryDelay=5000
|
||||
|
@ -510,7 +510,7 @@
|
||||
<appender-ref ref="async-naming-raft"/>
|
||||
</logger>
|
||||
<logger name="com.alibaba.nacos.naming.ephemeral" additivity="false">
|
||||
<level value="INFO"/>
|
||||
<level value="DEBUG"/>
|
||||
<appender-ref ref="async-naming-ephemeral"/>
|
||||
</logger>
|
||||
<logger name="com.alibaba.nacos.naming.event" additivity="false">
|
||||
|
@ -42,47 +42,27 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
|
||||
|
||||
@Override
|
||||
public void put(String key, Object value) throws NacosException {
|
||||
if (KeyBuilder.matchEphemeralKey(key)) {
|
||||
ephemeralConsistencyService.put(key, value);
|
||||
} else {
|
||||
persistentConsistencyService.put(key, value);
|
||||
}
|
||||
mapConsistencyService(key).put(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String key) throws NacosException {
|
||||
if (KeyBuilder.matchEphemeralKey(key)) {
|
||||
ephemeralConsistencyService.remove(key);
|
||||
} else {
|
||||
persistentConsistencyService.remove(key);
|
||||
}
|
||||
mapConsistencyService(key).remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Datum get(String key) throws NacosException {
|
||||
if (KeyBuilder.matchEphemeralKey(key)) {
|
||||
return ephemeralConsistencyService.get(key);
|
||||
} else {
|
||||
return persistentConsistencyService.get(key);
|
||||
}
|
||||
return mapConsistencyService(key).get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listen(String key, DataListener listener) throws NacosException {
|
||||
if (KeyBuilder.matchEphemeralKey(key)) {
|
||||
ephemeralConsistencyService.listen(key, listener);
|
||||
} else {
|
||||
persistentConsistencyService.listen(key, listener);
|
||||
}
|
||||
mapConsistencyService(key).listen(key, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlisten(String key, DataListener listener) throws NacosException {
|
||||
if (KeyBuilder.matchEphemeralKey(key)) {
|
||||
ephemeralConsistencyService.unlisten(key, listener);
|
||||
} else {
|
||||
persistentConsistencyService.unlisten(key, listener);
|
||||
}
|
||||
mapConsistencyService(key).unlisten(key, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -99,4 +79,8 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
|
||||
public boolean isAvailable() {
|
||||
return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable();
|
||||
}
|
||||
|
||||
private ConsistencyService mapConsistencyService(String key) {
|
||||
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package com.alibaba.nacos.naming.consistency;
|
||||
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
@ -55,6 +56,10 @@ public class KeyBuilder {
|
||||
return SERVICE_META_KEY_PREFIX + namespaceId + KEY_CONNECTOR + serviceName;
|
||||
}
|
||||
|
||||
public static String getSwitchDomainKey() {
|
||||
return SERVICE_META_KEY_PREFIX + UtilsAndCommons.SWITCH_DOMAIN_NAME;
|
||||
}
|
||||
|
||||
public static boolean matchEphemeralInstanceListKey(String key) {
|
||||
return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
|
||||
}
|
||||
@ -67,6 +72,10 @@ public class KeyBuilder {
|
||||
return key.startsWith(SERVICE_META_KEY_PREFIX) || key.startsWith(BRIEF_SERVICE_META_KEY_PREFIX);
|
||||
}
|
||||
|
||||
public static boolean matchSwitchKey(String key) {
|
||||
return key.endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME);
|
||||
}
|
||||
|
||||
public static boolean matchServiceName(String key, String namespaceId, String serviceName) {
|
||||
return key.endsWith(namespaceId + KEY_CONNECTOR + serviceName);
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.partition;
|
||||
import com.alibaba.nacos.naming.cluster.servers.Server;
|
||||
import com.alibaba.nacos.naming.misc.GlobalExecutor;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.NetUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -99,6 +100,9 @@ public class TaskDispatcher {
|
||||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
|
||||
|
||||
for (Server member : dataSyncer.getServers()) {
|
||||
if (NetUtils.localServer().equals(member.getKey())) {
|
||||
continue;
|
||||
}
|
||||
SyncTask syncTask = new SyncTask();
|
||||
syncTask.setKeys(keys);
|
||||
syncTask.setTargetServer(member.getKey());
|
||||
|
@ -917,20 +917,22 @@ public class RaftCore {
|
||||
|
||||
int count = 0;
|
||||
|
||||
if (KeyBuilder.matchServiceMetaKey(datum.key) &&
|
||||
listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
|
||||
if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
|
||||
|
||||
for (DataListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
|
||||
try {
|
||||
if (action == ApplyAction.CHANGE) {
|
||||
listener.onChange(datum.key, getDatum(datum.key).value);
|
||||
}
|
||||
if (KeyBuilder.matchServiceMetaKey(datum.key) && !KeyBuilder.matchSwitchKey(datum.key)) {
|
||||
|
||||
if (action == ApplyAction.DELETE) {
|
||||
listener.onDelete(datum.key);
|
||||
for (DataListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
|
||||
try {
|
||||
if (action == ApplyAction.CHANGE) {
|
||||
listener.onChange(datum.key, getDatum(datum.key).value);
|
||||
}
|
||||
|
||||
if (action == ApplyAction.DELETE) {
|
||||
listener.onDelete(datum.key);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import com.alibaba.nacos.naming.core.Instance;
|
||||
import com.alibaba.nacos.naming.core.Instances;
|
||||
import com.alibaba.nacos.naming.core.Service;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -125,6 +126,11 @@ public class RaftStore {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (KeyBuilder.matchSwitchKey(file.getName())) {
|
||||
return JSON.parseObject(json, new TypeReference<Datum<SwitchDomain>>() {
|
||||
});
|
||||
}
|
||||
|
||||
if (KeyBuilder.matchServiceMetaKey(file.getName())) {
|
||||
try {
|
||||
return JSON.parseObject(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
|
||||
|
@ -231,11 +231,7 @@ public class InstanceController {
|
||||
instance.setInstanceId(instance.generateInstanceId());
|
||||
instance.setEphemeral(clientBeat.isEphemeral());
|
||||
|
||||
if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
|
||||
serviceManager.registerInstance(namespaceId, serviceName, clusterName, instance);
|
||||
} else {
|
||||
serviceManager.addInstance(namespaceId, serviceName, clusterName, true, instance);
|
||||
}
|
||||
serviceManager.registerInstance(namespaceId, serviceName, clusterName, instance);
|
||||
}
|
||||
|
||||
Service service = serviceManager.getService(namespaceId, serviceName);
|
||||
@ -326,6 +322,16 @@ public class InstanceController {
|
||||
instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
|
||||
}
|
||||
|
||||
if ((ServerMode.AP.name().equals(switchDomain.getServerMode()) && !instance.isEphemeral())) {
|
||||
throw new NacosException(NacosException.INVALID_PARAM, "wrong instance type: " + instance.isEphemeral()
|
||||
+ " in " + switchDomain.getServerMode() + " mode.");
|
||||
}
|
||||
|
||||
if ((ServerMode.CP.name().equals(switchDomain.getServerMode()) && instance.isEphemeral())) {
|
||||
throw new NacosException(NacosException.INVALID_PARAM, "wrong instance type: " + instance.isEphemeral()
|
||||
+ " in " + switchDomain.getServerMode() + " mode.");
|
||||
}
|
||||
|
||||
serviceManager.registerInstance(namespaceId, serviceName, clusterName, instance);
|
||||
|
||||
return "ok";
|
||||
|
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package com.alibaba.nacos.naming.controllers;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.nacos.api.naming.CommonParams;
|
||||
@ -113,8 +112,8 @@ public class OperatorController {
|
||||
}
|
||||
|
||||
@RequestMapping("/switches")
|
||||
public JSONObject switches(HttpServletRequest request) {
|
||||
return JSON.parseObject(switchDomain.toString());
|
||||
public SwitchDomain switches(HttpServletRequest request) {
|
||||
return switchDomain;
|
||||
}
|
||||
|
||||
@NeedAuth
|
||||
|
@ -17,13 +17,16 @@ package com.alibaba.nacos.naming.controllers;
|
||||
|
||||
import com.alibaba.fastjson.TypeReference;
|
||||
import com.alibaba.nacos.core.utils.WebUtils;
|
||||
import com.alibaba.nacos.naming.cluster.ServerMode;
|
||||
import com.alibaba.nacos.naming.cluster.transport.Serializer;
|
||||
import com.alibaba.nacos.naming.consistency.Datum;
|
||||
import com.alibaba.nacos.naming.consistency.KeyBuilder;
|
||||
import com.alibaba.nacos.naming.consistency.ephemeral.partition.PartitionConsistencyServiceImpl;
|
||||
import com.alibaba.nacos.naming.core.Instances;
|
||||
import com.alibaba.nacos.naming.core.ServiceManager;
|
||||
import com.alibaba.nacos.naming.exception.NacosException;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -52,6 +55,12 @@ public class PartitionController {
|
||||
@Autowired
|
||||
private PartitionConsistencyServiceImpl consistencyService;
|
||||
|
||||
@Autowired
|
||||
private ServiceManager serviceManager;
|
||||
|
||||
@Autowired
|
||||
private SwitchDomain switchDomain;
|
||||
|
||||
@RequestMapping("/onSync")
|
||||
public String onSync(HttpServletRequest request, HttpServletResponse response) throws Exception {
|
||||
|
||||
@ -67,6 +76,12 @@ public class PartitionController {
|
||||
|
||||
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
|
||||
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
|
||||
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
|
||||
String serviceName = KeyBuilder.getServiceName(entry.getKey());
|
||||
if (!serviceManager.containService(namespaceId, serviceName)
|
||||
&& ServerMode.AP.name().equals(switchDomain.getServerMode())) {
|
||||
serviceManager.createEmptyService(namespaceId, serviceName);
|
||||
}
|
||||
consistencyService.onPut(entry.getKey(), entry.getValue().value);
|
||||
}
|
||||
}
|
||||
@ -77,7 +92,18 @@ public class PartitionController {
|
||||
public String syncTimestamps(HttpServletRequest request, HttpServletResponse response) throws Exception {
|
||||
String source = WebUtils.required(request, "source");
|
||||
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
|
||||
Map<String, Long> dataMap = serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, Long>>(){});
|
||||
Map<String, Long> dataMap = serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, Long>>() {
|
||||
});
|
||||
|
||||
for (String key : dataMap.keySet()) {
|
||||
String namespaceId = KeyBuilder.getNamespace(key);
|
||||
String serviceName = KeyBuilder.getServiceName(key);
|
||||
if (!serviceManager.containService(namespaceId, serviceName)
|
||||
&& ServerMode.AP.name().equals(switchDomain.getServerMode())) {
|
||||
serviceManager.createEmptyService(namespaceId, serviceName);
|
||||
}
|
||||
}
|
||||
|
||||
consistencyService.onReceiveTimestamps(dataMap, source);
|
||||
return "ok";
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import com.alibaba.nacos.naming.core.Service;
|
||||
import com.alibaba.nacos.naming.core.ServiceManager;
|
||||
import com.alibaba.nacos.naming.exception.NacosException;
|
||||
import com.alibaba.nacos.naming.misc.NetUtils;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import com.alibaba.nacos.naming.web.NeedAuth;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
@ -139,6 +140,11 @@ public class RaftController {
|
||||
return "ok";
|
||||
}
|
||||
|
||||
if (KeyBuilder.matchSwitchKey(key)) {
|
||||
raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), SwitchDomain.class));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
if (KeyBuilder.matchServiceMetaKey(key)) {
|
||||
raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Service.class));
|
||||
return "ok";
|
||||
@ -217,6 +223,15 @@ public class RaftController {
|
||||
return "ok";
|
||||
}
|
||||
|
||||
if (KeyBuilder.matchSwitchKey(datum.key)) {
|
||||
Datum<SwitchDomain> switchDomainDatum = new Datum<>();
|
||||
switchDomainDatum.key = datum.key;
|
||||
switchDomainDatum.timestamp.set(datum.timestamp.get());
|
||||
switchDomainDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), SwitchDomain.class);
|
||||
raftConsistencyService.onPut(switchDomainDatum, source);
|
||||
return "ok";
|
||||
}
|
||||
|
||||
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
|
||||
Datum<Service> serviceDatum = new Datum<>();
|
||||
serviceDatum.key = datum.key;
|
||||
|
@ -57,7 +57,7 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
private Set<Instance> ephemeralInstances = new HashSet<>();
|
||||
|
||||
@JSONField(serialize = false)
|
||||
private Service dom;
|
||||
private Service service;
|
||||
|
||||
private Map<String, String> metadata = new ConcurrentHashMap<>();
|
||||
|
||||
@ -104,12 +104,12 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
return checkTask;
|
||||
}
|
||||
|
||||
public Service getDom() {
|
||||
return dom;
|
||||
public Service getService() {
|
||||
return service;
|
||||
}
|
||||
|
||||
public void setDom(Service dom) {
|
||||
this.dom = dom;
|
||||
public void setService(Service service) {
|
||||
this.service = service;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -118,7 +118,7 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
Cluster cluster = new Cluster();
|
||||
|
||||
cluster.setHealthChecker(getHealthChecker().clone());
|
||||
cluster.setDom(getDom());
|
||||
cluster.setService(getService());
|
||||
cluster.persistentInstances = new HashSet<Instance>();
|
||||
cluster.checkTask = null;
|
||||
cluster.metadata = new HashMap<>(metadata);
|
||||
@ -150,12 +150,12 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
if (ip.isValid() != oldIP.isValid()) {
|
||||
// ip validation status updated
|
||||
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}",
|
||||
getDom().getName(), (ip.isValid() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
|
||||
getService().getName(), (ip.isValid() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
|
||||
}
|
||||
|
||||
if (ip.getWeight() != oldIP.getWeight()) {
|
||||
// ip validation status updated
|
||||
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getDom().getName(), oldIP.toString(), ip.toString());
|
||||
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -163,7 +163,7 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
List<Instance> newIPs = subtract(ips, oldIPMap.values());
|
||||
if (newIPs.size() > 0) {
|
||||
Loggers.EVT_LOG.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}",
|
||||
getDom().getName(), getName(), newIPs.size(), newIPs.toString());
|
||||
getService().getName(), getName(), newIPs.size(), newIPs.toString());
|
||||
|
||||
for (Instance ip : newIPs) {
|
||||
HealthCheckStatus.reset(ip);
|
||||
@ -174,7 +174,7 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
|
||||
if (deadIPs.size() > 0) {
|
||||
Loggers.EVT_LOG.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}",
|
||||
getDom().getName(), getName(), deadIPs.size(), deadIPs.toString());
|
||||
getService().getName(), getName(), deadIPs.size(), deadIPs.toString());
|
||||
|
||||
for (Instance ip : deadIPs) {
|
||||
HealthCheckStatus.remv(ip);
|
||||
@ -281,37 +281,37 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
|
||||
if (!getHealthChecker().equals(cluster.getHealthChecker())) {
|
||||
Loggers.SRV_LOG.info("[CLUSTER-UPDATE] {}:{}:, healthChecker: {} -> {}",
|
||||
cluster.getDom().getName(), cluster.getName(), getHealthChecker().toString(), cluster.getHealthChecker().toString());
|
||||
cluster.getService().getName(), cluster.getName(), getHealthChecker().toString(), cluster.getHealthChecker().toString());
|
||||
setHealthChecker(cluster.getHealthChecker());
|
||||
}
|
||||
|
||||
if (defCkport != cluster.getDefCkport()) {
|
||||
Loggers.SRV_LOG.info("[CLUSTER-UPDATE] {}:{}, defCkport: {} -> {}",
|
||||
cluster.getDom().getName(), cluster.getName(), defCkport, cluster.getDefCkport());
|
||||
cluster.getService().getName(), cluster.getName(), defCkport, cluster.getDefCkport());
|
||||
defCkport = cluster.getDefCkport();
|
||||
}
|
||||
|
||||
if (defIPPort != cluster.getDefIPPort()) {
|
||||
Loggers.SRV_LOG.info("[CLUSTER-UPDATE] {}:{}, defIPPort: {} -> {}",
|
||||
cluster.getDom().getName(), cluster.getName(), defIPPort, cluster.getDefIPPort());
|
||||
cluster.getService().getName(), cluster.getName(), defIPPort, cluster.getDefIPPort());
|
||||
defIPPort = cluster.getDefIPPort();
|
||||
}
|
||||
|
||||
if (!StringUtils.equals(submask, cluster.getSubmask())) {
|
||||
Loggers.SRV_LOG.info("[CLUSTER-UPDATE] {}:{}, submask: {} -> {}",
|
||||
cluster.getDom().getName(), cluster.getName(), submask, cluster.getSubmask());
|
||||
cluster.getService().getName(), cluster.getName(), submask, cluster.getSubmask());
|
||||
submask = cluster.getSubmask();
|
||||
}
|
||||
|
||||
if (!StringUtils.equals(sitegroup, cluster.getSitegroup())) {
|
||||
Loggers.SRV_LOG.info("[CLUSTER-UPDATE] {}:{}, sitegroup: {} -> {}",
|
||||
cluster.getDom().getName(), cluster.getName(), sitegroup, cluster.getSitegroup());
|
||||
cluster.getService().getName(), cluster.getName(), sitegroup, cluster.getSitegroup());
|
||||
sitegroup = cluster.getSitegroup();
|
||||
}
|
||||
|
||||
if (isUseIPPort4Check() != cluster.isUseIPPort4Check()) {
|
||||
Loggers.SRV_LOG.info("[CLUSTER-UPDATE] {}:{}, useIPPort4Check: {} -> {}",
|
||||
cluster.getDom().getName(), cluster.getName(), isUseIPPort4Check(), cluster.isUseIPPort4Check());
|
||||
cluster.getService().getName(), cluster.getName(), isUseIPPort4Check(), cluster.isUseIPPort4Check());
|
||||
setUseIPPort4Check(cluster.isUseIPPort4Check());
|
||||
}
|
||||
|
||||
|
@ -73,6 +73,11 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
|
||||
private volatile String checksum;
|
||||
|
||||
/**
|
||||
* TODO set customized push expire time:
|
||||
*/
|
||||
private long pushCacheMillis = 0L;
|
||||
|
||||
private Map<String, Cluster> clusterMap = new HashMap<String, Cluster>();
|
||||
|
||||
public Service() {
|
||||
@ -184,7 +189,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
return (healthyInstanceCount() * 1.0 / allIPs().size()) < getProtectThreshold();
|
||||
}
|
||||
|
||||
public void updateIPs(Collection<Instance> ips, boolean ephemeral) {
|
||||
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
|
||||
// TODO prevent most of the instances from removed
|
||||
|
||||
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
|
||||
@ -192,32 +197,34 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
ipMap.put(clusterName, new ArrayList<>());
|
||||
}
|
||||
|
||||
for (Instance ip : ips) {
|
||||
for (Instance instance : instances) {
|
||||
try {
|
||||
if (ip == null) {
|
||||
if (instance == null) {
|
||||
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (StringUtils.isEmpty(ip.getClusterName())) {
|
||||
ip.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
|
||||
if (StringUtils.isEmpty(instance.getClusterName())) {
|
||||
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
|
||||
}
|
||||
|
||||
// put wild ip into DEFAULT cluster
|
||||
if (!clusterMap.containsKey(ip.getClusterName())) {
|
||||
Loggers.SRV_LOG.warn("cluster of IP not found: {}", ip);
|
||||
continue;
|
||||
if (!clusterMap.containsKey(instance.getClusterName())) {
|
||||
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
|
||||
instance.getClusterName(), instance.toJSON());
|
||||
Cluster cluster = new Cluster(instance.getClusterName());
|
||||
cluster.setService(this);
|
||||
getClusterMap().put(instance.getClusterName(), cluster);
|
||||
}
|
||||
|
||||
List<Instance> clusterIPs = ipMap.get(ip.getClusterName());
|
||||
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
|
||||
if (clusterIPs == null) {
|
||||
clusterIPs = new LinkedList<>();
|
||||
ipMap.put(ip.getClusterName(), clusterIPs);
|
||||
ipMap.put(instance.getClusterName(), clusterIPs);
|
||||
}
|
||||
|
||||
clusterIPs.add(ip);
|
||||
clusterIPs.add(instance);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + ip, e);
|
||||
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -244,7 +251,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
|
||||
|
||||
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
|
||||
entry.getValue().setDom(this);
|
||||
entry.getValue().setService(this);
|
||||
entry.getValue().init();
|
||||
}
|
||||
}
|
||||
@ -465,10 +472,10 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
Cluster oldCluster = clusterMap.get(cluster.getName());
|
||||
if (oldCluster != null) {
|
||||
oldCluster.update(cluster);
|
||||
oldCluster.setDom(this);
|
||||
oldCluster.setService(this);
|
||||
} else {
|
||||
cluster.init();
|
||||
cluster.setDom(this);
|
||||
cluster.setService(this);
|
||||
clusterMap.put(cluster.getName(), cluster);
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.naming.cluster.ServerListManager;
|
||||
import com.alibaba.nacos.naming.cluster.ServerMode;
|
||||
import com.alibaba.nacos.naming.cluster.servers.Server;
|
||||
import com.alibaba.nacos.naming.consistency.ConsistencyService;
|
||||
import com.alibaba.nacos.naming.consistency.DataListener;
|
||||
@ -321,6 +322,23 @@ public class ServiceManager implements DataListener<Service> {
|
||||
consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
|
||||
}
|
||||
|
||||
public void createEmptyService(String namespaceId, String serviceName) throws NacosException {
|
||||
Service service = getService(namespaceId, serviceName);
|
||||
if (service == null) {
|
||||
service = new Service();
|
||||
service.setName(serviceName);
|
||||
service.setNamespaceId(namespaceId);
|
||||
// now validate the service. if failed, exception will be thrown
|
||||
service.setLastModifiedMillis(System.currentTimeMillis());
|
||||
service.recalculateChecksum();
|
||||
service.valid();
|
||||
putService(service);
|
||||
service.init();
|
||||
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
|
||||
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register an instance to a service in AP mode.
|
||||
* <p>
|
||||
@ -333,41 +351,15 @@ public class ServiceManager implements DataListener<Service> {
|
||||
*/
|
||||
public void registerInstance(String namespaceId, String serviceName, String clusterName, Instance instance) throws Exception {
|
||||
|
||||
if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
|
||||
createEmptyService(namespaceId, serviceName);
|
||||
}
|
||||
|
||||
Service service = getService(namespaceId, serviceName);
|
||||
boolean serviceUpdated = false;
|
||||
|
||||
if (service == null) {
|
||||
service = new Service();
|
||||
service.setName(serviceName);
|
||||
service.setNamespaceId(namespaceId);
|
||||
// now validate the service. if failed, exception will be thrown
|
||||
service.setLastModifiedMillis(System.currentTimeMillis());
|
||||
service.recalculateChecksum();
|
||||
service.valid();
|
||||
serviceUpdated = true;
|
||||
}
|
||||
|
||||
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
|
||||
|
||||
Cluster cluster = new Cluster();
|
||||
cluster.setName(instance.getClusterName());
|
||||
cluster.setDom(service);
|
||||
cluster.init();
|
||||
|
||||
if (service.getClusterMap().containsKey(cluster.getName())) {
|
||||
service.getClusterMap().get(cluster.getName()).update(cluster);
|
||||
} else {
|
||||
service.getClusterMap().put(cluster.getName(), cluster);
|
||||
}
|
||||
|
||||
service.setLastModifiedMillis(System.currentTimeMillis());
|
||||
service.recalculateChecksum();
|
||||
service.valid();
|
||||
serviceUpdated = true;
|
||||
}
|
||||
|
||||
// only local memory is updated:
|
||||
if (serviceUpdated) {
|
||||
putService(service);
|
||||
throw new NacosException(NacosException.INVALID_PARAM,
|
||||
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
|
||||
}
|
||||
|
||||
if (service.allIPs().contains(instance)) {
|
||||
@ -383,11 +375,6 @@ public class ServiceManager implements DataListener<Service> {
|
||||
|
||||
Service service = getService(namespaceId, serviceName);
|
||||
|
||||
if (service == null) {
|
||||
throw new NacosException(NacosException.INVALID_PARAM,
|
||||
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
|
||||
}
|
||||
|
||||
Map<String, Instance> instanceMap = addIpAddresses(service, ephemeral, ips);
|
||||
|
||||
Instances instances = new Instances();
|
||||
@ -460,7 +447,7 @@ public class ServiceManager implements DataListener<Service> {
|
||||
for (Instance instance : ips) {
|
||||
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
|
||||
Cluster cluster = new Cluster(instance.getClusterName());
|
||||
cluster.setDom(service);
|
||||
cluster.setService(service);
|
||||
service.getClusterMap().put(instance.getClusterName(), cluster);
|
||||
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
|
||||
instance.getClusterName(), instance.toJSON());
|
||||
@ -508,6 +495,10 @@ public class ServiceManager implements DataListener<Service> {
|
||||
return chooseServiceMap(namespaceId).get(serviceName);
|
||||
}
|
||||
|
||||
public boolean containService(String namespaceId, String serviceName) {
|
||||
return getService(namespaceId, serviceName) != null;
|
||||
}
|
||||
|
||||
public void putService(Service service) {
|
||||
if (!serviceMap.containsKey(service.getNamespaceId())) {
|
||||
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
|
||||
|
@ -79,7 +79,7 @@ public class ClientBeatProcessor implements Runnable {
|
||||
if (!instance.isValid()) {
|
||||
instance.setValid(true);
|
||||
Loggers.EVT_LOG.info("dom: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
|
||||
cluster.getDom().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
|
||||
cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
|
||||
getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
|
||||
}
|
||||
}
|
||||
|
@ -142,24 +142,24 @@ public class HealthCheckCommon {
|
||||
ip.setValid(true);
|
||||
ip.setMockValid(true);
|
||||
|
||||
Service vDom = cluster.getDom();
|
||||
Service vDom = cluster.getService();
|
||||
vDom.setLastModifiedMillis(System.currentTimeMillis());
|
||||
|
||||
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
|
||||
addResult(new HealthCheckResult(vDom.getName(), ip));
|
||||
|
||||
Loggers.EVT_LOG.info("dom: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
} else {
|
||||
if (!ip.isMockValid()) {
|
||||
ip.setMockValid(true);
|
||||
Loggers.EVT_LOG.info("dom: {} {PROBE} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Loggers.EVT_LOG.info("dom: {} {OTHER} {IP-ENABLED} pre-valid: {}:{}@{} in {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), ip.getOKCount(), msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), ip.getOKCount(), msg);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
@ -180,22 +180,22 @@ public class HealthCheckCommon {
|
||||
ip.setValid(false);
|
||||
ip.setMockValid(false);
|
||||
|
||||
Service vDom = cluster.getDom();
|
||||
Service vDom = cluster.getService();
|
||||
vDom.setLastModifiedMillis(System.currentTimeMillis());
|
||||
addResult(new HealthCheckResult(vDom.getName(), ip));
|
||||
|
||||
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
|
||||
|
||||
Loggers.EVT_LOG.info("dom: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
} else {
|
||||
Loggers.EVT_LOG.info("dom: {} {PROBE} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
}
|
||||
|
||||
} else {
|
||||
Loggers.EVT_LOG.info("dom: {} {OTHER} {IP-DISABLED} pre-invalid: {}:{}@{} in {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), ip.getFailCount(), msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), ip.getFailCount(), msg);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
@ -215,19 +215,19 @@ public class HealthCheckCommon {
|
||||
ip.setValid(false);
|
||||
ip.setMockValid(false);
|
||||
|
||||
Service vDom = cluster.getDom();
|
||||
Service vDom = cluster.getService();
|
||||
vDom.setLastModifiedMillis(System.currentTimeMillis());
|
||||
|
||||
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
|
||||
addResult(new HealthCheckResult(vDom.getName(), ip));
|
||||
|
||||
Loggers.EVT_LOG.info("dom: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
} else {
|
||||
if (ip.isMockValid()) {
|
||||
ip.setMockValid(false);
|
||||
Loggers.EVT_LOG.info("dom: {} {PROBE} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -69,22 +69,22 @@ public class HealthCheckTask implements Runnable {
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
if (distroMapper.responsible(cluster.getDom().getName()) &&
|
||||
switchDomain.isHealthCheckEnabled(cluster.getDom().getName())) {
|
||||
if (distroMapper.responsible(cluster.getService().getName()) &&
|
||||
switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
|
||||
healthCheckProcessor.process(this);
|
||||
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getDom().getName());
|
||||
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}",
|
||||
cluster.getDom().getName(), cluster.getName(), e);
|
||||
cluster.getService().getName(), cluster.getName(), e);
|
||||
} finally {
|
||||
if (!cancelled) {
|
||||
HealthCheckReactor.scheduleCheck(this);
|
||||
|
||||
// worst == 0 means never checked
|
||||
if (this.getCheckRTWorst() > 0
|
||||
&& switchDomain.isHealthCheckEnabled(cluster.getDom().getName())
|
||||
&& distroMapper.responsible(cluster.getDom().getName())) {
|
||||
&& switchDomain.isHealthCheckEnabled(cluster.getService().getName())
|
||||
&& distroMapper.responsible(cluster.getService().getName())) {
|
||||
// TLog doesn't support float so we must convert it into long
|
||||
long diff = ((this.getCheckRTLast() - this.getCheckRTLastLast()) * 10000)
|
||||
/ this.getCheckRTLastLast();
|
||||
@ -93,7 +93,7 @@ public class HealthCheckTask implements Runnable {
|
||||
|
||||
Cluster cluster = this.getCluster();
|
||||
Loggers.CHECK_RT.info("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
|
||||
cluster.getDom().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
|
||||
cluster.getService().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
|
||||
this.getCheckRTNormalized(), this.getCheckRTWorst(), this.getCheckRTBest(),
|
||||
this.getCheckRTLast(), diff);
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ public class HttpHealthCheckProcessor implements HealthCheckProcessor {
|
||||
|
||||
if (!ip.markChecking()) {
|
||||
SRV_LOG.warn("http check started before last one finished, dom: {}:{}:{}",
|
||||
task.getCluster().getDom().getName(), task.getCluster().getName(), ip.getIp());
|
||||
task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());
|
||||
|
||||
healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getHttpHealthParams());
|
||||
continue;
|
||||
|
@ -107,7 +107,7 @@ public class MysqlHealthCheckProcessor implements HealthCheckProcessor {
|
||||
|
||||
if (!ip.markChecking()) {
|
||||
SRV_LOG.warn("mysql check started before last one finished, dom: {}:{}:{}",
|
||||
task.getCluster().getDom().getName(), task.getCluster().getName(), ip.getIp());
|
||||
task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());
|
||||
|
||||
healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getMysqlHealthParams());
|
||||
continue;
|
||||
@ -142,7 +142,7 @@ public class MysqlHealthCheckProcessor implements HealthCheckProcessor {
|
||||
try {
|
||||
;
|
||||
Cluster cluster = task.getCluster();
|
||||
String key = cluster.getDom().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort();
|
||||
String key = cluster.getService().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort();
|
||||
Connection connection = CONNECTION_POOL.get(key);
|
||||
AbstractHealthChecker.Mysql config = (AbstractHealthChecker.Mysql) cluster.getHealthChecker();
|
||||
|
||||
|
@ -111,7 +111,7 @@ public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
|
||||
if (CollectionUtils.isEmpty(ips)) {
|
||||
return;
|
||||
}
|
||||
Service service = task.getCluster().getDom();
|
||||
Service service = task.getCluster().getService();
|
||||
|
||||
for (Instance ip : ips) {
|
||||
|
||||
@ -124,7 +124,7 @@ public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
|
||||
|
||||
if (!ip.markChecking()) {
|
||||
SRV_LOG.warn("tcp check started before last one finished, dom: "
|
||||
+ task.getCluster().getDom().getName() + ":"
|
||||
+ task.getCluster().getService().getName() + ":"
|
||||
+ task.getCluster().getName() + ":"
|
||||
+ ip.getIp() + ":"
|
||||
+ ip.getPort());
|
||||
@ -292,7 +292,7 @@ public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return task.getCluster().getDom().getName() + ":"
|
||||
return task.getCluster().getService().getName() + ":"
|
||||
+ task.getCluster().getName() + ":"
|
||||
+ ip.getIp() + ":"
|
||||
+ ip.getPort();
|
||||
|
@ -15,8 +15,7 @@
|
||||
*/
|
||||
package com.alibaba.nacos.naming.misc;
|
||||
|
||||
import com.alibaba.fastjson.annotation.JSONField;
|
||||
import com.alibaba.nacos.naming.consistency.DataListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
@ -26,7 +25,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* @author nacos
|
||||
*/
|
||||
@Component
|
||||
public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
public class SwitchDomain implements Cloneable {
|
||||
|
||||
public String name = UtilsAndCommons.SWITCH_DOMAIN_NAME;
|
||||
|
||||
@ -44,10 +43,6 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
|
||||
public String token = UtilsAndCommons.SUPER_TOKEN;
|
||||
|
||||
public Map<String, Long> cacheMillisMap = new HashMap<String, Long>();
|
||||
|
||||
public Map<String, Long> pushCacheMillisMap = new HashMap<String, Long>();
|
||||
|
||||
public boolean healthCheckEnabled = true;
|
||||
|
||||
public boolean distroEnabled = true;
|
||||
@ -64,16 +59,12 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
|
||||
private List<String> incrementalList = new ArrayList<>();
|
||||
|
||||
private boolean allDomNameCache = true;
|
||||
|
||||
public long serverStatusSynchronizationPeriodMillis = TimeUnit.SECONDS.toMillis(15);
|
||||
|
||||
public long domStatusSynchronizationPeriodMillis = TimeUnit.SECONDS.toMillis(5);
|
||||
|
||||
public boolean disableAddIP = false;
|
||||
|
||||
public boolean enableCache = true;
|
||||
|
||||
public boolean sendBeatOnly = false;
|
||||
|
||||
public Map<String, Integer> limitedUrlMap = new HashMap<>();
|
||||
@ -123,10 +114,6 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
this.clientBeatInterval = clientBeatInterval;
|
||||
}
|
||||
|
||||
public boolean isEnableCache() {
|
||||
return enableCache;
|
||||
}
|
||||
|
||||
public boolean isEnableStandalone() {
|
||||
return enableStandalone;
|
||||
}
|
||||
@ -153,7 +140,7 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
}
|
||||
|
||||
|
||||
public void update(SwitchDomain dom) {
|
||||
public void update(SwitchDomain domain) {
|
||||
|
||||
}
|
||||
|
||||
@ -161,12 +148,8 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
return incrementalList;
|
||||
}
|
||||
|
||||
public boolean isAllDomNameCache() {
|
||||
return allDomNameCache;
|
||||
}
|
||||
|
||||
public void setAllDomNameCache(boolean enable) {
|
||||
allDomNameCache = enable;
|
||||
public void setIncrementalList(List<String> incrementalList) {
|
||||
this.incrementalList = incrementalList;
|
||||
}
|
||||
|
||||
public List<String> getMasters() {
|
||||
@ -214,17 +197,7 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
}
|
||||
|
||||
public long getPushCacheMillis(String dom) {
|
||||
if (pushCacheMillisMap == null
|
||||
|| !pushCacheMillisMap.containsKey(dom)) {
|
||||
return defaultPushCacheMillis;
|
||||
}
|
||||
|
||||
return pushCacheMillisMap.get(dom);
|
||||
}
|
||||
|
||||
@JSONField(serialize = false)
|
||||
public void setPushCacheMillis(Long cacheMillis) {
|
||||
defaultPushCacheMillis = cacheMillis;
|
||||
return defaultPushCacheMillis;
|
||||
}
|
||||
|
||||
public boolean isHealthCheckEnabled() {
|
||||
@ -303,10 +276,6 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
this.disableAddIP = disableAddIP;
|
||||
}
|
||||
|
||||
public void setEnableCache(boolean enableCache) {
|
||||
this.enableCache = enableCache;
|
||||
}
|
||||
|
||||
public Map<String, Integer> getLimitedUrlMap() {
|
||||
return limitedUrlMap;
|
||||
}
|
||||
@ -371,28 +340,14 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
|
||||
this.serverMode = serverMode;
|
||||
}
|
||||
|
||||
public void replace(SwitchDomain newSwitchDomain) {
|
||||
// TODO
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean interests(String key) {
|
||||
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matchUnlistenKey(String key) {
|
||||
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChange(String key, SwitchDomain domain) throws Exception {
|
||||
update(domain);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelete(String key) throws Exception {
|
||||
|
||||
protected SwitchDomain clone() throws CloneNotSupportedException {
|
||||
return (SwitchDomain) super.clone();
|
||||
}
|
||||
|
||||
public interface HealthParams {
|
||||
|
@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.naming.cluster.ServerMode;
|
||||
import com.alibaba.nacos.naming.consistency.ConsistencyService;
|
||||
import com.alibaba.nacos.naming.consistency.DataListener;
|
||||
import com.alibaba.nacos.naming.consistency.Datum;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -39,7 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Component
|
||||
public class SwitchManager {
|
||||
public class SwitchManager implements DataListener<SwitchDomain> {
|
||||
|
||||
@Autowired
|
||||
private SwitchDomain switchDomain;
|
||||
@ -53,13 +54,13 @@ public class SwitchManager {
|
||||
public void init() {
|
||||
|
||||
try {
|
||||
consistencyService.listen(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
consistencyService.listen(UtilsAndCommons.getSwitchDomainKey(), this);
|
||||
} catch (NacosException e) {
|
||||
Loggers.SRV_LOG.error("listen switch domain failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void update(String entry, String value, boolean debug) throws NacosException {
|
||||
public void update(String entry, String value, boolean debug) throws Exception {
|
||||
|
||||
try {
|
||||
lock.lock();
|
||||
@ -68,10 +69,9 @@ public class SwitchManager {
|
||||
SwitchDomain switchDomain;
|
||||
|
||||
if (datum != null) {
|
||||
switchDomain = JSON.parseObject((String) datum.value, SwitchDomain.class);
|
||||
switchDomain = (SwitchDomain) datum.value;
|
||||
} else {
|
||||
Loggers.SRV_LOG.warn("switch domain is null");
|
||||
throw new NacosException(NacosException.SERVER_ERROR, "switch datum is null!");
|
||||
switchDomain = this.switchDomain.clone();
|
||||
}
|
||||
|
||||
if (SwitchEntry.BATCH.equals(entry)) {
|
||||
@ -98,9 +98,9 @@ public class SwitchManager {
|
||||
throw new IllegalArgumentException("malformed factor");
|
||||
}
|
||||
|
||||
switchDomain.replace(dom);
|
||||
update(dom);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(dom));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), dom);
|
||||
}
|
||||
|
||||
return;
|
||||
@ -117,29 +117,17 @@ public class SwitchManager {
|
||||
switchDomain.setDistroThreshold(threshold);
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (entry.equals(SwitchEntry.ENABLE_ALL_DOM_NAME_CACHE)) {
|
||||
Boolean enable = Boolean.parseBoolean(value);
|
||||
switchDomain.setAllDomNameCache(enable);
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (entry.equals(SwitchEntry.CLIENT_BEAT_INTERVAL)) {
|
||||
long clientBeatInterval = Long.parseLong(value);
|
||||
switchDomain.setClientBeatInterval(clientBeatInterval);
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -166,7 +154,7 @@ public class SwitchManager {
|
||||
}
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -178,9 +166,9 @@ public class SwitchManager {
|
||||
throw new IllegalArgumentException("min cache time for http or tcp is too small(<10000)");
|
||||
}
|
||||
|
||||
switchDomain.setPushCacheMillis(cacheMillis);
|
||||
switchDomain.setDefaultPushCacheMillis(cacheMillis);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -195,7 +183,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setDefaultCacheMillis(cacheMillis);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -205,7 +193,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setMasters(masters);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -215,7 +203,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setDistroEnabled(enabled);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -225,7 +213,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setHealthCheckEnabled(enabled);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -239,7 +227,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setDomStatusSynchronizationPeriodMillis(millis);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -253,7 +241,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setServerStatusSynchronizationPeriodMillis(millis);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -263,7 +251,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setCheckTimes(times);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -273,17 +261,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setDisableAddIP(disableAddIP);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (entry.equals(SwitchEntry.ENABLE_CACHE)) {
|
||||
boolean enableCache = Boolean.parseBoolean(value);
|
||||
|
||||
switchDomain.setEnableCache(enableCache);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -293,7 +271,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setSendBeatOnly(sendBeatOnly);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -326,7 +304,7 @@ public class SwitchManager {
|
||||
|
||||
switchDomain.setLimitedUrlMap(limitedUrlMap);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -340,7 +318,7 @@ public class SwitchManager {
|
||||
}
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
|
||||
return;
|
||||
@ -351,7 +329,7 @@ public class SwitchManager {
|
||||
switchDomain.setOverriddenServerStatus(status);
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
|
||||
return;
|
||||
@ -362,7 +340,7 @@ public class SwitchManager {
|
||||
switchDomain.setServerMode(ServerMode.valueOf(mode).name());
|
||||
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
|
||||
return;
|
||||
@ -376,6 +354,56 @@ public class SwitchManager {
|
||||
}
|
||||
|
||||
public void update(SwitchDomain newSwitchDomain) {
|
||||
switchDomain.setMasters(newSwitchDomain.getMasters());
|
||||
switchDomain.setAdWeightMap(newSwitchDomain.getAdWeightMap());
|
||||
switchDomain.setDefaultPushCacheMillis(newSwitchDomain.getDefaultPushCacheMillis());
|
||||
switchDomain.setClientBeatInterval(newSwitchDomain.getClientBeatInterval());
|
||||
switchDomain.setDefaultCacheMillis(newSwitchDomain.getDefaultCacheMillis());
|
||||
switchDomain.setDistroThreshold(newSwitchDomain.getDistroThreshold());
|
||||
switchDomain.setHealthCheckEnabled(newSwitchDomain.isHealthCheckEnabled());
|
||||
switchDomain.setDistroEnabled(newSwitchDomain.isDistroEnabled());
|
||||
switchDomain.setEnableStandalone(newSwitchDomain.isEnableStandalone());
|
||||
switchDomain.setCheckTimes(newSwitchDomain.getCheckTimes());
|
||||
switchDomain.setHttpHealthParams(newSwitchDomain.getHttpHealthParams());
|
||||
switchDomain.setTcpHealthParams(newSwitchDomain.getTcpHealthParams());
|
||||
switchDomain.setMysqlHealthParams(newSwitchDomain.getMysqlHealthParams());
|
||||
switchDomain.setIncrementalList(newSwitchDomain.getIncrementalList());
|
||||
switchDomain.setServerStatusSynchronizationPeriodMillis(newSwitchDomain.getServerStatusSynchronizationPeriodMillis());
|
||||
switchDomain.setDomStatusSynchronizationPeriodMillis(newSwitchDomain.getDomStatusSynchronizationPeriodMillis());
|
||||
switchDomain.setDisableAddIP(newSwitchDomain.isDisableAddIP());
|
||||
switchDomain.setSendBeatOnly(newSwitchDomain.isSendBeatOnly());
|
||||
switchDomain.setLimitedUrlMap(newSwitchDomain.getLimitedUrlMap());
|
||||
switchDomain.setDistroServerExpiredMillis(newSwitchDomain.getDistroServerExpiredMillis());
|
||||
switchDomain.setPushGoVersion(newSwitchDomain.getPushGoVersion());
|
||||
switchDomain.setPushJavaVersion(newSwitchDomain.getPushJavaVersion());
|
||||
switchDomain.setPushPythonVersion(newSwitchDomain.getPushPythonVersion());
|
||||
switchDomain.setPushCVersion(newSwitchDomain.getPushCVersion());
|
||||
switchDomain.setEnableAuthentication(newSwitchDomain.isEnableAuthentication());
|
||||
switchDomain.setOverriddenServerStatus(newSwitchDomain.getOverriddenServerStatus());
|
||||
switchDomain.setServerMode(newSwitchDomain.getServerMode());
|
||||
}
|
||||
|
||||
public SwitchDomain getSwitchDomain() {
|
||||
return switchDomain;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean interests(String key) {
|
||||
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matchUnlistenKey(String key) {
|
||||
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChange(String key, SwitchDomain domain) throws Exception {
|
||||
update(domain);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelete(String key) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -208,17 +208,6 @@ public class UtilsAndCommons {
|
||||
return strBuilder.toString();
|
||||
}
|
||||
|
||||
|
||||
public static String getIPListStoreKey(Service dom) {
|
||||
return UtilsAndCommons.IPADDRESS_DATA_ID_PRE + dom.getNamespaceId() +
|
||||
UtilsAndCommons.SERVICE_GROUP_CONNECTOR + dom.getName();
|
||||
}
|
||||
|
||||
public static String getDomStoreKey(Service dom) {
|
||||
return UtilsAndCommons.DOMAINS_DATA_ID_PRE + dom.getNamespaceId() +
|
||||
UtilsAndCommons.SERVICE_GROUP_CONNECTOR + dom.getName();
|
||||
}
|
||||
|
||||
public static String getSwitchDomainKey() {
|
||||
return UtilsAndCommons.DOMAINS_DATA_ID_PRE + UtilsAndCommons.SWITCH_DOMAIN_NAME;
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ public class InstanceControllerTest extends BaseTest {
|
||||
|
||||
Cluster cluster = new Cluster();
|
||||
cluster.setName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
|
||||
cluster.setDom(domain);
|
||||
cluster.setService(domain);
|
||||
domain.addCluster(cluster);
|
||||
|
||||
Instance instance = new Instance();
|
||||
@ -118,7 +118,7 @@ public class InstanceControllerTest extends BaseTest {
|
||||
|
||||
Cluster cluster = new Cluster();
|
||||
cluster.setName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
|
||||
cluster.setDom(domain);
|
||||
cluster.setService(domain);
|
||||
domain.addCluster(cluster);
|
||||
|
||||
Instance instance = new Instance();
|
||||
|
@ -38,7 +38,7 @@ public class ClusterTest {
|
||||
|
||||
cluster = new Cluster();
|
||||
cluster.setName("nacos-cluster-1");
|
||||
cluster.setDom(domain);
|
||||
cluster.setService(domain);
|
||||
cluster.setDefCkport(80);
|
||||
cluster.setDefIPPort(8080);
|
||||
}
|
||||
@ -59,7 +59,7 @@ public class ClusterTest {
|
||||
Service domain = new Service();
|
||||
domain.setName("nacos.domain.2");
|
||||
|
||||
newCluster.setDom(domain);
|
||||
newCluster.setService(domain);
|
||||
|
||||
cluster.update(newCluster);
|
||||
|
||||
|
@ -38,7 +38,7 @@ public class DomainTest {
|
||||
domain.setName("nacos.domain.1");
|
||||
Cluster cluster = new Cluster();
|
||||
cluster.setName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
|
||||
cluster.setDom(domain);
|
||||
cluster.setService(domain);
|
||||
domain.addCluster(cluster);
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ public class DomainTest {
|
||||
newDomain.setProtectThreshold(0.7f);
|
||||
Cluster cluster = new Cluster();
|
||||
cluster.setName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
|
||||
cluster.setDom(newDomain);
|
||||
cluster.setService(newDomain);
|
||||
newDomain.addCluster(cluster);
|
||||
|
||||
domain.update(newDomain);
|
||||
|
@ -22,3 +22,5 @@ server.tomcat.basedir=
|
||||
nacos.naming.partition.taskDispatchThreadCount=10
|
||||
nacos.naming.partition.taskDispatchPeriod=200
|
||||
nacos.naming.partition.batchSyncKeyCount=1000
|
||||
nacos.naming.partition.initDataRatio=0.9
|
||||
nacos.naming.partition.syncRetryDelay=5000
|
||||
|
Loading…
Reference in New Issue
Block a user