Fix client side health check error
This commit is contained in:
parent
1cf0a5b731
commit
855037cac6
@ -69,7 +69,7 @@ public class BeatReactor {
|
||||
for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
|
||||
BeatInfo beatInfo = entry.getValue();
|
||||
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
|
||||
LogUtils.LOG.debug("BEAT", "send beat to server: ", beatInfo.toString());
|
||||
LogUtils.LOG.debug("BEAT", "send beat to server: " + beatInfo.toString());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogUtils.LOG.error("CLIENT-BEAT", "Exception while scheduling beat.", e);
|
||||
|
@ -271,6 +271,8 @@ public class VirtualClusterDomain implements Domain, RaftListener {
|
||||
RaftCore.signalDelete(UtilsAndCommons.getIPListStoreKey(this));
|
||||
}
|
||||
|
||||
HealthCheckReactor.cancelCheck(clientBeatCheckTask);
|
||||
|
||||
RaftCore.unlisten(UtilsAndCommons.getIPListStoreKey(this));
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,10 @@ public class ClientBeatCheckTask implements Runnable {
|
||||
public ClientBeatCheckTask(VirtualClusterDomain domain) {
|
||||
this.domain = domain;
|
||||
}
|
||||
public String taskKey() {
|
||||
return domain.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
@ -71,7 +75,7 @@ public class ClientBeatCheckTask implements Runnable {
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
|
||||
} finally {
|
||||
HealthCheckReactor.scheduleCheck(this);
|
||||
// HealthCheckReactor.scheduleCheck(this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -15,6 +15,9 @@
|
||||
*/
|
||||
package com.alibaba.nacos.naming.healthcheck;
|
||||
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
@ -24,6 +27,8 @@ public class HealthCheckReactor {
|
||||
|
||||
private static final ScheduledExecutorService EXECUTOR;
|
||||
|
||||
private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
|
||||
|
||||
static {
|
||||
|
||||
int processorCount = Runtime.getRuntime().availableProcessors();
|
||||
@ -46,10 +51,24 @@ public class HealthCheckReactor {
|
||||
return EXECUTOR.schedule(task, task.getCheckRTNormalized(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public static ScheduledFuture<?> scheduleCheck(ClientBeatCheckTask task) {
|
||||
return EXECUTOR.schedule(task, 5000, TimeUnit.MILLISECONDS);
|
||||
public static void scheduleCheck(ClientBeatCheckTask task) {
|
||||
futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
|
||||
// return EXECUTOR.schedule(task, 5000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public static void cancelCheck(ClientBeatCheckTask task) {
|
||||
ScheduledFuture scheduledFuture = futureMap.get(task.taskKey());
|
||||
if (scheduledFuture == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
scheduledFuture.cancel(true);
|
||||
} catch (Exception e) {
|
||||
Loggers.EVT_LOG.error("CANCEL-CHECK", "cancel failed!", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static ScheduledFuture<?> scheduleNow(Runnable task) {
|
||||
return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
@ -287,31 +287,35 @@ public class ApiCommands {
|
||||
|
||||
Loggers.TENANT.debug("client-beat", "beat: " + beat);
|
||||
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
Map<String, String[]> stringMap = new HashMap<>(16);
|
||||
stringMap.put("dom", Arrays.asList(dom).toArray(new String[1]));
|
||||
stringMap.put("enableClientBeat", Arrays.asList("true").toArray(new String[1]));
|
||||
stringMap.put("cktype", Arrays.asList("TCP").toArray(new String[1]));
|
||||
stringMap.put("appName", Arrays.asList(app).toArray(new String[1]));
|
||||
stringMap.put("clusterName", Arrays.asList(clusterName).toArray(new String[1]));
|
||||
|
||||
//if domain does not exist, register it.
|
||||
if (virtualClusterDomain == null) {
|
||||
Map<String, String[]> stringMap = new HashMap<>(16);
|
||||
stringMap.put("dom", Arrays.asList(dom).toArray(new String[1]));
|
||||
stringMap.put("enableClientBeat", Arrays.asList("true").toArray(new String[1]));
|
||||
stringMap.put("cktype", Arrays.asList("TCP").toArray(new String[1]));
|
||||
stringMap.put("appName", Arrays.asList(app).toArray(new String[1]));
|
||||
stringMap.put("clusterName", Arrays.asList(clusterName).toArray(new String[1]));
|
||||
regDom(MockHttpRequest.buildRequest(stringMap));
|
||||
Loggers.SRV_LOG.warn("dom not found, register it, dom:" + dom);
|
||||
}
|
||||
|
||||
virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
String ip = clientBeat.getIp();
|
||||
int port = clientBeat.getPort();
|
||||
virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(dom);
|
||||
|
||||
IpAddress ipAddress = new IpAddress();
|
||||
ipAddress.setPort(port);
|
||||
ipAddress.setIp(ip);
|
||||
ipAddress.setWeight(1);
|
||||
ipAddress.setClusterName(clusterName);
|
||||
String ip = clientBeat.getIp();
|
||||
int port = clientBeat.getPort();
|
||||
|
||||
IpAddress ipAddress = new IpAddress();
|
||||
ipAddress.setPort(port);
|
||||
ipAddress.setIp(ip);
|
||||
ipAddress.setWeight(1);
|
||||
ipAddress.setClusterName(clusterName);
|
||||
|
||||
if (!virtualClusterDomain.allIPs().contains(ipAddress)) {
|
||||
stringMap.put("ipList", Arrays.asList(JSON.toJSONString(Arrays.asList(ipAddress))).toArray(new String[1]));
|
||||
stringMap.put("json", Arrays.asList("true").toArray(new String[1]));
|
||||
addIP4Dom(MockHttpRequest.buildRequest(stringMap));
|
||||
Loggers.SRV_LOG.warn("dom not found, register it, dom:" + dom);
|
||||
Loggers.SRV_LOG.warn("ip not found, register it, dom:" + dom + ", ip:" + ipAddress);
|
||||
}
|
||||
|
||||
if (!DistroMapper.responsible(dom)) {
|
||||
|
Loading…
Reference in New Issue
Block a user