Merge pull request #540 from alibaba/hotfix_server_side_check_lose_data
Hotfix server side check lose data
This commit is contained in:
commit
deadaebcd5
@ -165,6 +165,7 @@ public class NacosNamingService implements NamingService {
|
||||
beatInfo.setCluster(instance.getClusterName());
|
||||
beatInfo.setWeight(instance.getWeight());
|
||||
beatInfo.setMetadata(instance.getMetadata());
|
||||
beatInfo.setScheduled(false);
|
||||
|
||||
beatReactor.addBeatInfo(serviceName, beatInfo);
|
||||
|
||||
|
@ -30,6 +30,7 @@ public class BeatInfo {
|
||||
private String dom;
|
||||
private String cluster;
|
||||
private Map<String, String> metadata;
|
||||
private boolean scheduled;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@ -83,4 +84,12 @@ public class BeatInfo {
|
||||
public void setWeight(double weight) {
|
||||
this.weight = weight;
|
||||
}
|
||||
|
||||
public boolean isScheduled() {
|
||||
return scheduled;
|
||||
}
|
||||
|
||||
public void setScheduled(boolean scheduled) {
|
||||
this.scheduled = scheduled;
|
||||
}
|
||||
}
|
||||
|
@ -73,6 +73,10 @@ public class BeatReactor {
|
||||
try {
|
||||
for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
|
||||
BeatInfo beatInfo = entry.getValue();
|
||||
if (beatInfo.isScheduled()) {
|
||||
continue;
|
||||
}
|
||||
beatInfo.setScheduled(true);
|
||||
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
|
||||
LogUtils.LOG.info("BEAT", "send beat to server: " + beatInfo.toString());
|
||||
}
|
||||
@ -83,6 +87,7 @@ public class BeatReactor {
|
||||
}
|
||||
|
||||
class BeatTask implements Runnable {
|
||||
|
||||
BeatInfo beatInfo;
|
||||
|
||||
public BeatTask(BeatInfo beatInfo) {
|
||||
@ -96,6 +101,7 @@ public class BeatReactor {
|
||||
params.put("dom", beatInfo.getDom());
|
||||
|
||||
try {
|
||||
beatInfo.setScheduled(false);
|
||||
String result = serverProxy.callAllServers(UtilAndComs.NACOS_URL_BASE + "/api/clientBeat", params);
|
||||
JSONObject jsonObject = JSON.parseObject(result);
|
||||
|
||||
|
@ -58,9 +58,9 @@
|
||||
<append>true</append>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/naming-raft.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
|
||||
<maxFileSize>20MB</maxFileSize>
|
||||
<maxFileSize>1GB</maxFileSize>
|
||||
<MaxHistory>15</MaxHistory>
|
||||
<totalSizeCap>128MB</totalSizeCap>
|
||||
<totalSizeCap>3GB</totalSizeCap>
|
||||
<cleanHistoryOnStart>true</cleanHistoryOnStart>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
@ -82,9 +82,9 @@
|
||||
<append>true</append>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/naming-event.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
|
||||
<maxFileSize>20MB</maxFileSize>
|
||||
<maxFileSize>1GB</maxFileSize>
|
||||
<MaxHistory>15</MaxHistory>
|
||||
<totalSizeCap>128MB</totalSizeCap>
|
||||
<totalSizeCap>3GB</totalSizeCap>
|
||||
<cleanHistoryOnStart>true</cleanHistoryOnStart>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
@ -106,9 +106,9 @@
|
||||
<append>true</append>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/naming-push.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
|
||||
<maxFileSize>20MB</maxFileSize>
|
||||
<maxFileSize>1GB</maxFileSize>
|
||||
<MaxHistory>15</MaxHistory>
|
||||
<totalSizeCap>128MB</totalSizeCap>
|
||||
<totalSizeCap>3GB</totalSizeCap>
|
||||
<cleanHistoryOnStart>true</cleanHistoryOnStart>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
@ -139,9 +139,9 @@
|
||||
<append>true</append>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/naming-performance.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
|
||||
<maxFileSize>50MB</maxFileSize>
|
||||
<maxFileSize>1GB</maxFileSize>
|
||||
<MaxHistory>15</MaxHistory>
|
||||
<totalSizeCap>512MB</totalSizeCap>
|
||||
<totalSizeCap>3GB</totalSizeCap>
|
||||
<cleanHistoryOnStart>true</cleanHistoryOnStart>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
@ -241,9 +241,9 @@
|
||||
<append>true</append>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/naming-debug.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
|
||||
<maxFileSize>20MB</maxFileSize>
|
||||
<maxFileSize>1GB</maxFileSize>
|
||||
<MaxHistory>15</MaxHistory>
|
||||
<totalSizeCap>128MB</totalSizeCap>
|
||||
<totalSizeCap>3GB</totalSizeCap>
|
||||
<cleanHistoryOnStart>true</cleanHistoryOnStart>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
|
@ -25,6 +25,7 @@ import com.alibaba.nacos.naming.core.VirtualClusterDomain;
|
||||
import com.alibaba.nacos.naming.exception.NacosException;
|
||||
import com.alibaba.nacos.naming.healthcheck.HealthCheckMode;
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import com.alibaba.nacos.naming.raft.RaftCore;
|
||||
import com.alibaba.nacos.naming.selector.LabelSelector;
|
||||
import com.alibaba.nacos.naming.selector.NoneSelector;
|
||||
import com.alibaba.nacos.naming.selector.Selector;
|
||||
@ -130,6 +131,9 @@ public class ServiceController {
|
||||
|
||||
res.put("selector", domain.getSelector());
|
||||
|
||||
res.put("instanceTimestamp", RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(domain)).timestamp.get());
|
||||
res.put("serviceTimestamp", RaftCore.getDatum(UtilsAndCommons.getDomStoreKey(domain)).timestamp.get());
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ public class DomainsManager {
|
||||
* thread pool that processes getting domain detail from other server asynchronously
|
||||
*/
|
||||
private ExecutorService domainUpdateExecutor
|
||||
= Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() {
|
||||
= Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
@ -202,8 +202,8 @@ public class DomainsManager {
|
||||
if (valid != ipAddress.isValid()) {
|
||||
ipAddress.setValid(valid);
|
||||
Loggers.EVT_LOG.info("{" + domName + "} {SYNC} " +
|
||||
"{IP-" + (ipAddress.isValid() ? "ENABLED" : "DISABLED") + "} " + ipAddress.getIp()
|
||||
+ ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName());
|
||||
"{IP-" + (ipAddress.isValid() ? "ENABLED" : "DISABLED") + "} " + ipAddress.getIp()
|
||||
+ ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName());
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,73 +284,106 @@ public class DomainsManager {
|
||||
virtualClusterDomain = (VirtualClusterDomain) newDom;
|
||||
newDom = virtualClusterDomain;
|
||||
}
|
||||
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom));
|
||||
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom), true);
|
||||
}
|
||||
|
||||
public void easyAddIP4Dom(String domName, List<IpAddress> ips, long timestamp, long term) throws Exception {
|
||||
public void easyAddIP4Dom(String domName, List<IpAddress> ips, long term) throws Exception {
|
||||
easyUpdateIP4Dom(domName, ips, term, "add");
|
||||
}
|
||||
|
||||
public void easyRemvIP4Dom(String domName, List<IpAddress> ips, long term) throws Exception {
|
||||
easyUpdateIP4Dom(domName, ips, term, "remove");
|
||||
}
|
||||
|
||||
public void easyUpdateIP4Dom(String domName, List<IpAddress> ips, long term, String action) throws Exception {
|
||||
|
||||
VirtualClusterDomain dom = (VirtualClusterDomain) chooseDomMap().get(domName);
|
||||
if (dom == null) {
|
||||
throw new IllegalArgumentException("dom doesn't exist: " + domName);
|
||||
}
|
||||
|
||||
Datum datum1 = RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(dom));
|
||||
String oldJson = StringUtils.EMPTY;
|
||||
try {
|
||||
|
||||
if (datum1 != null) {
|
||||
oldJson = datum1.value;
|
||||
}
|
||||
|
||||
List<IpAddress> ipAddresses;
|
||||
List<IpAddress> currentIPs = dom.allIPs();
|
||||
Map<String, IpAddress> map = new ConcurrentHashMap(currentIPs.size());
|
||||
|
||||
for (IpAddress ipAddress : currentIPs) {
|
||||
map.put(ipAddress.toIPAddr(), ipAddress);
|
||||
}
|
||||
|
||||
ipAddresses = setValid(oldJson, map);
|
||||
|
||||
Map<String, IpAddress> ipAddressMap = new HashMap<String, IpAddress>(ipAddresses.size());
|
||||
|
||||
for (IpAddress ipAddress : ipAddresses) {
|
||||
ipAddressMap.put(ipAddress.getDatumKey(), ipAddress);
|
||||
}
|
||||
|
||||
for (IpAddress ipAddress : ips) {
|
||||
if (!dom.getClusterMap().containsKey(ipAddress.getClusterName())) {
|
||||
Cluster cluster = new Cluster(ipAddress.getClusterName());
|
||||
cluster.setDom(dom);
|
||||
dom.getClusterMap().put(ipAddress.getClusterName(), cluster);
|
||||
Loggers.SRV_LOG.warn("cluster: " + ipAddress.getClusterName() + " not found, ip: " + ipAddress.toJSON()
|
||||
+ ", will create new cluster with default configuration.");
|
||||
if (!dom.getEnableClientBeat()) {
|
||||
getDom2LockMap().get(domName).lock();
|
||||
}
|
||||
|
||||
ipAddressMap.put(ipAddress.getDatumKey(), ipAddress);
|
||||
}
|
||||
Datum datum1 = RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(dom));
|
||||
String oldJson = StringUtils.EMPTY;
|
||||
|
||||
if (ipAddressMap.size() <= 0) {
|
||||
throw new IllegalArgumentException("ip list can not be empty, dom: " + dom.getName() + ", ip list: "
|
||||
if (datum1 != null) {
|
||||
oldJson = datum1.value;
|
||||
}
|
||||
|
||||
List<IpAddress> ipAddresses;
|
||||
List<IpAddress> currentIPs = dom.allIPs();
|
||||
Map<String, IpAddress> map = new ConcurrentHashMap(currentIPs.size());
|
||||
|
||||
for (IpAddress ipAddress : currentIPs) {
|
||||
map.put(ipAddress.toIPAddr(), ipAddress);
|
||||
}
|
||||
|
||||
ipAddresses = setValid(oldJson, map);
|
||||
|
||||
Map<String, IpAddress> ipAddressMap = new HashMap<String, IpAddress>(ipAddresses.size());
|
||||
|
||||
for (IpAddress ipAddress : ipAddresses) {
|
||||
ipAddressMap.put(ipAddress.getDatumKey(), ipAddress);
|
||||
}
|
||||
|
||||
for (IpAddress ipAddress : ips) {
|
||||
if (!dom.getClusterMap().containsKey(ipAddress.getClusterName())) {
|
||||
Cluster cluster = new Cluster(ipAddress.getClusterName());
|
||||
cluster.setDom(dom);
|
||||
dom.getClusterMap().put(ipAddress.getClusterName(), cluster);
|
||||
Loggers.SRV_LOG.warn("cluster: " + ipAddress.getClusterName() + " not found, ip: " + ipAddress.toJSON()
|
||||
+ ", will create new cluster with default configuration.");
|
||||
}
|
||||
|
||||
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
|
||||
ipAddressMap.remove(ipAddress.getDatumKey());
|
||||
} else {
|
||||
ipAddressMap.put(ipAddress.getDatumKey(), ipAddress);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (ipAddressMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
|
||||
throw new IllegalArgumentException("ip list can not be empty, dom: " + dom.getName() + ", ip list: "
|
||||
+ JSON.toJSONString(ipAddressMap.values()));
|
||||
}
|
||||
|
||||
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-UPDATE}" + ips +
|
||||
", action:" + action);
|
||||
|
||||
String key = UtilsAndCommons.getIPListStoreKey(dom);
|
||||
String value = JSON.toJSONString(ipAddressMap.values());
|
||||
|
||||
Datum datum = new Datum();
|
||||
datum.key = key;
|
||||
datum.value = value;
|
||||
|
||||
datum.timestamp.set(datum1 == null ? 1 : datum1.timestamp.get() + 1);
|
||||
|
||||
Loggers.RAFT.info("datum " + key + " updated:" + datum.timestamp.get());
|
||||
|
||||
RaftPeer peer = new RaftPeer();
|
||||
peer.ip = RaftCore.getLeader().ip;
|
||||
peer.term.set(term);
|
||||
peer.voteFor = RaftCore.getLeader().voteFor;
|
||||
peer.heartbeatDueMs = RaftCore.getLeader().heartbeatDueMs;
|
||||
peer.leaderDueMs = RaftCore.getLeader().leaderDueMs;
|
||||
peer.state = RaftCore.getLeader().state;
|
||||
|
||||
boolean increaseTerm = !((VirtualClusterDomain) getDomain(domName)).getEnableClientBeat();
|
||||
|
||||
RaftCore.onPublish(datum, peer, increaseTerm);
|
||||
} finally {
|
||||
if (!dom.getEnableClientBeat()) {
|
||||
getDom2LockMap().get(domName).unlock();
|
||||
}
|
||||
}
|
||||
|
||||
String key = UtilsAndCommons.getIPListStoreKey(dom);
|
||||
String value = JSON.toJSONString(ipAddressMap.values());
|
||||
|
||||
Datum datum = new Datum();
|
||||
datum.key = key;
|
||||
datum.value = value;
|
||||
datum.timestamp.set(timestamp);
|
||||
|
||||
RaftPeer peer = new RaftPeer();
|
||||
peer.ip = RaftCore.getLeader().ip;
|
||||
peer.term.set(term);
|
||||
peer.voteFor = RaftCore.getLeader().voteFor;
|
||||
peer.heartbeatDueMs = RaftCore.getLeader().heartbeatDueMs;
|
||||
peer.leaderDueMs = RaftCore.getLeader().leaderDueMs;
|
||||
peer.state = RaftCore.getLeader().state;
|
||||
|
||||
RaftCore.onPublish(datum, peer);
|
||||
}
|
||||
|
||||
private List<IpAddress> setValid(String oldJson, Map<String, IpAddress> map) {
|
||||
@ -378,47 +411,6 @@ public class DomainsManager {
|
||||
return ipAddresses;
|
||||
}
|
||||
|
||||
public void easyRemvIP4Dom(String domName, List<IpAddress> ips) throws Exception {
|
||||
Lock lock = dom2LockMap.get(domName);
|
||||
if (lock == null) {
|
||||
throw new IllegalStateException("no lock for " + domName + ", operation is disabled now.");
|
||||
}
|
||||
|
||||
try {
|
||||
lock.lock();
|
||||
Domain dom = chooseDomMap().get(domName);
|
||||
if (dom == null) {
|
||||
throw new IllegalArgumentException("domain doesn't exist: " + domName);
|
||||
}
|
||||
|
||||
Datum datum = RaftCore.getDatum(UtilsAndCommons.getIPListStoreKey(dom));
|
||||
String oldJson = StringUtils.EMPTY;
|
||||
List<IpAddress> currentIPs = dom.allIPs();
|
||||
|
||||
if (currentIPs.size() <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, IpAddress> map = new ConcurrentHashMap<String, IpAddress>(currentIPs.size());
|
||||
|
||||
for (IpAddress ipAddress : currentIPs) {
|
||||
map.put(ipAddress.toIPAddr(), ipAddress);
|
||||
}
|
||||
|
||||
if (datum != null) {
|
||||
oldJson = datum.value;
|
||||
}
|
||||
|
||||
List<IpAddress> ipAddrs = setValid(oldJson, map);
|
||||
|
||||
ipAddrs.removeAll(ips);
|
||||
|
||||
RaftCore.doSignalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddrs));
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Domain getDomain(String domName) {
|
||||
return chooseDomMap().get(domName);
|
||||
}
|
||||
|
@ -178,7 +178,9 @@ public class VirtualClusterDomain implements Domain, RaftListener {
|
||||
|
||||
List<IpAddress> ips = JSON.parseObject(value, new TypeReference<List<IpAddress>>() {
|
||||
});
|
||||
|
||||
for (IpAddress ip : ips) {
|
||||
|
||||
if (ip.getWeight() > 10000.0D) {
|
||||
ip.setWeight(10000.0D);
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ public class HttpClient {
|
||||
|
||||
return getResult(conn);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.warn("VIPSRV", "Exception while request: " + url + ", caused: " + e.getMessage());
|
||||
Loggers.SRV_LOG.warn("VIPSRV {}", "Exception while request: " + url + ", caused: " + e.getMessage());
|
||||
return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
|
@ -135,7 +135,7 @@ public class Switch {
|
||||
|
||||
public static void save() {
|
||||
try {
|
||||
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(dom), JSON.toJSONString(dom));
|
||||
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(dom), JSON.toJSONString(dom), true);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error("VIPSRV-SWITCH", "failed to save switch", e);
|
||||
}
|
||||
|
@ -104,6 +104,10 @@ public class UtilsAndCommons {
|
||||
|
||||
public static final String API_DOM = "/api/dom";
|
||||
|
||||
public static final String UPDATE_INSTANCE_ACTION_ADD = "add";
|
||||
|
||||
public static final String UPDATE_INSTANCE_ACTION_REMOVE = "remove";
|
||||
|
||||
public static final String INSTANCE_LIST_PERSISTED_PROPERTY_KEY = "nacos.instanceListPersisted";
|
||||
|
||||
public static final boolean INSTANCE_LIST_PERSISTED = Boolean.getBoolean(INSTANCE_LIST_PERSISTED_PROPERTY_KEY);
|
||||
|
@ -114,13 +114,13 @@ public class PushService {
|
||||
try {
|
||||
removeClientIfZombie();
|
||||
} catch (Throwable e) {
|
||||
Loggers.PUSH.warn("VIPSRV-PUSH", "failed to remove client zombied");
|
||||
Loggers.PUSH.warn("VIPSRV-PUSH {}", "failed to remove client zombied");
|
||||
}
|
||||
}
|
||||
}, 0, 20, TimeUnit.SECONDS);
|
||||
|
||||
} catch (SocketException e) {
|
||||
Loggers.SRV_LOG.error("VIPSRV-PUSH", "failed to init push service");
|
||||
Loggers.SRV_LOG.error("VIPSRV-PUSH {}", "failed to init push service");
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,7 +181,7 @@ public class PushService {
|
||||
size += clientConcurrentMap.size();
|
||||
}
|
||||
|
||||
Loggers.PUSH.info("VIPSRV-PUSH", "clientMap size: " + size);
|
||||
Loggers.PUSH.info("VIPSRV-PUSH {}", "clientMap size: " + size);
|
||||
|
||||
}
|
||||
|
||||
@ -221,7 +221,9 @@ public class PushService {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Loggers.PUSH.info(dom + " is changed, add it to push queue.");
|
||||
if (Loggers.PUSH.isDebugEnabled()) {
|
||||
Loggers.PUSH.debug(dom + " is changed, add it to push queue.");
|
||||
}
|
||||
ConcurrentMap<String, PushClient> clients = clientMap.get(dom);
|
||||
if (MapUtils.isEmpty(clients)) {
|
||||
return;
|
||||
@ -247,7 +249,7 @@ public class PushService {
|
||||
compressData = (byte[]) (pair.getValue0());
|
||||
data = (Map<String, Object>) pair.getValue1();
|
||||
|
||||
Loggers.PUSH.debug("PUSH-CACHE", "cache hit: " + dom + ":" + client.getAddrStr());
|
||||
Loggers.PUSH.debug("PUSH-CACHE {}", "cache hit: " + dom + ":" + client.getAddrStr());
|
||||
}
|
||||
|
||||
if (compressData != null) {
|
||||
|
@ -86,11 +86,11 @@ public class RaftCore {
|
||||
|
||||
private static volatile List<RaftListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
private static ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<String, Datum>();
|
||||
private static volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<String, Datum>();
|
||||
|
||||
private static PeerSet peers = new PeerSet();
|
||||
|
||||
private static volatile Notifier notifier = new Notifier();
|
||||
public static volatile Notifier notifier = new Notifier();
|
||||
|
||||
public static void init() throws Exception {
|
||||
|
||||
@ -139,44 +139,6 @@ public class RaftCore {
|
||||
return listeners;
|
||||
}
|
||||
|
||||
/**
|
||||
* will return success once local writes success instead of the majority,
|
||||
* therefore is unsafe
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void unsafePublish(String key, String value) throws Exception {
|
||||
OPERATE_LOCK.lock();
|
||||
|
||||
try {
|
||||
if (!RaftCore.isLeader()) {
|
||||
JSONObject params = new JSONObject();
|
||||
params.put("key", key);
|
||||
params.put("value", value);
|
||||
|
||||
Map<String, String> parameters = new HashMap<>(1);
|
||||
parameters.put("key", key);
|
||||
RaftProxy.proxyPostLarge(API_UNSF_PUB, params.toJSONString(), parameters);
|
||||
|
||||
if (!RaftCore.isLeader()) {
|
||||
throw new IllegalStateException("I'm not leader, can not handle update/delete operation");
|
||||
}
|
||||
}
|
||||
|
||||
Datum datum = new Datum();
|
||||
datum.key = key;
|
||||
datum.value = value;
|
||||
datum.timestamp.set(RaftCore.getDatum(key).timestamp.incrementAndGet());
|
||||
|
||||
RaftPeer local = peers.local();
|
||||
|
||||
onPublish(datum, local);
|
||||
} finally {
|
||||
OPERATE_LOCK.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static void signalPublish(String key, String value) throws Exception {
|
||||
|
||||
@ -194,8 +156,9 @@ public class RaftCore {
|
||||
JSONObject json = new JSONObject();
|
||||
json.put("datum", datum);
|
||||
json.put("source", peers.local());
|
||||
json.put("increaseTerm", false);
|
||||
|
||||
onPublish(datum, peers.local());
|
||||
onPublish(datum, peers.local(), false);
|
||||
|
||||
final String content = JSON.toJSONString(json);
|
||||
|
||||
@ -223,15 +186,17 @@ public class RaftCore {
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key);
|
||||
|
||||
if (Loggers.RAFT.isDebugEnabled()) {
|
||||
Loggers.RAFT.debug("signalPublish cost " + (end - start) + " ms" + " : " + key);
|
||||
}
|
||||
}
|
||||
|
||||
public static void doSignalPublish(String key, String value) throws Exception {
|
||||
public static void doSignalPublish(String key, String value, boolean locked) throws Exception {
|
||||
if (!RaftCore.isLeader()) {
|
||||
JSONObject params = new JSONObject();
|
||||
params.put("key", key);
|
||||
params.put("value", value);
|
||||
params.put("locked", locked);
|
||||
Map<String, String> parameters = new HashMap<>(1);
|
||||
parameters.put("key", key);
|
||||
|
||||
@ -244,7 +209,7 @@ public class RaftCore {
|
||||
throw new IllegalStateException("I'm not leader, can not handle update/delete operation");
|
||||
}
|
||||
|
||||
if (key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
|
||||
if (locked) {
|
||||
signalPublishLocked(key, value);
|
||||
} else {
|
||||
signalPublish(key, value);
|
||||
@ -268,8 +233,9 @@ public class RaftCore {
|
||||
JSONObject json = new JSONObject();
|
||||
json.put("datum", datum);
|
||||
json.put("source", peers.local());
|
||||
json.put("increaseTerm", true);
|
||||
|
||||
onPublish(datum, peers.local());
|
||||
onPublish(datum, peers.local(), true);
|
||||
|
||||
final String content = JSON.toJSONString(json);
|
||||
|
||||
@ -306,7 +272,9 @@ public class RaftCore {
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key);
|
||||
if (Loggers.RAFT.isDebugEnabled()) {
|
||||
Loggers.RAFT.debug("signalPublish cost " + (end - start) + " ms" + " : " + key);
|
||||
}
|
||||
} finally {
|
||||
RaftCore.OPERATE_LOCK.unlock();
|
||||
}
|
||||
@ -357,13 +325,13 @@ public class RaftCore {
|
||||
}
|
||||
}
|
||||
|
||||
public static void onPublish(JSONObject json) throws Exception {
|
||||
public static void onPublish(JSONObject json, boolean increaseTerm) throws Exception {
|
||||
Datum datum = JSON.parseObject(json.getString("datum"), Datum.class);
|
||||
RaftPeer source = JSON.parseObject(json.getString("source"), RaftPeer.class);
|
||||
onPublish(datum, source);
|
||||
onPublish(datum, source, increaseTerm);
|
||||
}
|
||||
|
||||
public static void onPublish(Datum datum, RaftPeer source) throws Exception {
|
||||
public static void onPublish(Datum datum, RaftPeer source, boolean increaseTerm) throws Exception {
|
||||
RaftPeer local = peers.local();
|
||||
if (StringUtils.isBlank(datum.value)) {
|
||||
Loggers.RAFT.warn("received empty datum");
|
||||
@ -386,15 +354,6 @@ public class RaftCore {
|
||||
|
||||
local.resetLeaderDue();
|
||||
|
||||
Datum datumOrigin = RaftCore.getDatum(datum.key);
|
||||
|
||||
if (datumOrigin != null && datumOrigin.timestamp.get() > datum.timestamp.get()) {
|
||||
// refuse operation:
|
||||
Loggers.RAFT.warn("out of date publish, pub-timestamp:"
|
||||
+ datumOrigin.timestamp.get() + ", cur-timestamp: " + datum.timestamp.get());
|
||||
return;
|
||||
}
|
||||
|
||||
// do apply
|
||||
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID) || UtilsAndCommons.INSTANCE_LIST_PERSISTED) {
|
||||
RaftStore.write(datum);
|
||||
@ -402,7 +361,7 @@ public class RaftCore {
|
||||
|
||||
RaftCore.datums.put(datum.key, datum);
|
||||
|
||||
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
|
||||
if (increaseTerm) {
|
||||
if (isLeader()) {
|
||||
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
|
||||
} else {
|
||||
@ -419,7 +378,7 @@ public class RaftCore {
|
||||
|
||||
notifier.addTask(datum, Notifier.ApplyAction.CHANGE);
|
||||
|
||||
Loggers.RAFT.info("data added/updated, key=" + datum.key + ", term: " + local.term);
|
||||
Loggers.RAFT.info("data added/updated, key=" + datum.key + ", term: " + local.term + ", increaseTerm:" + increaseTerm);
|
||||
}
|
||||
|
||||
public static void onDelete(JSONObject params) throws Exception {
|
||||
@ -1024,6 +983,10 @@ public class RaftCore {
|
||||
tasks.add(Pair.with(datum, action));
|
||||
}
|
||||
|
||||
public int getTaskSize() {
|
||||
return tasks.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Loggers.RAFT.info("raft notifier started");
|
||||
@ -1046,7 +1009,9 @@ public class RaftCore {
|
||||
for (RaftListener listener : listeners) {
|
||||
|
||||
if (listener instanceof VirtualClusterDomain) {
|
||||
Loggers.RAFT.debug("listener: " + ((VirtualClusterDomain) listener).getName());
|
||||
if (Loggers.RAFT.isDebugEnabled()) {
|
||||
Loggers.RAFT.debug("listener: " + ((VirtualClusterDomain) listener).getName());
|
||||
}
|
||||
}
|
||||
|
||||
if (!listener.interests(datum.key)) {
|
||||
@ -1071,8 +1036,10 @@ public class RaftCore {
|
||||
}
|
||||
}
|
||||
|
||||
Loggers.RAFT.debug("VIPSRV-RAFT", "datum change notified" +
|
||||
if (Loggers.RAFT.isDebugEnabled()) {
|
||||
Loggers.RAFT.debug("VIPSRV-RAFT", "datum change notified" +
|
||||
", key: " + datum.key + "; listener count: " + count);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
Loggers.RAFT.error("VIPSRV-RAFT", "Error while handling notifying task", e);
|
||||
}
|
||||
|
@ -65,6 +65,7 @@ import java.security.AccessControlException;
|
||||
import java.security.InvalidParameterException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
@ -148,7 +149,7 @@ public class ApiCommands {
|
||||
String dom = WebUtils.required(request, "dom");
|
||||
|
||||
VirtualClusterDomain domObj
|
||||
= (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
= (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
if (domObj == null) {
|
||||
throw new IllegalArgumentException("request dom doesn't exist");
|
||||
}
|
||||
@ -289,7 +290,7 @@ public class ApiCommands {
|
||||
clusterName = UtilsAndCommons.DEFAULT_CLUSTER_NAME;
|
||||
}
|
||||
|
||||
Loggers.DEBUG_LOG.debug("[CLIENT-BEAT] full arguments: beat: " + clientBeat + ", serviceName:" + dom);
|
||||
Loggers.DEBUG_LOG.debug("[CLIENT-BEAT] full arguments: beat: " + clientBeat + ", serviceName:" + dom + ", client:" + request.getRemoteAddr());
|
||||
|
||||
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
Map<String, String[]> stringMap = new HashMap<>(16);
|
||||
@ -323,7 +324,15 @@ public class ApiCommands {
|
||||
doAddCluster4Dom(MockHttpRequest.buildRequest(stringMap));
|
||||
}
|
||||
|
||||
JSONObject result = new JSONObject();
|
||||
|
||||
result.put("clientBeatInterval", Switch.getClientBeatInterval());
|
||||
|
||||
if (!virtualClusterDomain.allIPs().contains(ipAddress)) {
|
||||
|
||||
if (!virtualClusterDomain.getEnableClientBeat()) {
|
||||
return result;
|
||||
}
|
||||
stringMap.put("ipList", Arrays.asList(JSON.toJSONString(Arrays.asList(ipAddress))).toArray(new String[1]));
|
||||
stringMap.put("json", Arrays.asList("true").toArray(new String[1]));
|
||||
addIP4Dom(MockHttpRequest.buildRequest(stringMap));
|
||||
@ -345,7 +354,7 @@ public class ApiCommands {
|
||||
}
|
||||
|
||||
String url = "http://" + server + RunningConfig.getContextPath()
|
||||
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/clientBeat";
|
||||
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/clientBeat";
|
||||
HttpClient.HttpResult httpResult = HttpClient.httpGet(url, null, proxyParams);
|
||||
|
||||
if (httpResult.code != HttpURLConnection.HTTP_OK) {
|
||||
@ -357,10 +366,6 @@ public class ApiCommands {
|
||||
}
|
||||
}
|
||||
|
||||
JSONObject result = new JSONObject();
|
||||
|
||||
result.put("clientBeatInterval", Switch.getClientBeatInterval());
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -632,7 +637,7 @@ public class ApiCommands {
|
||||
String setSiteGroupForce = WebUtils.optional(request, "setSiteGroupForce", StringUtils.EMPTY);
|
||||
if (!StringUtils.isEmpty(sitegroup) || !StringUtils.isEmpty(setSiteGroupForce)) {
|
||||
Cluster cluster
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
if (cluster == null) {
|
||||
throw new IllegalStateException("cluster not found");
|
||||
}
|
||||
@ -643,7 +648,7 @@ public class ApiCommands {
|
||||
String cktype = WebUtils.optional(request, "cktype", StringUtils.EMPTY);
|
||||
if (!StringUtils.isEmpty(cktype)) {
|
||||
Cluster cluster
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
if (cluster == null) {
|
||||
throw new IllegalStateException("cluster not found");
|
||||
}
|
||||
@ -672,7 +677,7 @@ public class ApiCommands {
|
||||
String defIPPort = WebUtils.optional(request, "defIPPort", StringUtils.EMPTY);
|
||||
if (!StringUtils.isEmpty(defIPPort)) {
|
||||
Cluster cluster
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
if (cluster == null) {
|
||||
throw new IllegalStateException("cluster not found");
|
||||
}
|
||||
@ -683,7 +688,7 @@ public class ApiCommands {
|
||||
String submask = WebUtils.optional(request, "submask", StringUtils.EMPTY);
|
||||
if (!StringUtils.isEmpty(submask)) {
|
||||
Cluster cluster
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
if (cluster == null) {
|
||||
throw new IllegalStateException("cluster not found");
|
||||
}
|
||||
@ -694,7 +699,7 @@ public class ApiCommands {
|
||||
String ipPort4Check = WebUtils.optional(request, "ipPort4Check", StringUtils.EMPTY);
|
||||
if (!StringUtils.isEmpty(ipPort4Check)) {
|
||||
Cluster cluster
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
if (cluster == null) {
|
||||
throw new IllegalStateException("cluster not found");
|
||||
}
|
||||
@ -705,7 +710,7 @@ public class ApiCommands {
|
||||
String defCkPort = WebUtils.optional(request, "defCkPort", StringUtils.EMPTY);
|
||||
if (!StringUtils.isEmpty(defCkPort)) {
|
||||
Cluster cluster
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
= dom.getClusterMap().get(WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME));
|
||||
if (cluster == null) {
|
||||
throw new IllegalStateException("cluster not found");
|
||||
}
|
||||
@ -758,7 +763,7 @@ public class ApiCommands {
|
||||
public JSONObject hello(HttpServletRequest request) {
|
||||
JSONObject result = new JSONObject();
|
||||
result.put("msg", "Hello! I am Nacos-Naming and healthy! total services: raft " + domainsManager.getRaftDomMap().size()
|
||||
+ ", local port:" + RunningConfig.getServerPort());
|
||||
+ ", local port:" + RunningConfig.getServerPort());
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -817,16 +822,16 @@ public class ApiCommands {
|
||||
|
||||
if (!RaftCore.isLeader(clientIP)) {
|
||||
Loggers.RAFT.warn("peer(" + JSON.toJSONString(clientIP) + ") tried to publish " +
|
||||
"data but wasn't leader, leader: " + JSON.toJSONString(RaftCore.getLeader()));
|
||||
"data but wasn't leader, leader: " + JSON.toJSONString(RaftCore.getLeader()));
|
||||
throw new IllegalStateException("peer(" + clientIP + ") tried to publish " +
|
||||
"data but wasn't leader");
|
||||
"data but wasn't leader");
|
||||
}
|
||||
|
||||
if (term < RaftCore.getPeerSet().local().term.get()) {
|
||||
Loggers.RAFT.warn("out of date publish, pub-term: "
|
||||
+ JSON.toJSONString(clientIP) + ", cur-term: " + JSON.toJSONString(RaftCore.getPeerSet().local()));
|
||||
+ JSON.toJSONString(clientIP) + ", cur-term: " + JSON.toJSONString(RaftCore.getPeerSet().local()));
|
||||
throw new IllegalStateException("out of date publish, pub-term:"
|
||||
+ term + ", cur-term: " + RaftCore.getPeerSet().local().term.get());
|
||||
+ term + ", cur-term: " + RaftCore.getPeerSet().local().term.get());
|
||||
}
|
||||
|
||||
RaftCore.getPeerSet().local().resetLeaderDue();
|
||||
@ -853,8 +858,6 @@ public class ApiCommands {
|
||||
}
|
||||
}
|
||||
|
||||
long timestamp = Long.parseLong(WebUtils.required(request, "timestamp"));
|
||||
|
||||
if (CollectionUtils.isEmpty(newIPs)) {
|
||||
throw new IllegalArgumentException("Empty ip list");
|
||||
}
|
||||
@ -865,10 +868,10 @@ public class ApiCommands {
|
||||
Collection diff = CollectionUtils.subtract(newIPs, oldIPs);
|
||||
if (diff.size() != 0) {
|
||||
throw new IllegalArgumentException("these IPs are not present: " + Arrays.toString(diff.toArray())
|
||||
+ ", if you want to add them, remove updateOnly flag");
|
||||
+ ", if you want to add them, remove updateOnly flag");
|
||||
}
|
||||
}
|
||||
domainsManager.easyAddIP4Dom(dom, newIPs, timestamp, term);
|
||||
domainsManager.easyAddIP4Dom(dom, newIPs, term);
|
||||
|
||||
return "ok";
|
||||
}
|
||||
@ -885,7 +888,9 @@ public class ApiCommands {
|
||||
proxyParams.put(entry.getKey(), entry.getValue()[0]);
|
||||
}
|
||||
|
||||
Loggers.DEBUG_LOG.debug("[ADD-IP] full arguments:" + proxyParams);
|
||||
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
|
||||
Loggers.DEBUG_LOG.debug("[ADD-IP] full arguments:" + proxyParams + ", client:" + request.getRemoteAddr());
|
||||
}
|
||||
|
||||
String ipListString = WebUtils.required(request, "ipList");
|
||||
final List<String> ipList;
|
||||
@ -917,7 +922,7 @@ public class ApiCommands {
|
||||
}
|
||||
|
||||
String url = "http://" + server
|
||||
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/addIP4Dom";
|
||||
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/addIP4Dom";
|
||||
HttpClient.HttpResult result1 = HttpClient.httpPost(url, null, proxyParams);
|
||||
|
||||
if (result1.code != HttpURLConnection.HTTP_OK) {
|
||||
@ -929,7 +934,10 @@ public class ApiCommands {
|
||||
}
|
||||
|
||||
final String dom = WebUtils.required(request, "dom");
|
||||
if (domainsManager.getDomain(dom) == null) {
|
||||
|
||||
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
|
||||
if (domain == null) {
|
||||
throw new IllegalStateException("dom doesn't exist: " + dom);
|
||||
}
|
||||
|
||||
@ -941,17 +949,18 @@ public class ApiCommands {
|
||||
|
||||
if (updateOnly) {
|
||||
//make sure every IP is in the dom, otherwise refuse update
|
||||
List<IpAddress> oldIPs = domainsManager.getDomain(dom).allIPs();
|
||||
List<IpAddress> oldIPs = domain.allIPs();
|
||||
Collection diff = CollectionUtils.subtract(newIPs, oldIPs);
|
||||
if (diff.size() != 0) {
|
||||
throw new IllegalArgumentException("these IPs are not present: " + Arrays.toString(diff.toArray())
|
||||
+ ", if you want to add them, remove updateOnly flag");
|
||||
+ ", if you want to add them, remove updateOnly flag");
|
||||
}
|
||||
}
|
||||
|
||||
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom));
|
||||
String key = UtilsAndCommons.getIPListStoreKey(domain);
|
||||
|
||||
Datum datum = RaftCore.getDatum(key);
|
||||
|
||||
if (datum == null) {
|
||||
try {
|
||||
domainsManager.getDom2LockMap().get(dom).lock();
|
||||
@ -966,63 +975,105 @@ public class ApiCommands {
|
||||
}
|
||||
}
|
||||
|
||||
long timestamp = RaftCore.getDatum(key).timestamp.incrementAndGet();
|
||||
long timestamp = RaftCore.getDatum(key).timestamp.get();
|
||||
|
||||
if (RaftCore.isLeader()) {
|
||||
try {
|
||||
domainsManager.getDom2LockMap().get(dom).lock();
|
||||
|
||||
RaftCore.OPERATE_LOCK.lock();
|
||||
|
||||
proxyParams.put("clientIP", NetUtils.localServer());
|
||||
proxyParams.put("notify", "true");
|
||||
|
||||
proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term));
|
||||
proxyParams.put("timestamp", String.valueOf(timestamp));
|
||||
|
||||
for (final RaftPeer peer : RaftCore.getPeers()) {
|
||||
onAddIP4Dom(MockHttpRequest.buildRequest2(proxyParams));
|
||||
|
||||
UtilsAndCommons.RAFT_PUBLISH_EXECUTOR.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
String server = peer.ip;
|
||||
|
||||
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
|
||||
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
|
||||
}
|
||||
|
||||
String url = "http://" + server
|
||||
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/onAddIP4Dom";
|
||||
|
||||
try {
|
||||
HttpClient.asyncHttpPost(url, null, proxyParams, new AsyncCompletionHandler() {
|
||||
@Override
|
||||
public Integer onCompleted(Response response) throws Exception {
|
||||
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
|
||||
Loggers.SRV_LOG.warn("failed to add ip for dom: " + dom
|
||||
+ ",ipList = " + ipList + ",code: " + response.getStatusCode()
|
||||
+ ", caused " + response.getResponseBody() + ", server: " + peer.ip);
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error("ADD-IP", "failed when publish to peer." + url, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) {
|
||||
syncOnAddIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown"));
|
||||
} else {
|
||||
asyncOnAddIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown"));
|
||||
}
|
||||
|
||||
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-ADD}" + " new: "
|
||||
+ Arrays.toString(ipList.toArray()) + " operatorIP: "
|
||||
+ WebUtils.optional(request, "clientIP", "unknown"));
|
||||
} finally {
|
||||
domainsManager.getDom2LockMap().get(dom).unlock();
|
||||
RaftCore.OPERATE_LOCK.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return "ok";
|
||||
}
|
||||
|
||||
private void syncOnUpdateIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP, String action) throws InterruptedException {
|
||||
|
||||
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom));
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(RaftCore.getPeerSet().majorityCount());
|
||||
updateIpPublish(dom, ipList, proxyParams, clientIP, countDownLatch, action);
|
||||
if (!countDownLatch.await(UtilsAndCommons.MAX_PUBLISH_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
|
||||
Loggers.RAFT.info("data publish failed, key=" + key, ",notify timeout.");
|
||||
throw new IllegalArgumentException("data publish failed, key=" + key);
|
||||
}
|
||||
}
|
||||
|
||||
private void syncOnAddIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) throws InterruptedException {
|
||||
syncOnUpdateIP4Dom(dom, ipList, proxyParams, clientIP, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD);
|
||||
}
|
||||
|
||||
private void asyncOnAddIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) {
|
||||
updateIpPublish(dom, ipList, proxyParams, clientIP, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD);
|
||||
}
|
||||
|
||||
private void syncOnRemvIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) throws InterruptedException {
|
||||
syncOnUpdateIP4Dom(dom, ipList, proxyParams, clientIP, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE);
|
||||
}
|
||||
|
||||
private void asyncOnRemvIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) {
|
||||
updateIpPublish(dom, ipList, proxyParams, clientIP, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE);
|
||||
}
|
||||
|
||||
private void updateIpPublish(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP, CountDownLatch countDownLatch, String action) {
|
||||
|
||||
for (final String peer : RaftCore.getPeerSet().allServersWithoutMySelf()) {
|
||||
|
||||
UtilsAndCommons.RAFT_PUBLISH_EXECUTOR.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
String server = peer;
|
||||
|
||||
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
|
||||
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
|
||||
}
|
||||
|
||||
String api = action.equals("remove") ? "onRemvIP4Dom" : "onAddIP4Dom";
|
||||
|
||||
String url = "http://" + server
|
||||
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api;
|
||||
|
||||
try {
|
||||
HttpClient.asyncHttpPost(url, null, proxyParams, new AsyncCompletionHandler() {
|
||||
@Override
|
||||
public Integer onCompleted(Response response) throws Exception {
|
||||
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
|
||||
Loggers.SRV_LOG.warn("failed to add ip for dom: " + dom
|
||||
+ ",ipList = " + ipList + ",code: " + response.getStatusCode()
|
||||
+ ", caused " + response.getResponseBody() + ", server: " + peer);
|
||||
return 1;
|
||||
}
|
||||
if (countDownLatch != null) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error(action + "-IP", "failed when publish to peer." + url, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@NeedAuth
|
||||
@RequestMapping("/addIP4Dom")
|
||||
public String addIP4Dom(HttpServletRequest request) throws Exception {
|
||||
@ -1128,12 +1179,12 @@ public class ApiCommands {
|
||||
try {
|
||||
if (udpPort > 0 && PushService.canEnablePush(agent)) {
|
||||
PushService.addClient(dom,
|
||||
clusters,
|
||||
agent,
|
||||
new InetSocketAddress(clientIP, udpPort),
|
||||
pushDataSource,
|
||||
tenant,
|
||||
app);
|
||||
clusters,
|
||||
agent,
|
||||
new InetSocketAddress(clientIP, udpPort),
|
||||
pushDataSource,
|
||||
tenant,
|
||||
app);
|
||||
cacheMillis = Switch.getPushCacheMillis(dom);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -1173,7 +1224,7 @@ public class ApiCommands {
|
||||
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
|
||||
|
||||
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, " +
|
||||
"dom: " + dom);
|
||||
"dom: " + dom);
|
||||
if (isCheck) {
|
||||
result.put("reachProtectThreshold", true);
|
||||
}
|
||||
@ -1229,37 +1280,142 @@ public class ApiCommands {
|
||||
return result;
|
||||
}
|
||||
|
||||
@RequestMapping("/onRemvIP4Dom")
|
||||
public void onRemvIP4Dom(HttpServletRequest request) throws Exception {
|
||||
if (Switch.getDisableAddIP()) {
|
||||
throw new AccessControlException("Deleting IP for dom is forbidden now.");
|
||||
}
|
||||
|
||||
String clientIP = WebUtils.required(request, "clientIP");
|
||||
long term = Long.parseLong(WebUtils.required(request, "term"));
|
||||
|
||||
if (!RaftCore.isLeader(clientIP)) {
|
||||
Loggers.RAFT.warn("peer(" + JSON.toJSONString(clientIP) + ") tried to publish " +
|
||||
"data but wasn't leader, leader: " + JSON.toJSONString(RaftCore.getLeader()));
|
||||
throw new IllegalStateException("peer(" + clientIP + ") tried to publish " +
|
||||
"data but wasn't leader");
|
||||
}
|
||||
|
||||
if (term < RaftCore.getPeerSet().local().term.get()) {
|
||||
Loggers.RAFT.warn("out of date publish, pub-term: "
|
||||
+ JSON.toJSONString(clientIP) + ", cur-term: " + JSON.toJSONString(RaftCore.getPeerSet().local()));
|
||||
throw new IllegalStateException("out of date publish, pub-term:"
|
||||
+ term + ", cur-term: " + RaftCore.getPeerSet().local().term);
|
||||
}
|
||||
|
||||
RaftCore.getPeerSet().local().resetLeaderDue();
|
||||
|
||||
final String dom = WebUtils.required(request, "dom");
|
||||
if (domainsManager.getDomain(dom) == null) {
|
||||
throw new IllegalStateException("dom doesn't exist: " + dom);
|
||||
}
|
||||
|
||||
List<IpAddress> removedIPs = getIpAddresses(request);
|
||||
|
||||
if (CollectionUtils.isEmpty(removedIPs)) {
|
||||
throw new IllegalArgumentException("Empty ip list");
|
||||
}
|
||||
|
||||
domainsManager.easyRemvIP4Dom(dom, removedIPs, term);
|
||||
}
|
||||
|
||||
@NeedAuth
|
||||
@RequestMapping("/remvIP4Dom")
|
||||
public String remvIP4Dom(HttpServletRequest request) throws Exception {
|
||||
String dom = WebUtils.required(request, "dom");
|
||||
String ipListString = WebUtils.required(request, "ipList");
|
||||
|
||||
Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: serviceName:" + dom + ", iplist:" + ipListString);
|
||||
|
||||
List<IpAddress> newIPs = new ArrayList<>();
|
||||
List<String> ipList = new ArrayList<>();
|
||||
if (Boolean.parseBoolean(WebUtils.optional(request, SwitchEntry.PARAM_JSON, Boolean.FALSE.toString()))) {
|
||||
newIPs = JSON.parseObject(ipListString, new TypeReference<List<IpAddress>>() {
|
||||
});
|
||||
} else {
|
||||
ipList = Arrays.asList(ipListString.split(","));
|
||||
Map<String, String> proxyParams = new HashMap<>(16);
|
||||
for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) {
|
||||
proxyParams.put(entry.getKey(), entry.getValue()[0]);
|
||||
}
|
||||
|
||||
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
|
||||
Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: params:" + proxyParams);
|
||||
}
|
||||
|
||||
List<String> ipList = new ArrayList<>();
|
||||
|
||||
List<IpAddress> ipObjList = new ArrayList<>(ipList.size());
|
||||
if (Boolean.parseBoolean(WebUtils.optional(request, SwitchEntry.PARAM_JSON, Boolean.FALSE.toString()))) {
|
||||
ipObjList = newIPs;
|
||||
ipList = Arrays.asList(ipListString);
|
||||
ipObjList = JSON.parseObject(ipListString, new TypeReference<List<IpAddress>>() {
|
||||
});
|
||||
} else {
|
||||
ipList = Arrays.asList(ipListString.split(","));
|
||||
for (String ip : ipList) {
|
||||
ipObjList.add(IpAddress.fromJSON(ip));
|
||||
}
|
||||
}
|
||||
|
||||
domainsManager.easyRemvIP4Dom(dom, ipObjList);
|
||||
if (!RaftCore.isLeader()) {
|
||||
Loggers.RAFT.info("I'm not leader, will proxy to leader.");
|
||||
if (RaftCore.getLeader() == null) {
|
||||
throw new IllegalArgumentException("no leader now.");
|
||||
}
|
||||
|
||||
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-REMV}" + " dead: "
|
||||
+ Arrays.toString(ipList.toArray()) + " operator: "
|
||||
RaftPeer leader = RaftCore.getLeader();
|
||||
|
||||
String server = leader.ip;
|
||||
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
|
||||
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
|
||||
}
|
||||
|
||||
String url = "http://" + server
|
||||
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/remvIP4Dom";
|
||||
HttpClient.HttpResult result1 = HttpClient.httpPost(url, null, proxyParams);
|
||||
|
||||
if (result1.code != HttpURLConnection.HTTP_OK) {
|
||||
Loggers.SRV_LOG.warn("failed to remove ip for dom, caused " + result1.content);
|
||||
throw new IllegalArgumentException("failed to remove ip for dom, caused " + result1.content);
|
||||
}
|
||||
|
||||
return "ok";
|
||||
}
|
||||
|
||||
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
|
||||
if (domain == null) {
|
||||
throw new IllegalStateException("dom doesn't exist: " + dom);
|
||||
}
|
||||
|
||||
if (CollectionUtils.isEmpty(ipObjList)) {
|
||||
throw new IllegalArgumentException("Empty ip list");
|
||||
}
|
||||
|
||||
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom));
|
||||
|
||||
long timestamp = 1;
|
||||
if (RaftCore.getDatum(key) != null) {
|
||||
timestamp = RaftCore.getDatum(key).timestamp.get();
|
||||
}
|
||||
|
||||
if (RaftCore.isLeader()) {
|
||||
|
||||
try {
|
||||
|
||||
RaftCore.OPERATE_LOCK.lock();
|
||||
|
||||
proxyParams.put("clientIP", NetUtils.localServer());
|
||||
proxyParams.put("notify", "true");
|
||||
proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term));
|
||||
proxyParams.put("timestamp", String.valueOf(timestamp));
|
||||
|
||||
onRemvIP4Dom(MockHttpRequest.buildRequest2(proxyParams));
|
||||
|
||||
if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) {
|
||||
syncOnRemvIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown"));
|
||||
} else {
|
||||
asyncOnRemvIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown"));
|
||||
}
|
||||
} finally {
|
||||
RaftCore.OPERATE_LOCK.unlock();
|
||||
}
|
||||
|
||||
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-REMV}" + " new: "
|
||||
+ ipListString + " operatorIP: "
|
||||
+ WebUtils.optional(request, "clientIP", "unknown"));
|
||||
}
|
||||
|
||||
return "ok";
|
||||
}
|
||||
@ -1340,21 +1496,21 @@ public class ApiCommands {
|
||||
SwitchDomain dom = JSON.parseObject(WebUtils.required(request, "json"), SwitchDomain.class);
|
||||
dom.setEnableStandalone(Switch.isEnableStandalone());
|
||||
if (dom.httpHealthParams.getMin() < SwitchDomain.HttpHealthParams.MIN_MIN
|
||||
|| dom.tcpHealthParams.getMin() < SwitchDomain.HttpHealthParams.MIN_MIN) {
|
||||
|| dom.tcpHealthParams.getMin() < SwitchDomain.HttpHealthParams.MIN_MIN) {
|
||||
|
||||
throw new IllegalArgumentException("min check time for http or tcp is too small(<500)");
|
||||
}
|
||||
|
||||
if (dom.httpHealthParams.getMax() < SwitchDomain.HttpHealthParams.MIN_MAX
|
||||
|| dom.tcpHealthParams.getMax() < SwitchDomain.HttpHealthParams.MIN_MAX) {
|
||||
|| dom.tcpHealthParams.getMax() < SwitchDomain.HttpHealthParams.MIN_MAX) {
|
||||
|
||||
throw new IllegalArgumentException("max check time for http or tcp is too small(<3000)");
|
||||
}
|
||||
|
||||
if (dom.httpHealthParams.getFactor() < 0
|
||||
|| dom.httpHealthParams.getFactor() > 1
|
||||
|| dom.tcpHealthParams.getFactor() < 0
|
||||
|| dom.tcpHealthParams.getFactor() > 1) {
|
||||
|| dom.httpHealthParams.getFactor() > 1
|
||||
|| dom.tcpHealthParams.getFactor() < 0
|
||||
|| dom.tcpHealthParams.getFactor() > 1) {
|
||||
|
||||
throw new IllegalArgumentException("malformed factor");
|
||||
}
|
||||
@ -1727,7 +1883,7 @@ public class ApiCommands {
|
||||
properties.load(is);
|
||||
|
||||
try (InputStreamReader releaseNode =
|
||||
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
|
||||
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
|
||||
|
||||
Properties properties1 = new Properties();
|
||||
properties1.load(releaseNode);
|
||||
@ -1743,7 +1899,7 @@ public class ApiCommands {
|
||||
|
||||
JSONObject result = new JSONObject();
|
||||
try (InputStreamReader releaseNode =
|
||||
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
|
||||
new InputStreamReader(ApiCommands.class.getClassLoader().getResourceAsStream("changelog.properties"), "UTF-8")) {
|
||||
|
||||
Properties properties1 = new Properties();
|
||||
properties1.load(releaseNode);
|
||||
@ -1791,7 +1947,7 @@ public class ApiCommands {
|
||||
String expr = WebUtils.required(request, "expr");
|
||||
|
||||
List<Domain> doms
|
||||
= domainsManager.searchDomains(".*" + expr + ".*");
|
||||
= domainsManager.searchDomains(".*" + expr + ".*");
|
||||
|
||||
if (CollectionUtils.isEmpty(doms)) {
|
||||
result.put("doms", Collections.emptyList());
|
||||
@ -2013,6 +2169,7 @@ public class ApiCommands {
|
||||
result.put("cpu", SystemUtils.getCPU());
|
||||
result.put("load", SystemUtils.getLoad());
|
||||
result.put("mem", SystemUtils.getMem());
|
||||
result.put("notifyTask", RaftCore.notifier.getTaskSize());
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -2275,7 +2432,7 @@ public class ApiCommands {
|
||||
String state = WebUtils.optional(request, "state", StringUtils.EMPTY);
|
||||
|
||||
Loggers.SRV_LOG.info("[CONTAINER_NOTFY] received notify event, type:" + type + ", domain:" + domain +
|
||||
", ip:" + ip + ", port:" + port + ", state:" + state);
|
||||
", ip:" + ip + ", port:" + port + ", state:" + state);
|
||||
|
||||
return "ok";
|
||||
}
|
||||
@ -2359,6 +2516,29 @@ public class ApiCommands {
|
||||
return pac;
|
||||
}
|
||||
|
||||
private List<IpAddress> getIpAddresses(HttpServletRequest request) {
|
||||
String ipListString = WebUtils.required(request, "ipList");
|
||||
final List<String> ipList;
|
||||
List<IpAddress> newIPs = new ArrayList<>();
|
||||
|
||||
if (Boolean.parseBoolean(WebUtils.optional(request, SwitchEntry.PARAM_JSON, Boolean.FALSE.toString()))) {
|
||||
newIPs = JSON.parseObject(ipListString, new TypeReference<List<IpAddress>>() {
|
||||
});
|
||||
} else {
|
||||
ipList = Arrays.asList(ipListString.split(","));
|
||||
for (String ip : ipList) {
|
||||
IpAddress ipAddr = IpAddress.fromJSON(ip);
|
||||
if (ipAddr == null) {
|
||||
throw new IllegalArgumentException("malformed ip ->" + ip);
|
||||
}
|
||||
|
||||
newIPs.add(ipAddr);
|
||||
}
|
||||
}
|
||||
|
||||
return newIPs;
|
||||
}
|
||||
|
||||
public void setDomainsManager(DomainsManager domainsManager) {
|
||||
this.domainsManager = domainsManager;
|
||||
}
|
||||
|
@ -114,28 +114,11 @@ public class RaftCommands {
|
||||
String value = Arrays.asList(entity).toArray(new String[1])[0];
|
||||
JSONObject json = JSON.parseObject(value);
|
||||
|
||||
RaftCore.doSignalPublish(json.getString("key"), json.getString("value"));
|
||||
RaftCore.doSignalPublish(json.getString("key"), json.getString("value"), json.getBooleanValue("locked"));
|
||||
|
||||
return "ok";
|
||||
}
|
||||
|
||||
@NeedAuth
|
||||
@RequestMapping("/unSafePublish")
|
||||
public String unSafePublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
|
||||
|
||||
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
|
||||
response.setHeader("Cache-Control", "no-cache");
|
||||
response.setHeader("Content-Encode", "gzip");
|
||||
|
||||
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
|
||||
|
||||
String value = Arrays.asList(entity).toArray(new String[1])[0];
|
||||
JSONObject json = JSON.parseObject(value);
|
||||
|
||||
RaftCore.unsafePublish(json.getString("key"), json.getString("value"));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
@NeedAuth
|
||||
@RequestMapping("/delete")
|
||||
public String delete(HttpServletRequest request, HttpServletResponse response) throws Exception {
|
||||
@ -193,7 +176,7 @@ public class RaftCommands {
|
||||
String value = Arrays.asList(entity).toArray(new String[1])[0];
|
||||
JSONObject jsonObject = JSON.parseObject(value);
|
||||
|
||||
RaftCore.onPublish(jsonObject);
|
||||
RaftCore.onPublish(jsonObject, jsonObject.getBoolean("increaseTerm"));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
|
@ -49,22 +49,6 @@ public class DomainsManagerTest extends BaseTest {
|
||||
domainsManager.easyRemoveDom("nacos.test.1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void easyRemvIP4Dom() throws Exception {
|
||||
|
||||
VirtualClusterDomain domain = new VirtualClusterDomain();
|
||||
domain.setName("nacos.test.1");
|
||||
|
||||
domainsManager.chooseDomMap().put("nacos.test.1", domain);
|
||||
|
||||
IpAddress ipAddress = new IpAddress();
|
||||
ipAddress.setIp("1.1.1.1");
|
||||
List<IpAddress> ipList = new ArrayList<IpAddress>();
|
||||
ipList.add(ipAddress);
|
||||
domainsManager.addLock("nacos.test.1");
|
||||
domainsManager.easyRemvIP4Dom("nacos.test.1", ipList);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchDom() throws Exception {
|
||||
VirtualClusterDomain domain = new VirtualClusterDomain();
|
||||
|
@ -85,7 +85,6 @@ public class AutoDeregisterInstance_ITCase {
|
||||
|
||||
namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT);
|
||||
|
||||
//TimeUnit.SECONDS.sleep(40);
|
||||
verifyInstanceList(instances, 1, serviceName);
|
||||
instances = naming.getAllInstances(serviceName);
|
||||
|
||||
@ -123,7 +122,6 @@ public class AutoDeregisterInstance_ITCase {
|
||||
|
||||
namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT);
|
||||
|
||||
//TimeUnit.SECONDS.sleep(40);
|
||||
verifyInstanceList(instances, 1, serviceName);
|
||||
instances = naming.getAllInstances(serviceName);
|
||||
|
||||
@ -156,7 +154,6 @@ public class AutoDeregisterInstance_ITCase {
|
||||
|
||||
namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT);
|
||||
|
||||
//TimeUnit.SECONDS.sleep(40);
|
||||
verifyInstanceList(instances, 1, serviceName);
|
||||
|
||||
instances = naming.getAllInstances(serviceName);
|
||||
@ -200,7 +197,6 @@ public class AutoDeregisterInstance_ITCase {
|
||||
|
||||
namingServiceImpl.getBeatReactor().removeBeatInfo(serviceName, "127.0.0.1", TEST_PORT);
|
||||
|
||||
//TimeUnit.SECONDS.sleep(40);
|
||||
verifyInstanceList(instances, 1, serviceName);
|
||||
|
||||
instances = naming.getAllInstances(serviceName);
|
||||
|
@ -379,7 +379,7 @@ public class SelectInstances_ITCase {
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
|
||||
ExpressionSelector expressionSelector = new ExpressionSelector();
|
||||
expressionSelector.setExpression("INSTANCE.metadata.registerSource = 'dubbo'");
|
||||
expressionSelector.setExpression("INSTANCE.label.registerSource = 'dubbo'");
|
||||
ListView<String> serviceList = naming.getServicesOfServer(1, 10, expressionSelector);
|
||||
|
||||
Assert.assertTrue(serviceList.getData().contains(serviceName));
|
||||
|
Loading…
Reference in New Issue
Block a user