#498 Merge develop

This commit is contained in:
nkorange 2019-01-12 22:43:24 +08:00
parent 9898a3c8e4
commit 55e1269ee1
8 changed files with 150 additions and 161 deletions

View File

@ -57,12 +57,12 @@ public class BeatReactor {
} }
public void addBeatInfo(String dom, BeatInfo beatInfo) { public void addBeatInfo(String dom, BeatInfo beatInfo) {
LogUtils.LOG.info("BEAT", "adding service:" + dom + " to beat map."); LogUtils.LOG.info("BEAT", "adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(dom, beatInfo.getIp(), beatInfo.getPort()), beatInfo); dom2Beat.put(buildKey(dom, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
} }
public void removeBeatInfo(String dom, String ip, int port) { public void removeBeatInfo(String dom, String ip, int port) {
LogUtils.LOG.info("BEAT", "removing service:" + dom + " from beat map."); LogUtils.LOG.info("BEAT", "removing beat: {}:{}:{} from beat map.", dom, ip, port);
dom2Beat.remove(buildKey(dom, ip, port)); dom2Beat.remove(buildKey(dom, ip, port));
} }
@ -82,7 +82,6 @@ public class BeatReactor {
} }
beatInfo.setScheduled(true); beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
LogUtils.LOG.info("BEAT", "send beat to server: " + beatInfo.toString());
} }
} catch (Exception e) { } catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "Exception while scheduling beat.", e); LogUtils.LOG.error("CLIENT-BEAT", "Exception while scheduling beat.", e);
@ -100,22 +99,8 @@ public class BeatReactor {
@Override @Override
public void run() { public void run() {
Map<String, String> params = new HashMap<String, String>(2);
params.put("beat", JSON.toJSONString(beatInfo));
params.put("dom", beatInfo.getDom());
try {
beatInfo.setScheduled(false);
String result = serverProxy.callAllServers(UtilAndComs.NACOS_URL_BASE + "/api/clientBeat", params);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
clientBeatInterval = jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
long result = serverProxy.sendBeat(beatInfo); long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
if (result > 0) { if (result > 0) {
clientBeatInterval = result; clientBeatInterval = result;
} }

View File

@ -157,7 +157,8 @@ public class NamingProxy {
public void registerService(String serviceName, Instance instance) throws NacosException { public void registerService(String serviceName, Instance instance) throws NacosException {
LogUtils.LOG.info("REGISTER-SERVICE", "registering service " + serviceName + " with instance:" + instance); LogUtils.LOG.info("REGISTER-SERVICE", "{} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(8); final Map<String, String> params = new HashMap<String, String>(8);
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId); params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
@ -170,13 +171,14 @@ public class NamingProxy {
params.put("serviceName", serviceName); params.put("serviceName", serviceName);
params.put("clusterName", instance.getClusterName()); params.put("clusterName", instance.getClusterName());
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.PUT); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
} }
public void deregisterService(String serviceName, String ip, int port, String cluster) throws NacosException { public void deregisterService(String serviceName, String ip, int port, String cluster) throws NacosException {
LogUtils.LOG.info("DEREGISTER-SERVICE", "deregistering service " + serviceName LogUtils.LOG.info("DEREGISTER-SERVICE", "{} deregistering service {} with instance: {}:{}@{}",
+ " with instance:" + ip + ":" + port + "@" + cluster); namespaceId, serviceName, ip, port, cluster);
final Map<String, String> params = new HashMap<String, String>(8); final Map<String, String> params = new HashMap<String, String>(8);
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId); params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
@ -202,24 +204,21 @@ public class NamingProxy {
} }
public long sendBeat(BeatInfo beatInfo) { public long sendBeat(BeatInfo beatInfo) {
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("serviceName", beatInfo.getServiceName());
try { try {
LogUtils.LOG.info("BEAT", "{} sending beat to server: {}", namespaceId, beatInfo.toString());
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("serviceName", beatInfo.getServiceName());
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT); String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
JSONObject jsonObject = JSON.parseObject(result); JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) { if (jsonObject != null) {
return jsonObject.getLong("clientBeatInterval"); return jsonObject.getLong("clientBeatInterval");
} }
} catch (Exception e) { } catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e); LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
} }
return 0L; return 0L;
} }

View File

@ -295,16 +295,15 @@ public class DomainsManager {
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom), true); RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom), true);
} }
public void easyAddIP4Dom(String namespaceId, String domName, List<IpAddress> ips, long timestamp, long term) throws Exception { public void easyAddIP4Dom(String namespaceId, String domName, List<IpAddress> ips, long term) throws Exception {
public void easyAddIP4Dom(String domName, List<IpAddress> ips, long term) throws Exception { easyUpdateIP4Dom(namespaceId, domName, ips, term, "add");
easyUpdateIP4Dom(domName, ips, term, "add");
} }
public void easyRemvIP4Dom(String domName, List<IpAddress> ips, long term) throws Exception { public void easyRemvIP4Dom(String namespaceId, String domName, List<IpAddress> ips, long term) throws Exception {
easyUpdateIP4Dom(domName, ips, term, "remove"); easyUpdateIP4Dom(namespaceId, domName, ips, term, "remove");
} }
public void easyUpdateIP4Dom(String domName, List<IpAddress> ips, long term, String action) throws Exception { public void easyUpdateIP4Dom(String namespaceId, String domName, List<IpAddress> ips, long term, String action) throws Exception {
VirtualClusterDomain dom = (VirtualClusterDomain) chooseDomMap(namespaceId).get(domName); VirtualClusterDomain dom = (VirtualClusterDomain) chooseDomMap(namespaceId).get(domName);
if (dom == null) { if (dom == null) {
@ -340,14 +339,14 @@ public class DomainsManager {
ipAddressMap.put(ipAddress.getDatumKey(), ipAddress); ipAddressMap.put(ipAddress.getDatumKey(), ipAddress);
} }
for (IpAddress ipAddress : ips) { for (IpAddress ipAddress : ips) {
if (!dom.getClusterMap().containsKey(ipAddress.getClusterName())) { if (!dom.getClusterMap().containsKey(ipAddress.getClusterName())) {
Cluster cluster = new Cluster(ipAddress.getClusterName()); Cluster cluster = new Cluster(ipAddress.getClusterName());
cluster.setDom(dom); cluster.setDom(dom);
dom.getClusterMap().put(ipAddress.getClusterName(), cluster); dom.getClusterMap().put(ipAddress.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
ipAddress.getClusterName(), ipAddress.toJSON()); ipAddress.getClusterName(), ipAddress.toJSON());
} }
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
ipAddressMap.remove(ipAddress.getDatumKey()); ipAddressMap.remove(ipAddress.getDatumKey());
@ -362,8 +361,7 @@ public class DomainsManager {
+ JSON.toJSONString(ipAddressMap.values())); + JSON.toJSONString(ipAddressMap.values()));
} }
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-UPDATE}" + ips + Loggers.EVT_LOG.info("{} {POS} {IP-UPDATE} {}, action: {}", dom, ips, action);
", action:" + action);
String key = UtilsAndCommons.getIPListStoreKey(dom); String key = UtilsAndCommons.getIPListStoreKey(dom);
String value = JSON.toJSONString(ipAddressMap.values()); String value = JSON.toJSONString(ipAddressMap.values());
@ -384,7 +382,7 @@ public class DomainsManager {
peer.leaderDueMs = RaftCore.getLeader().leaderDueMs; peer.leaderDueMs = RaftCore.getLeader().leaderDueMs;
peer.state = RaftCore.getLeader().state; peer.state = RaftCore.getLeader().state;
boolean increaseTerm = !((VirtualClusterDomain) getDomain(domName)).getEnableClientBeat(); boolean increaseTerm = !((VirtualClusterDomain) getDomain(namespaceId, domName)).getEnableClientBeat();
RaftCore.onPublish(datum, peer, increaseTerm); RaftCore.onPublish(datum, peer, increaseTerm);
} finally { } finally {
@ -420,8 +418,11 @@ public class DomainsManager {
return ipAddresses; return ipAddresses;
} }
public Domain getDomain(String domName) { public Domain getDomain(String namespaceId, String domName) {
return chooseDomMap().get(domName); if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseDomMap(namespaceId).get(domName);
} }
public void putDomain(VirtualClusterDomain domain) { public void putDomain(VirtualClusterDomain domain) {

View File

@ -73,7 +73,6 @@ public class PerformanceLoggerThread {
PerformanceLogTask task = new PerformanceLogTask(); PerformanceLogTask task = new PerformanceLogTask();
executor.scheduleWithFixedDelay(task, 30, PERIOD, TimeUnit.SECONDS); executor.scheduleWithFixedDelay(task, 30, PERIOD, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(new HealthCheckSwitchTask(), 30, HEALTH_CHECK_PERIOD, TimeUnit.SECONDS); executor.scheduleWithFixedDelay(new HealthCheckSwitchTask(), 30, HEALTH_CHECK_PERIOD, TimeUnit.SECONDS);
} }
class PerformanceLogTask implements Runnable { class PerformanceLogTask implements Runnable {

View File

@ -21,6 +21,7 @@ import java.util.concurrent.*;
* @author nacos * @author nacos
*/ */
public class GlobalExecutor { public class GlobalExecutor {
public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L); public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);
public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L); public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L);

View File

@ -116,7 +116,7 @@ public class RaftCore {
System.out.println(notifier.tasks.size()); System.out.println(notifier.tasks.size());
} }
Loggers.RAFT.info("finish to load data from disk,cost: {} ms.", (System.currentTimeMillis() - start)); Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
GlobalExecutor.register(new MasterElection()); GlobalExecutor.register(new MasterElection());
GlobalExecutor.register1(new HeartBeat()); GlobalExecutor.register1(new HeartBeat());
@ -132,7 +132,7 @@ public class RaftCore {
} }
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTVERAL_MS); GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
} }
public static List<RaftListener> getListeners() { public static List<RaftListener> getListeners() {

View File

@ -343,6 +343,7 @@ public class ApiCommands {
if (!virtualClusterDomain.getEnableClientBeat()) { if (!virtualClusterDomain.getEnableClientBeat()) {
return result; return result;
} }
stringMap.put("ipList", Arrays.asList(JSON.toJSONString(Arrays.asList(ipAddress))).toArray(new String[1])); stringMap.put("ipList", Arrays.asList(JSON.toJSONString(Arrays.asList(ipAddress))).toArray(new String[1]));
stringMap.put("json", Arrays.asList("true").toArray(new String[1])); stringMap.put("json", Arrays.asList("true").toArray(new String[1]));
stringMap.put("dom", Arrays.asList(dom).toArray(new String[1])); stringMap.put("dom", Arrays.asList(dom).toArray(new String[1]));
@ -606,7 +607,7 @@ public class ApiCommands {
stringMap.put("json", Arrays.asList("true").toArray(new String[1])); stringMap.put("json", Arrays.asList("true").toArray(new String[1]));
stringMap.put("token", Arrays.asList(virtualClusterDomain.getToken()).toArray(new String[1])); stringMap.put("token", Arrays.asList(virtualClusterDomain.getToken()).toArray(new String[1]));
doAddIP4Dom(OverrideParameterRequestWrapper.buildRequest(request, stringMap)); addIP4Dom(OverrideParameterRequestWrapper.buildRequest(request, stringMap));
} else { } else {
throw new IllegalArgumentException("dom not found: " + dom); throw new IllegalArgumentException("dom not found: " + dom);
} }
@ -897,13 +898,85 @@ public class ApiCommands {
+ ", if you want to add them, remove updateOnly flag"); + ", if you want to add them, remove updateOnly flag");
} }
} }
domainsManager.easyAddIP4Dom(namespaceId, dom, newIPs, timestamp, term); domainsManager.easyAddIP4Dom(namespaceId, dom, newIPs, term);
return "ok"; return "ok";
} }
private void syncOnUpdateIP4Dom(String namespaceId, String dom, Map<String, String> proxyParams, String action) throws InterruptedException {
private String doAddIP4Dom(HttpServletRequest request) throws Exception { String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(namespaceId, dom));
final CountDownLatch countDownLatch = new CountDownLatch(RaftCore.getPeerSet().majorityCount());
updateIpPublish(proxyParams, countDownLatch, action);
if (!countDownLatch.await(UtilsAndCommons.MAX_PUBLISH_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
Loggers.RAFT.info("data publish failed, key=" + key, ",notify timeout.");
throw new IllegalArgumentException("data publish failed, key=" + key);
}
}
private void syncOnAddIP4Dom(String namespaceId, String dom, Map<String, String> proxyParams) throws InterruptedException {
syncOnUpdateIP4Dom(namespaceId, dom, proxyParams, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD);
}
private void asyncOnAddIP4Dom(Map<String, String> proxyParams) {
updateIpPublish(proxyParams, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD);
}
private void syncOnRemvIP4Dom(String namespaceId, String dom, Map<String, String> proxyParams) throws InterruptedException {
syncOnUpdateIP4Dom(namespaceId, dom, proxyParams, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE);
}
private void asyncOnRemvIP4Dom(Map<String, String> proxyParams) {
updateIpPublish(proxyParams, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE);
}
private void updateIpPublish(Map<String, String> proxyParams, CountDownLatch countDownLatch, String action) {
for (final String peer : RaftCore.getPeerSet().allServersWithoutMySelf()) {
UtilsAndCommons.RAFT_PUBLISH_EXECUTOR.execute(new Runnable() {
@Override
public void run() {
String server = peer;
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}
String api = action.equals("remove") ? "onRemvIP4Dom" : "onAddIP4Dom";
String url = "http://" + server
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api;
try {
HttpClient.asyncHttpPost(url, null, proxyParams, new AsyncCompletionHandler() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG.warn("failed to add ip params: " + proxyParams
+ ",code: " + response.getStatusCode() + ", caused " + response.getResponseBody()
+ ", server: " + peer);
return 1;
}
if (countDownLatch != null) {
countDownLatch.countDown();
}
return 0;
}
});
} catch (Exception e) {
Loggers.SRV_LOG.error(action + "-IP", "failed when publish to peer." + url, e);
}
}
});
}
}
@NeedAuth
@RequestMapping("/addIP4Dom")
public String addIP4Dom(HttpServletRequest request) throws Exception {
if (Switch.getDisableAddIP()) { if (Switch.getDisableAddIP()) {
throw new AccessControlException("Adding IP for dom is forbidden now."); throw new AccessControlException("Adding IP for dom is forbidden now.");
@ -963,7 +1036,10 @@ public class ApiCommands {
} }
final String dom = WebUtils.required(request, "dom"); final String dom = WebUtils.required(request, "dom");
if (domainsManager.getDomain(namespaceId, dom) == null) {
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (domain == null) {
throw new IllegalStateException("dom doesn't exist: " + dom); throw new IllegalStateException("dom doesn't exist: " + dom);
} }
@ -975,7 +1051,7 @@ public class ApiCommands {
if (updateOnly) { if (updateOnly) {
//make sure every IP is in the dom, otherwise refuse update //make sure every IP is in the dom, otherwise refuse update
List<IpAddress> oldIPs = domainsManager.getDomain(namespaceId, dom).allIPs(); List<IpAddress> oldIPs = domain.allIPs();
Collection diff = CollectionUtils.subtract(newIPs, oldIPs); Collection diff = CollectionUtils.subtract(newIPs, oldIPs);
if (diff.size() != 0) { if (diff.size() != 0) {
throw new IllegalArgumentException("these IPs are not present: " + Arrays.toString(diff.toArray()) throw new IllegalArgumentException("these IPs are not present: " + Arrays.toString(diff.toArray())
@ -983,7 +1059,7 @@ public class ApiCommands {
} }
} }
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(namespaceId, dom)); String key = UtilsAndCommons.getIPListStoreKey(domain);
Datum datum = RaftCore.getDatum(key); Datum datum = RaftCore.getDatum(key);
if (datum == null) { if (datum == null) {
@ -1005,17 +1081,19 @@ public class ApiCommands {
if (RaftCore.isLeader()) { if (RaftCore.isLeader()) {
try { try {
RaftCore.OPERATE_LOCK.lock(); RaftCore.OPERATE_LOCK.lock();
proxyParams.put("clientIP", NetUtils.localServer());
proxyParams.put("notify", "true");
proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term));
proxyParams.put("timestamp", String.valueOf(timestamp));
onAddIP4Dom(MockHttpRequest.buildRequest2(proxyParams)); OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(request);
requestWrapper.addParameter("clientIP", NetUtils.localServer());
requestWrapper.addParameter("notify", "true");
requestWrapper.addParameter("term", String.valueOf(RaftCore.getPeerSet().local().term));
requestWrapper.addParameter("timestamp", String.valueOf(timestamp));
onAddIP4Dom(requestWrapper);
if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) { if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) {
syncOnAddIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); syncOnAddIP4Dom(namespaceId, dom, proxyParams);
} else { } else {
asyncOnAddIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); asyncOnAddIP4Dom(proxyParams);
} }
} finally { } finally {
RaftCore.OPERATE_LOCK.unlock(); RaftCore.OPERATE_LOCK.unlock();
@ -1026,83 +1104,6 @@ public class ApiCommands {
return "ok"; return "ok";
} }
private void syncOnUpdateIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP, String action) throws InterruptedException {
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom));
final CountDownLatch countDownLatch = new CountDownLatch(RaftCore.getPeerSet().majorityCount());
updateIpPublish(dom, ipList, proxyParams, clientIP, countDownLatch, action);
if (!countDownLatch.await(UtilsAndCommons.MAX_PUBLISH_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
Loggers.RAFT.info("data publish failed, key=" + key, ",notify timeout.");
throw new IllegalArgumentException("data publish failed, key=" + key);
}
}
private void syncOnAddIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) throws InterruptedException {
syncOnUpdateIP4Dom(dom, ipList, proxyParams, clientIP, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD);
}
private void asyncOnAddIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) {
updateIpPublish(dom, ipList, proxyParams, clientIP, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD);
}
private void syncOnRemvIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) throws InterruptedException {
syncOnUpdateIP4Dom(dom, ipList, proxyParams, clientIP, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE);
}
private void asyncOnRemvIP4Dom(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP) {
updateIpPublish(dom, ipList, proxyParams, clientIP, null, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE);
}
private void updateIpPublish(String dom, List<String> ipList, Map<String, String> proxyParams, String clientIP, CountDownLatch countDownLatch, String action) {
for (final String peer : RaftCore.getPeerSet().allServersWithoutMySelf()) {
UtilsAndCommons.RAFT_PUBLISH_EXECUTOR.execute(new Runnable() {
@Override
public void run() {
String server = peer;
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}
String api = action.equals("remove") ? "onRemvIP4Dom" : "onAddIP4Dom";
String url = "http://" + server
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api;
try {
HttpClient.asyncHttpPost(url, null, proxyParams, new AsyncCompletionHandler() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG.warn("failed to add ip for dom: " + dom
+ ",ipList = " + ipList + ",code: " + response.getStatusCode()
+ ", caused " + response.getResponseBody() + ", server: " + peer);
return 1;
}
if (countDownLatch != null) {
countDownLatch.countDown();
}
return 0;
}
});
} catch (Exception e) {
Loggers.SRV_LOG.error(action + "-IP", "failed when publish to peer." + url, e);
}
}
});
}
}
@NeedAuth
@RequestMapping("/addIP4Dom")
public String addIP4Dom(HttpServletRequest request) throws Exception {
return doAddIP4Dom(request);
}
public JSONObject doSrvIPXT(String namespaceId, String dom, String agent, String clusters, String clientIP, int udpPort, public JSONObject doSrvIPXT(String namespaceId, String dom, String agent, String clusters, String clientIP, int udpPort,
String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
@ -1247,7 +1248,10 @@ public class ApiCommands {
RaftCore.getPeerSet().local().resetLeaderDue(); RaftCore.getPeerSet().local().resetLeaderDue();
final String dom = WebUtils.required(request, "dom"); final String dom = WebUtils.required(request, "dom");
if (domainsManager.getDomain(dom) == null) { final String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
if (domainsManager.getDomain(namespaceId, dom) == null) {
throw new IllegalStateException("dom doesn't exist: " + dom); throw new IllegalStateException("dom doesn't exist: " + dom);
} }
@ -1257,7 +1261,7 @@ public class ApiCommands {
throw new IllegalArgumentException("Empty ip list"); throw new IllegalArgumentException("Empty ip list");
} }
domainsManager.easyRemvIP4Dom(dom, removedIPs, term); domainsManager.easyRemvIP4Dom(namespaceId, dom, removedIPs, term);
} }
@RequestMapping("/srvIPXT") @RequestMapping("/srvIPXT")
@ -1299,16 +1303,16 @@ public class ApiCommands {
String ipListString = WebUtils.required(request, "ipList"); String ipListString = WebUtils.required(request, "ipList");
if (Loggers.DEBUG_LOG.isDebugEnabled()) { if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: serviceName:" + dom + ", iplist:" + ipListString); Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: serviceName: {}, iplist: {}", dom, ipListString);
} }
List<IpAddress> newIPs = new ArrayList<>();
Map<String, String> proxyParams = new HashMap<>(16); Map<String, String> proxyParams = new HashMap<>(16);
for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) { for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) {
proxyParams.put(entry.getKey(), entry.getValue()[0]); proxyParams.put(entry.getKey(), entry.getValue()[0]);
} }
if (Loggers.DEBUG_LOG.isDebugEnabled()) { if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments: params:" + proxyParams); Loggers.DEBUG_LOG.debug("[REMOVE-IP] full arguments, params: {}", proxyParams);
} }
List<String> ipList = new ArrayList<>(); List<String> ipList = new ArrayList<>();
@ -1343,14 +1347,14 @@ public class ApiCommands {
HttpClient.HttpResult result1 = HttpClient.httpPost(url, null, proxyParams); HttpClient.HttpResult result1 = HttpClient.httpPost(url, null, proxyParams);
if (result1.code != HttpURLConnection.HTTP_OK) { if (result1.code != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG.warn("failed to remove ip for dom, caused " + result1.content); Loggers.SRV_LOG.warn("failed to remove ip for dom, caused: {}", result1.content);
throw new IllegalArgumentException("failed to remove ip for dom, caused " + result1.content); throw new IllegalArgumentException("failed to remove ip for dom, caused " + result1.content);
} }
return "ok"; return "ok";
} }
VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(dom); VirtualClusterDomain domain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
if (domain == null) { if (domain == null) {
throw new IllegalStateException("dom doesn't exist: " + dom); throw new IllegalStateException("dom doesn't exist: " + dom);
@ -1360,7 +1364,7 @@ public class ApiCommands {
throw new IllegalArgumentException("Empty ip list"); throw new IllegalArgumentException("Empty ip list");
} }
String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(dom)); String key = UtilsAndCommons.getIPListStoreKey(domainsManager.getDomain(namespaceId, dom));
long timestamp = 1; long timestamp = 1;
if (RaftCore.getDatum(key) != null) { if (RaftCore.getDatum(key) != null) {
@ -1373,25 +1377,25 @@ public class ApiCommands {
RaftCore.OPERATE_LOCK.lock(); RaftCore.OPERATE_LOCK.lock();
proxyParams.put("clientIP", NetUtils.localServer()); OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(request);
proxyParams.put("notify", "true"); requestWrapper.addParameter("clientIP", NetUtils.localServer());
proxyParams.put("term", String.valueOf(RaftCore.getPeerSet().local().term)); requestWrapper.addParameter("notify", "true");
proxyParams.put("timestamp", String.valueOf(timestamp)); requestWrapper.addParameter("term", String.valueOf(RaftCore.getPeerSet().local().term));
requestWrapper.addParameter("timestamp", String.valueOf(timestamp));
onRemvIP4Dom(MockHttpRequest.buildRequest2(proxyParams)); onRemvIP4Dom(requestWrapper);
if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) { if (domain.getEnableHealthCheck() && !domain.getEnableClientBeat()) {
syncOnRemvIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); syncOnRemvIP4Dom(namespaceId, dom, proxyParams);
} else { } else {
asyncOnRemvIP4Dom(dom, ipList, proxyParams, WebUtils.optional(request, "clientIP", "unknown")); asyncOnRemvIP4Dom(proxyParams);
} }
} finally { } finally {
RaftCore.OPERATE_LOCK.unlock(); RaftCore.OPERATE_LOCK.unlock();
} }
Loggers.EVT_LOG.info("{" + dom + "} {POS} {IP-REMV}" + " new: " Loggers.EVT_LOG.info("dom: {} {POS} {IP-REMV} new: {} operatorIP: {}",
+ ipListString + " operatorIP: " dom, ipListString, WebUtils.optional(request, "clientIP", "unknown"));
+ WebUtils.optional(request, "clientIP", "unknown"));
} }
return "ok"; return "ok";

View File

@ -63,7 +63,7 @@ public class DomainsManagerTest extends BaseTest {
List<IpAddress> ipList = new ArrayList<IpAddress>(); List<IpAddress> ipList = new ArrayList<IpAddress>();
ipList.add(ipAddress); ipList.add(ipAddress);
domainsManager.addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1")); domainsManager.addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1"));
domainsManager.easyRemvIP4Dom(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1", ipList); domainsManager.easyRemvIP4Dom(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1", ipList, 1L);
} }
@Test @Test