Remove HealthCheckTask.java

This commit is contained in:
KomachiSion 2022-08-26 15:31:34 +08:00
parent 989922600c
commit 7af6e869d7
24 changed files with 25 additions and 2139 deletions

View File

@ -24,7 +24,6 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager; import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate; import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -49,19 +48,15 @@ public class DistroClientComponentRegistry {
private final ClusterRpcClientProxy clusterRpcClientProxy; private final ClusterRpcClientProxy clusterRpcClientProxy;
private final UpgradeJudgement upgradeJudgement;
public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol, public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol,
DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder, DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy, ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy) {
UpgradeJudgement upgradeJudgement) {
this.serverMemberManager = serverMemberManager; this.serverMemberManager = serverMemberManager;
this.distroProtocol = distroProtocol; this.distroProtocol = distroProtocol;
this.componentHolder = componentHolder; this.componentHolder = componentHolder;
this.taskEngineHolder = taskEngineHolder; this.taskEngineHolder = taskEngineHolder;
this.clientManager = clientManager; this.clientManager = clientManager;
this.clusterRpcClientProxy = clusterRpcClientProxy; this.clusterRpcClientProxy = clusterRpcClientProxy;
this.upgradeJudgement = upgradeJudgement;
} }
/** /**
@ -70,8 +65,7 @@ public class DistroClientComponentRegistry {
*/ */
@PostConstruct @PostConstruct
public void doRegister() { public void doRegister() {
DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol, DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol);
upgradeJudgement);
DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy, DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
serverMemberManager); serverMemberManager);
DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder); DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);

View File

@ -38,7 +38,6 @@ import com.alibaba.nacos.naming.core.v2.pojo.BatchInstanceData;
import com.alibaba.nacos.naming.core.v2.pojo.BatchInstancePublishInfo; import com.alibaba.nacos.naming.core.v2.pojo.BatchInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil; import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.alibaba.nacos.sys.utils.ApplicationUtils;
@ -62,15 +61,11 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
private final DistroProtocol distroProtocol; private final DistroProtocol distroProtocol;
private final UpgradeJudgement upgradeJudgement;
private volatile boolean isFinishInitial; private volatile boolean isFinishInitial;
public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol, public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol) {
UpgradeJudgement upgradeJudgement) {
this.clientManager = clientManager; this.clientManager = clientManager;
this.distroProtocol = distroProtocol; this.distroProtocol = distroProtocol;
this.upgradeJudgement = upgradeJudgement;
NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance()); NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
} }
@ -98,9 +93,6 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
if (EnvUtil.getStandaloneMode()) { if (EnvUtil.getStandaloneMode()) {
return; return;
} }
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ClientEvent.ClientVerifyFailedEvent) { if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event); syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
} else { } else {
@ -193,23 +185,26 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
} }
} }
private static void processBatchInstanceDistroData(Set<Service> syncedService, Client client, ClientSyncData clientSyncData) { private static void processBatchInstanceDistroData(Set<Service> syncedService, Client client,
ClientSyncData clientSyncData) {
BatchInstanceData batchInstanceData = clientSyncData.getBatchInstanceData(); BatchInstanceData batchInstanceData = clientSyncData.getBatchInstanceData();
if (batchInstanceData == null || CollectionUtils.isEmpty(batchInstanceData.getNamespaces())) { if (batchInstanceData == null || CollectionUtils.isEmpty(batchInstanceData.getNamespaces())) {
Loggers.DISTRO.info("[processBatchInstanceDistroData] BatchInstanceData is null , clientId is :{}", client.getClientId()); Loggers.DISTRO.info("[processBatchInstanceDistroData] BatchInstanceData is null , clientId is :{}",
client.getClientId());
return; return;
} }
List<String> namespaces = batchInstanceData.getNamespaces(); List<String> namespaces = batchInstanceData.getNamespaces();
List<String> groupNames = batchInstanceData.getGroupNames(); List<String> groupNames = batchInstanceData.getGroupNames();
List<String> serviceNames = batchInstanceData.getServiceNames(); List<String> serviceNames = batchInstanceData.getServiceNames();
List<BatchInstancePublishInfo> batchInstancePublishInfos = batchInstanceData.getBatchInstancePublishInfos(); List<BatchInstancePublishInfo> batchInstancePublishInfos = batchInstanceData.getBatchInstancePublishInfos();
for (int i = 0; i < namespaces.size(); i++) { for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i)); Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service); Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton); syncedService.add(singleton);
BatchInstancePublishInfo batchInstancePublishInfo = batchInstancePublishInfos.get(i); BatchInstancePublishInfo batchInstancePublishInfo = batchInstancePublishInfos.get(i);
BatchInstancePublishInfo targetInstanceInfo = (BatchInstancePublishInfo) client.getInstancePublishInfo(singleton); BatchInstancePublishInfo targetInstanceInfo = (BatchInstancePublishInfo) client
.getInstancePublishInfo(singleton);
boolean result = false; boolean result = false;
if (targetInstanceInfo != null) { if (targetInstanceInfo != null) {
result = batchInstancePublishInfo.equals(targetInstanceInfo); result = batchInstancePublishInfo.equals(targetInstanceInfo);

View File

@ -18,21 +18,15 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.auth.annotation.Secured; import com.alibaba.nacos.auth.annotation.Secured;
import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.CatalogService; import com.alibaba.nacos.naming.core.CatalogService;
import com.alibaba.nacos.naming.core.CatalogServiceV2Impl; import com.alibaba.nacos.naming.core.CatalogServiceV2Impl;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.healthcheck.HealthCheckTask;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.plugin.auth.constant.ActionTypes; import com.alibaba.nacos.plugin.auth.constant.ActionTypes;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
@ -40,9 +34,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Catalog controller. * Catalog controller.
@ -53,9 +45,6 @@ import java.util.Map;
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_CATALOG_CONTEXT) @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_CATALOG_CONTEXT)
public class CatalogController { public class CatalogController {
@Autowired
protected ServiceManager serviceManager;
@Autowired @Autowired
private CatalogServiceV2Impl catalogServiceV2; private CatalogServiceV2Impl catalogServiceV2;
@ -148,41 +137,6 @@ public class CatalogController {
.pageListService(namespaceId, groupName, serviceName, pageNo, pageSize, containedInstance, hasIpCount); .pageListService(namespaceId, groupName, serviceName, pageNo, pageSize, containedInstance, hasIpCount);
} }
/**
* Get response time of service.
*
* @param request http request
* @return response time information
*/
@RequestMapping("/rt/service")
public ObjectNode rt4Service(HttpServletRequest request) throws NacosException {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
Service service = serviceManager.getService(namespaceId, serviceName);
serviceManager.checkServiceIsNull(service, namespaceId, serviceName);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
ArrayNode clusters = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<String, com.alibaba.nacos.naming.core.Cluster> entry : service.getClusterMap().entrySet()) {
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
HealthCheckTask task = entry.getValue().getHealthCheckTask();
packet.put("name", entry.getKey());
packet.put("checkRTBest", task.getCheckRtBest());
packet.put("checkRTWorst", task.getCheckRtWorst());
packet.put("checkRTNormalized", task.getCheckRtNormalized());
clusters.add(packet);
}
result.replace("clusters", clusters);
return result;
}
private CatalogService judgeCatalogService() { private CatalogService judgeCatalogService() {
return catalogServiceV2; return catalogServiceV2;
} }

View File

@ -16,14 +16,11 @@
package com.alibaba.nacos.naming.core; package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor; import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.healthcheck.HealthCheckStatus; import com.alibaba.nacos.naming.healthcheck.HealthCheckStatus;
import com.alibaba.nacos.naming.healthcheck.HealthCheckTask;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.ArrayList; import java.util.ArrayList;
@ -57,9 +54,6 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
private int defIpPort = -1; private int defIpPort = -1;
@JsonIgnore
private HealthCheckTask checkTask;
@JsonIgnore @JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>(); private Set<Instance> persistentInstances = new HashSet<>();
@ -144,9 +138,6 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
if (inited) { if (inited) {
return; return;
} }
checkTask = new HealthCheckTask(this);
HealthCheckReactor.scheduleCheck(checkTask);
inited = true; inited = true;
} }
@ -154,14 +145,6 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
* Destroy cluster. * Destroy cluster.
*/ */
public void destroy() { public void destroy() {
if (checkTask != null) {
checkTask.setCancelled(true);
}
}
@JsonIgnore
public HealthCheckTask getHealthCheckTask() {
return checkTask;
} }
public Service getService() { public Service getService() {
@ -220,7 +203,6 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
Cluster cluster = new Cluster(this.getName(), service); Cluster cluster = new Cluster(this.getName(), service);
cluster.setHealthChecker(getHealthChecker().clone()); cluster.setHealthChecker(getHealthChecker().clone());
cluster.persistentInstances = new HashSet<>(); cluster.persistentInstances = new HashSet<>();
cluster.checkTask = null;
cluster.metadata = new HashMap<>(metadata); cluster.metadata = new HashMap<>(metadata);
return cluster; return cluster;
} }

View File

@ -70,8 +70,8 @@ public class DistroMapper extends MemberChangeListener {
} }
public boolean responsible(Cluster cluster, Instance instance) { public boolean responsible(Cluster cluster, Instance instance) {
return switchDomain.isHealthCheckEnabled(cluster.getServiceName()) && !cluster.getHealthCheckTask() return switchDomain.isHealthCheckEnabled(cluster.getServiceName()) && responsible(cluster.getServiceName())
.isCancelled() && responsible(cluster.getServiceName()) && cluster.contains(instance); && cluster.contains(instance);
} }
/** /**

View File

@ -23,7 +23,6 @@ import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener; import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor; import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor; import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.RsInfo; import com.alibaba.nacos.naming.healthcheck.RsInfo;
@ -63,9 +62,6 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+"; private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";
@JsonIgnore
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
/** /**
* Identify the information used to determine how many isEmpty judgments the service has experienced. * Identify the information used to determine how many isEmpty judgments the service has experienced.
*/ */
@ -251,9 +247,9 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
} }
if (!clusterMap.containsKey(instance.getClusterName())) { if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG.warn( Loggers.SRV_LOG
"cluster: {} not found, ip: {}, will create new cluster with default configuration.", .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson()); instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this); Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init(); cluster.init();
getClusterMap().put(instance.getClusterName(), cluster); getClusterMap().put(instance.getClusterName(), cluster);
@ -294,7 +290,6 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
* Init service. * Init service.
*/ */
public void init() { public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this); entry.getValue().setService(this);
entry.getValue().init(); entry.getValue().init();
@ -310,7 +305,6 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().destroy(); entry.getValue().destroy();
} }
HealthCheckReactor.cancelCheck(clientBeatCheckTask);
} }
/** /**
@ -495,8 +489,9 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
} }
if (getProtectThreshold() != vDom.getProtectThreshold()) { if (getProtectThreshold() != vDom.getProtectThreshold()) {
Loggers.SRV_LOG.info("[SERVICE-UPDATE] service: {}, protectThreshold: {} -> {}", getName(), Loggers.SRV_LOG
getProtectThreshold(), vDom.getProtectThreshold()); .info("[SERVICE-UPDATE] service: {}, protectThreshold: {} -> {}", getName(), getProtectThreshold(),
vDom.getProtectThreshold());
setProtectThreshold(vDom.getProtectThreshold()); setProtectThreshold(vDom.getProtectThreshold());
} }
@ -507,8 +502,8 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
} }
if (enabled != vDom.getEnabled().booleanValue()) { if (enabled != vDom.getEnabled().booleanValue()) {
Loggers.SRV_LOG.info("[SERVICE-UPDATE] service: {}, enabled: {} -> {}", getName(), enabled, Loggers.SRV_LOG
vDom.getEnabled()); .info("[SERVICE-UPDATE] service: {}, enabled: {} -> {}", getName(), enabled, vDom.getEnabled());
enabled = vDom.getEnabled(); enabled = vDom.getEnabled();
} }
@ -552,8 +547,8 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
} }
for (Instance ip : ips) { for (Instance ip : ips) {
String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" + ip
+ ip.getClusterName(); .getClusterName();
ipsString.append(string); ipsString.append(string);
ipsString.append(','); ipsString.append(',');
} }

View File

@ -1,186 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.constants.FieldsConstants;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.heartbeat.BeatCheckTask;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX;
/**
* Client beat check task of service for version 1.x.
*
* @author nkorange
*/
public class ClientBeatCheckTask implements BeatCheckTask {
private Service service;
public static final String EPHEMERAL = "true";
public ClientBeatCheckTask(Service service) {
this.service = service;
}
@JsonIgnore
public UdpPushService getPushService() {
return ApplicationUtils.getBean(UdpPushService.class);
}
@JsonIgnore
public DistroMapper getDistroMapper() {
return ApplicationUtils.getBean(DistroMapper.class);
}
public GlobalConfig getGlobalConfig() {
return ApplicationUtils.getBean(GlobalConfig.class);
}
public SwitchDomain getSwitchDomain() {
return ApplicationUtils.getBean(SwitchDomain.class);
}
@Override
public String taskKey() {
return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());
}
@Override
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
private void deleteIp(Instance instance) {
try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam(FieldsConstants.IP, instance.getIp())
.appendParam(FieldsConstants.PORT, String.valueOf(instance.getPort()))
.appendParam(FieldsConstants.EPHEMERAL, EPHEMERAL)
.appendParam(FieldsConstants.CLUSTER_NAME, instance.getClusterName())
.appendParam(FieldsConstants.SERVICE_NAME, service.getName())
.appendParam(FieldsConstants.NAME_SPACE_ID, service.getNamespaceId());
String url = HTTP_PREFIX + InternetAddressUtil.localHostIP() + InternetAddressUtil.IP_PORT_SPLITER + EnvUtil
.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT + "?" + request.toUrl();
// delete instance asynchronously:
HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), result.getMessage(), result.getCode());
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
}
}

View File

@ -1,230 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.trace.event.naming.HealthStateChangeTraceEvent;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.UdpPushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Health check public methods.
*
* @author nkorange
* @since 1.0.0
*/
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class HealthCheckCommon {
@Autowired
private DistroMapper distroMapper;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private UdpPushService pushService;
/**
* Re-evaluate check response time.
*
* @param checkRT check response time
* @param task health check task
* @param params health params
*/
public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) {
task.setCheckRtLast(checkRT);
if (checkRT > task.getCheckRtWorst()) {
task.setCheckRtWorst(checkRT);
}
if (checkRT < task.getCheckRtBest()) {
task.setCheckRtBest(checkRT);
}
checkRT = (long) ((params.getFactor() * task.getCheckRtNormalized()) + (1 - params.getFactor()) * checkRT);
if (checkRT > params.getMax()) {
checkRT = params.getMax();
}
if (checkRT < params.getMin()) {
checkRT = params.getMin();
}
task.setCheckRtNormalized(checkRT);
}
/**
* Health check pass.
*
* @param ip instance
* @param task health check task
* @param msg message
*/
public void checkOK(Instance ip, HealthCheckTask task, String msg) {
Cluster cluster = task.getCluster();
try {
if (!ip.isHealthy() || !ip.isMockValid()) {
if (ip.getOkCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
if (distroMapper.responsible(cluster, ip)) {
ip.setHealthy(true);
ip.setMockValid(true);
Service service = cluster.getService();
service.setLastModifiedMillis(System.currentTimeMillis());
pushService.serviceChanged(service);
Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE, msg);
NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(), ip.getPort(),
true, msg));
} else {
if (!ip.isMockValid()) {
ip.setMockValid(true);
Loggers.EVT_LOG
.info("serviceName: {} {PROBE} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE, msg);
}
}
} else {
Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-ENABLED} pre-valid: {}:{}@{} in {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
ip.getOkCount(), msg);
}
}
} catch (Throwable t) {
Loggers.SRV_LOG.error("[CHECK-OK] error when close check task.", t);
}
ip.getFailCount().set(0);
ip.setBeingChecked(false);
}
/**
* Health check fail, when instance check failed count more than max failed time, set unhealthy.
*
* @param ip instance
* @param task health check task
* @param msg message
*/
public void checkFail(Instance ip, HealthCheckTask task, String msg) {
Cluster cluster = task.getCluster();
try {
if (ip.isHealthy() || ip.isMockValid()) {
if (ip.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
if (distroMapper.responsible(cluster, ip)) {
ip.setHealthy(false);
ip.setMockValid(false);
Service service = cluster.getService();
service.setLastModifiedMillis(System.currentTimeMillis());
pushService.serviceChanged(service);
Loggers.EVT_LOG
.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE, msg);
NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(),
ip.getPort(), false, msg));
} else {
Loggers.EVT_LOG
.info("serviceName: {} {PROBE} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE, msg);
}
} else {
Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-DISABLED} pre-invalid: {}:{}@{} in {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
ip.getFailCount(), msg);
}
}
} catch (Throwable t) {
Loggers.SRV_LOG.error("[CHECK-FAIL] error when close check task.", t);
}
ip.getOkCount().set(0);
ip.setBeingChecked(false);
}
/**
* Health check fail, set instance unhealthy directly.
*
* @param ip instance
* @param task health check task
* @param msg message
*/
public void checkFailNow(Instance ip, HealthCheckTask task, String msg) {
Cluster cluster = task.getCluster();
try {
if (ip.isHealthy() || ip.isMockValid()) {
if (distroMapper.responsible(cluster, ip)) {
ip.setHealthy(false);
ip.setMockValid(false);
Service service = cluster.getService();
service.setLastModifiedMillis(System.currentTimeMillis());
pushService.serviceChanged(service);
Loggers.EVT_LOG
.info("serviceName: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE, msg);
NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(),
ip.getPort(), false, msg));
} else {
if (ip.isMockValid()) {
ip.setMockValid(false);
Loggers.EVT_LOG
.info("serviceName: {} {PROBE} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE, msg);
Service service = cluster.getService();
}
}
}
} catch (Throwable t) {
Loggers.SRV_LOG.error("[CHECK-FAIL-NOW] error when close check task.", t);
}
ip.getOkCount().set(0);
ip.setBeingChecked(false);
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
/**
* Health check processor.
*
* @author nkorange
*/
public interface HealthCheckProcessor {
/**
* Run check task for service.
*
* @param task check task
*/
void process(HealthCheckTask task);
/**
* Get check task type, refer to enum HealthCheckType.
*
* @return check type
*/
String getType();
}

View File

@ -1,67 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckExtendProvider;
import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckProcessorExtendV1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Delegate of health check.
*
* @author nacos
*/
@Component("healthCheckDelegate")
public class HealthCheckProcessorDelegate implements HealthCheckProcessor {
private Map<String, HealthCheckProcessor> healthCheckProcessorMap = new HashMap<>();
public HealthCheckProcessorDelegate(HealthCheckExtendProvider provider,
HealthCheckProcessorExtendV1 healthCheckProcessorExtend) {
provider.setHealthCheckProcessorExtend(healthCheckProcessorExtend);
provider.init();
}
@Autowired
public void addProcessor(Collection<HealthCheckProcessor> processors) {
healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null)
.collect(Collectors.toMap(HealthCheckProcessor::getType, processor -> processor)));
}
@Override
public void process(HealthCheckTask task) {
String type = task.getCluster().getHealthChecker().getType();
HealthCheckProcessor processor = healthCheckProcessorMap.get(type);
if (processor == null) {
processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
}
processor.process(task);
}
@Override
public String getType() {
return null;
}
}

View File

@ -37,17 +37,6 @@ public class HealthCheckReactor {
private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>(); private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
/**
* Schedule health check task.
*
* @param task health check task
* @return scheduled future
*/
public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
task.setStartTime(System.currentTimeMillis());
return GlobalExecutor.scheduleNamingHealth(task, task.getCheckRtNormalized(), TimeUnit.MILLISECONDS);
}
/** /**
* Schedule health check task for v2. * Schedule health check task for v2.
* *

View File

@ -1,186 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.alibaba.nacos.common.utils.RandomUtils;
/**
* Health check task.
*
* @author nacos
*/
public class HealthCheckTask implements Runnable {
private Cluster cluster;
private long checkRtNormalized = -1;
private long checkRtBest = -1;
private long checkRtWorst = -1;
private long checkRtLast = -1;
private long checkRtLastLast = -1;
private long startTime;
private volatile boolean cancelled = false;
@JsonIgnore
private final DistroMapper distroMapper;
@JsonIgnore
private final SwitchDomain switchDomain;
@JsonIgnore
private final HealthCheckProcessor healthCheckProcessor;
public HealthCheckTask(Cluster cluster) {
this.cluster = cluster;
distroMapper = ApplicationUtils.getBean(DistroMapper.class);
switchDomain = ApplicationUtils.getBean(SwitchDomain.class);
healthCheckProcessor = ApplicationUtils.getBean(HealthCheckProcessorDelegate.class);
initCheckRT();
}
private void initCheckRT() {
// first check time delay
checkRtNormalized =
2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
checkRtBest = Long.MAX_VALUE;
checkRtWorst = 0L;
}
@Override
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
if (distroMapper.responsible(cluster.getService().getName()) && switchDomain
.isHealthCheckEnabled(cluster.getService().getName())) {
healthCheckProcessor.process(this);
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG
.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
}
}
} catch (Throwable e) {
Loggers.SRV_LOG
.error("[HEALTH-CHECK] error while process health check for {}:{}", cluster.getService().getName(),
cluster.getName(), e);
} finally {
if (!cancelled) {
HealthCheckReactor.scheduleCheck(this);
// worst == 0 means never checked
if (this.getCheckRtWorst() > 0 && switchDomain.isHealthCheckEnabled(cluster.getService().getName())
&& distroMapper.responsible(cluster.getService().getName())) {
// TLog doesn't support float so we must convert it into long
long diff =
((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / this.getCheckRtLastLast();
this.setCheckRtLastLast(this.getCheckRtLast());
Cluster cluster = this.getCluster();
if (Loggers.CHECK_RT.isDebugEnabled()) {
Loggers.CHECK_RT.debug("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
cluster.getService().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
this.getCheckRtNormalized(), this.getCheckRtWorst(), this.getCheckRtBest(),
this.getCheckRtLast(), diff);
}
}
}
}
}
public Cluster getCluster() {
return cluster;
}
public void setCluster(Cluster cluster) {
this.cluster = cluster;
}
public long getCheckRtNormalized() {
return checkRtNormalized;
}
public long getCheckRtBest() {
return checkRtBest;
}
public long getCheckRtWorst() {
return checkRtWorst;
}
public void setCheckRtWorst(long checkRtWorst) {
this.checkRtWorst = checkRtWorst;
}
public void setCheckRtBest(long checkRtBest) {
this.checkRtBest = checkRtBest;
}
public void setCheckRtNormalized(long checkRtNormalized) {
this.checkRtNormalized = checkRtNormalized;
}
public boolean isCancelled() {
return cancelled;
}
public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getCheckRtLast() {
return checkRtLast;
}
public void setCheckRtLast(long checkRtLast) {
this.checkRtLast = checkRtLast;
}
public long getCheckRtLastLast() {
return checkRtLastLast;
}
public void setCheckRtLastLast(long checkRtLastLast) {
this.checkRtLastLast = checkRtLastLast;
}
}

View File

@ -1,193 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.api.naming.pojo.healthcheck.impl.Http;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.HttpUtils;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.misc.HttpClientManager;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX;
import static com.alibaba.nacos.naming.misc.Loggers.SRV_LOG;
/**
* HTTP health check processor.
*
* @author xuanyin.zy
*/
@Component("httpHealthCheckProcessorV1")
public class HttpHealthCheckProcessor implements HealthCheckProcessor {
public static final String TYPE = "HTTP";
@Autowired
private SwitchDomain switchDomain;
@Autowired
private HealthCheckCommon healthCheckCommon;
private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager.getProcessorNacosAsyncRestTemplate();
@Override
public String getType() {
return TYPE;
}
@Override
public void process(HealthCheckTask task) {
List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {
return;
}
if (!switchDomain.isHealthCheckEnabled()) {
return;
}
Cluster cluster = task.getCluster();
for (Instance ip : ips) {
try {
if (ip.isMarked()) {
if (SRV_LOG.isDebugEnabled()) {
SRV_LOG.debug("http check, ip is marked as to skip health check, ip: {}" + ip.getIp());
}
continue;
}
if (!ip.markChecking()) {
SRV_LOG.warn("http check started before last one finished, service: {}:{}:{}",
task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
switchDomain.getHttpHealthParams());
continue;
}
Http healthChecker = (Http) cluster.getHealthChecker();
int ckPort = cluster.isUseIPPort4Check() ? ip.getPort() : cluster.getDefCkport();
URL host = new URL(HTTP_PREFIX + ip.getIp() + ":" + ckPort);
URL target = new URL(host, healthChecker.getPath());
Map<String, String> customHeaders = healthChecker.getCustomHeaders();
Header header = Header.newInstance();
header.addAll(customHeaders);
ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
new HttpHealthCheckCallback(ip, task));
MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
} catch (Throwable e) {
ip.setCheckRt(switchDomain.getHttpHealthParams().getMax());
healthCheckCommon.checkFail(ip, task, "http:error:" + e.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
}
}
}
private class HttpHealthCheckCallback implements Callback<String> {
private Instance ip;
private HealthCheckTask task;
private long startTime = System.currentTimeMillis();
public HttpHealthCheckCallback(Instance ip, HealthCheckTask task) {
this.ip = ip;
this.task = task;
}
@Override
public void onReceive(RestResult<String> result) {
ip.setCheckRt(System.currentTimeMillis() - startTime);
int httpCode = result.getCode();
if (HttpURLConnection.HTTP_OK == httpCode) {
healthCheckCommon.checkOK(ip, task, "http:" + httpCode);
healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
switchDomain.getHttpHealthParams());
} else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
|| HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
// server is busy, need verification later
healthCheckCommon.checkFail(ip, task, "http:" + httpCode);
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
} else {
//probably means the state files has been removed by administrator
healthCheckCommon.checkFailNow(ip, task, "http:" + httpCode);
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
}
}
@Override
public void onError(Throwable t) {
ip.setCheckRt(System.currentTimeMillis() - startTime);
Throwable cause = t;
int maxStackDepth = 50;
for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
if (HttpUtils.isTimeoutException(t)) {
healthCheckCommon.checkFail(ip, task, "http:timeout:" + cause.getMessage());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
switchDomain.getHttpHealthParams());
return;
}
cause = cause.getCause();
}
// connection error, probably not reachable
if (t instanceof ConnectException) {
healthCheckCommon.checkFailNow(ip, task, "http:unable2connect:" + t.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
} else {
healthCheckCommon.checkFail(ip, task, "http:error:" + t.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
}
}
@Override
public void onCancel() {
}
}
}

View File

@ -1,211 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.api.naming.pojo.healthcheck.impl.Mysql;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import static com.alibaba.nacos.naming.misc.Loggers.SRV_LOG;
/**
* MYSQL health check processor.
*
* @author nacos
*/
@Component("mysqlHealthCheckProcessorV1")
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class MysqlHealthCheckProcessor implements HealthCheckProcessor {
public static final String TYPE = "MYSQL";
@Autowired
private HealthCheckCommon healthCheckCommon;
@Autowired
private SwitchDomain switchDomain;
public static final int CONNECT_TIMEOUT_MS = 500;
private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
private static final String MYSQL_SLAVE_READONLY = "ON";
private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();
public MysqlHealthCheckProcessor() {
}
@Override
public String getType() {
return TYPE;
}
@Override
public void process(HealthCheckTask task) {
List<Instance> ips = task.getCluster().allIPs(false);
SRV_LOG.debug("mysql check, ips:" + ips);
if (CollectionUtils.isEmpty(ips)) {
return;
}
for (Instance ip : ips) {
try {
if (ip.isMarked()) {
if (SRV_LOG.isDebugEnabled()) {
SRV_LOG.debug("mysql check, ip is marked as to skip health check, ip: {}", ip.getIp());
}
continue;
}
if (!ip.markChecking()) {
SRV_LOG.warn("mysql check started before last one finished, service: {}:{}:{}",
task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
switchDomain.getMysqlHealthParams());
continue;
}
GlobalExecutor.executeMysqlCheckTask(new MysqlCheckTask(ip, task));
MetricsMonitor.getMysqlHealthCheckMonitor().incrementAndGet();
} catch (Exception e) {
ip.setCheckRt(switchDomain.getMysqlHealthParams().getMax());
healthCheckCommon.checkFail(ip, task, "mysql:error:" + e.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task,
switchDomain.getMysqlHealthParams());
}
}
}
private class MysqlCheckTask implements Runnable {
private Instance ip;
private HealthCheckTask task;
private long startTime = System.currentTimeMillis();
public MysqlCheckTask(Instance ip, HealthCheckTask task) {
this.ip = ip;
this.task = task;
}
@Override
public void run() {
Statement statement = null;
ResultSet resultSet = null;
try {
Cluster cluster = task.getCluster();
String key = cluster.getService().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip
.getPort();
Connection connection = CONNECTION_POOL.get(key);
Mysql config = (Mysql) cluster.getHealthChecker();
if (connection == null || connection.isClosed()) {
String url =
"jdbc:mysql://" + ip.getIp() + ":" + ip.getPort() + "?connectTimeout=" + CONNECT_TIMEOUT_MS
+ "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;
connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());
CONNECTION_POOL.put(key, connection);
}
statement = connection.createStatement();
statement.setQueryTimeout(1);
resultSet = statement.executeQuery(config.getCmd());
int resultColumnIndex = 2;
if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
resultSet.next();
if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
throw new IllegalStateException("current node is slave!");
}
}
healthCheckCommon.checkOK(ip, task, "mysql:+ok");
healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
switchDomain.getMysqlHealthParams());
} catch (SQLException e) {
// fail immediately
healthCheckCommon.checkFailNow(ip, task, "mysql:" + e.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getMysqlHealthParams());
} catch (Throwable t) {
Throwable cause = t;
int maxStackDepth = 50;
for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
if (cause instanceof SocketTimeoutException || cause instanceof ConnectException
|| cause instanceof TimeoutException || cause.getCause() instanceof TimeoutException) {
healthCheckCommon.checkFail(ip, task, "mysql:timeout:" + cause.getMessage());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
switchDomain.getMysqlHealthParams());
return;
}
cause = cause.getCause();
}
// connection error, probably not reachable
healthCheckCommon.checkFail(ip, task, "mysql:error:" + t.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task,
switchDomain.getMysqlHealthParams());
} finally {
ip.setCheckRt(System.currentTimeMillis() - startTime);
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close statement:" + statement, e);
}
}
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close resultSet:" + resultSet, e);
}
}
}
}
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import org.springframework.stereotype.Component;
/**
* Health checker that does nothing.
*
* @author nkorange
* @since 1.0.0
*/
@Component
public class NoneHealthCheckProcessor implements HealthCheckProcessor {
public static final String TYPE = "NONE";
@Override
public void process(HealthCheckTask task) {
}
@Override
public String getType() {
return TYPE;
}
}

View File

@ -1,426 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.naming.misc.Loggers.SRV_LOG;
/**
* TCP health check processor.
*
* @author nacos
*/
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
public static final String TYPE = "TCP";
@Autowired
private HealthCheckCommon healthCheckCommon;
@Autowired
private SwitchDomain switchDomain;
public static final int CONNECT_TIMEOUT_MS = 500;
private Map<String, BeatKey> keyMap = new ConcurrentHashMap<>();
private BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<Beat>();
/**
* this value has been carefully tuned, do not modify unless you're confident.
*/
private static final int NIO_THREAD_COUNT = EnvUtil.getAvailableProcessors(0.5);
/**
* because some hosts doesn't support keep-alive connections, disabled temporarily.
*/
private static final long TCP_KEEP_ALIVE_MILLIS = 0;
private Selector selector;
/**
* Tcp super sense processor construct.
*
* @throws IllegalStateException when construct failed
*/
public TcpSuperSenseProcessor() {
try {
selector = Selector.open();
GlobalExecutor.submitTcpCheck(this);
} catch (Exception e) {
throw new IllegalStateException("Error while initializing SuperSense(TM).");
}
}
@Override
public void process(HealthCheckTask task) {
List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {
return;
}
for (Instance ip : ips) {
if (ip.isMarked()) {
if (SRV_LOG.isDebugEnabled()) {
SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
}
continue;
}
if (!ip.markChecking()) {
SRV_LOG.warn("tcp check started before last one finished, service: " + task.getCluster().getService()
.getName() + ":" + task.getCluster().getName() + ":" + ip.getIp() + ":" + ip.getPort());
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
continue;
}
Beat beat = new Beat(ip, task);
taskQueue.add(beat);
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
}
private void processTask() throws Exception {
Collection<Callable<Void>> tasks = new LinkedList<>();
do {
Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
if (beat == null) {
return;
}
tasks.add(new TaskProcessor(beat));
} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
f.get();
}
}
@Override
public void run() {
while (true) {
try {
processTask();
int readyCount = selector.selectNow();
if (readyCount <= 0) {
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
GlobalExecutor.executeTcpSuperSense(new PostProcessor(key));
}
} catch (Throwable e) {
SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
}
}
}
public class PostProcessor implements Runnable {
SelectionKey key;
public PostProcessor(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
Beat beat = (Beat) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
try {
if (!beat.isHealthy()) {
//invalid beat means this server is no longer responsible for the current service
key.cancel();
key.channel().close();
beat.finishCheck();
return;
}
if (key.isValid() && key.isConnectable()) {
//connected
channel.finishConnect();
beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(),
"tcp:ok+");
}
if (key.isValid() && key.isReadable()) {
//disconnected
ByteBuffer buffer = ByteBuffer.allocate(128);
if (channel.read(buffer) == -1) {
key.cancel();
key.channel().close();
} else {
// not terminate request, ignore
}
}
} catch (ConnectException e) {
// unable to connect, possibly port not opened
beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(),
"tcp:unable2connect:" + e.getMessage());
} catch (Exception e) {
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
"tcp:error:" + e.getMessage());
try {
key.cancel();
key.channel().close();
} catch (Exception ignore) {
}
}
}
}
private class Beat {
Instance ip;
HealthCheckTask task;
long startTime = System.currentTimeMillis();
Beat(Instance ip, HealthCheckTask task) {
this.ip = ip;
this.task = task;
}
public void setStartTime(long time) {
startTime = time;
}
public long getStartTime() {
return startTime;
}
public Instance getIp() {
return ip;
}
public HealthCheckTask getTask() {
return task;
}
public boolean isHealthy() {
return System.currentTimeMillis() - startTime < TimeUnit.SECONDS.toMillis(30L);
}
/**
* finish check only, no ip state will be changed.
*/
public void finishCheck() {
ip.setBeingChecked(false);
}
public void finishCheck(boolean success, boolean now, long rt, String msg) {
ip.setCheckRt(System.currentTimeMillis() - startTime);
if (success) {
healthCheckCommon.checkOK(ip, task, msg);
} else {
if (now) {
healthCheckCommon.checkFailNow(ip, task, msg);
} else {
healthCheckCommon.checkFail(ip, task, msg);
}
keyMap.remove(task.toString());
}
healthCheckCommon.reEvaluateCheckRT(rt, task, switchDomain.getTcpHealthParams());
}
@Override
public String toString() {
return task.getCluster().getService().getName() + ":" + task.getCluster().getName() + ":" + ip.getIp() + ":"
+ ip.getPort();
}
@Override
public int hashCode() {
return Objects.hash(ip.toJson());
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof Beat)) {
return false;
}
return this.toString().equals(obj.toString());
}
}
private static class BeatKey {
public SelectionKey key;
public long birthTime;
public BeatKey(SelectionKey key) {
this.key = key;
this.birthTime = System.currentTimeMillis();
}
}
private static class TimeOutTask implements Runnable {
SelectionKey key;
public TimeOutTask(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
if (key != null && key.isValid()) {
SocketChannel channel = (SocketChannel) key.channel();
Beat beat = (Beat) key.attachment();
if (channel.isConnected()) {
return;
}
try {
channel.finishConnect();
} catch (Exception ignore) {
}
try {
beat.finishCheck(false, false, beat.getTask().getCheckRtNormalized() * 2, "tcp:timeout");
key.cancel();
key.channel().close();
} catch (Exception ignore) {
}
}
}
}
private class TaskProcessor implements Callable<Void> {
private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
Beat beat;
public TaskProcessor(Beat beat) {
this.beat = beat;
}
@Override
public Void call() {
long waited = System.currentTimeMillis() - beat.getStartTime();
if (waited > MAX_WAIT_TIME_MILLISECONDS) {
Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
}
SocketChannel channel = null;
try {
Instance instance = beat.getIp();
BeatKey beatKey = keyMap.get(beat.toString());
if (beatKey != null && beatKey.key.isValid()) {
if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
instance.setBeingChecked(false);
return null;
}
beatKey.key.cancel();
beatKey.key.channel().close();
}
channel = SocketChannel.open();
channel.configureBlocking(false);
// only by setting this can we make the socket close event asynchronous
channel.socket().setSoLinger(false, -1);
channel.socket().setReuseAddress(true);
channel.socket().setKeepAlive(true);
channel.socket().setTcpNoDelay(true);
Cluster cluster = beat.getTask().getCluster();
int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
channel.connect(new InetSocketAddress(instance.getIp(), port));
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
key.attach(beat);
keyMap.put(beat.toString(), new BeatKey(key));
beat.setStartTime(System.currentTimeMillis());
GlobalExecutor
.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
"tcp:error:" + e.getMessage());
if (channel != null) {
try {
channel.close();
} catch (Exception ignore) {
}
}
}
return null;
}
}
@Override
public String getType() {
return TYPE;
}
}

View File

@ -34,7 +34,7 @@ public abstract class AbstractHealthCheckProcessorExtend implements BeanFactoryA
protected SingletonBeanRegistry registry; protected SingletonBeanRegistry registry;
/** /**
* Add HealthCheckProcessor Or HealthCheckProcessorV2. * Add HealthCheckProcessorV2.
* *
* @param origin Origin Checker Type * @param origin Origin Checker Type
* @return Extend Processor Type * @return Extend Processor Type

View File

@ -1,54 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck.extend;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessor;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* Health Check Processor Extend V1.
*
* @author sunmengying
*/
@Component
public class HealthCheckProcessorExtendV1 extends AbstractHealthCheckProcessorExtend {
private final Collection<HealthCheckProcessor> processors = NacosServiceLoader.load(HealthCheckProcessor.class);
@Override
public Set<String> addProcessor(Set<String> origin) {
Iterator<HealthCheckProcessor> processorIt = processors.iterator();
Set<String> processorType = new HashSet<>(origin);
while (processorIt.hasNext()) {
HealthCheckProcessor processor = processorIt.next();
String type = processor.getType();
if (processorType.contains(type)) {
throw new RuntimeException(
"More than one processor of the same type was found : [type=\"" + type + "\"]");
}
processorType.add(type);
registry.registerSingleton(lowerFirstChar(processor.getClass().getSimpleName()), processor);
}
return processorType;
}
}

View File

@ -18,7 +18,6 @@ package com.alibaba.nacos.naming.healthcheck.v2.processor;
import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata; import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.healthcheck.NoneHealthCheckProcessor;
import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckExtendProvider; import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckExtendProvider;
import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckProcessorExtendV2; import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckProcessorExtendV2;
import com.alibaba.nacos.naming.healthcheck.v2.HealthCheckTaskV2; import com.alibaba.nacos.naming.healthcheck.v2.HealthCheckTaskV2;
@ -41,7 +40,7 @@ public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>(); private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
public HealthCheckProcessorV2Delegate(HealthCheckExtendProvider provider, public HealthCheckProcessorV2Delegate(HealthCheckExtendProvider provider,
HealthCheckProcessorExtendV2 healthCheckProcessorExtend) { HealthCheckProcessorExtendV2 healthCheckProcessorExtend) {
provider.setHealthCheckProcessorExtend(healthCheckProcessorExtend); provider.setHealthCheckProcessorExtend(healthCheckProcessorExtend);
provider.init(); provider.init();
} }

View File

@ -19,7 +19,6 @@ package com.alibaba.nacos.naming;
import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.ServiceManager; import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessorDelegate;
import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.UdpPushService; import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.env.EnvUtil; import com.alibaba.nacos.sys.env.EnvUtil;
@ -70,9 +69,6 @@ public abstract class BaseTest {
@Spy @Spy
protected SwitchDomain switchDomain; protected SwitchDomain switchDomain;
@Mock
protected HealthCheckProcessorDelegate delegate;
@Mock @Mock
protected UdpPushService pushService; protected UdpPushService pushService;
@ -92,10 +88,6 @@ public abstract class BaseTest {
doReturn(pushService).when(context).getBean(UdpPushService.class); doReturn(pushService).when(context).getBean(UdpPushService.class);
} }
protected void mockInjectHealthCheckProcessor() {
doReturn(delegate).when(context).getBean(HealthCheckProcessorDelegate.class);
}
protected void mockInjectSwitchDomain() { protected void mockInjectSwitchDomain() {
doReturn(switchDomain).when(context).getBean(SwitchDomain.class); doReturn(switchDomain).when(context).getBean(SwitchDomain.class);
} }

View File

@ -18,39 +18,27 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.naming.core.Cluster; import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.HealthCheckTask;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class CatalogControllerTest { public class CatalogControllerTest {
@Mock
private ServiceManager serviceManager;
@Mock @Mock
protected UpgradeJudgement upgradeJudgement; protected UpgradeJudgement upgradeJudgement;
@ -60,13 +48,9 @@ public class CatalogControllerTest {
private Cluster cluster; private Cluster cluster;
@Mock
private HealthCheckTask healthCheckTask;
@Before @Before
public void setUp() throws NoSuchFieldException, IllegalAccessException, NacosException { public void setUp() throws NoSuchFieldException, IllegalAccessException, NacosException {
catalogController = new CatalogController(); catalogController = new CatalogController();
ReflectionTestUtils.setField(catalogController, "serviceManager", serviceManager);
ReflectionTestUtils.setField(catalogController, "upgradeJudgement", upgradeJudgement); ReflectionTestUtils.setField(catalogController, "upgradeJudgement", upgradeJudgement);
service = new Service(TEST_SERVICE_NAME); service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID); service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
@ -74,13 +58,7 @@ public class CatalogControllerTest {
service.setGroupName(TEST_GROUP_NAME); service.setGroupName(TEST_GROUP_NAME);
cluster = new Cluster(TEST_CLUSTER_NAME, service); cluster = new Cluster(TEST_CLUSTER_NAME, service);
cluster.setDefaultPort(1); cluster.setDefaultPort(1);
Mockito.when(healthCheckTask.getCheckRtBest()).thenReturn(1L);
Mockito.when(healthCheckTask.getCheckRtWorst()).thenReturn(1L);
ReflectionTestUtils.setField(cluster, "checkTask", healthCheckTask);
service.addCluster(cluster); service.addCluster(cluster);
when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID,
TEST_GROUP_NAME + Constants.SERVICE_INFO_SPLITER + TEST_SERVICE_NAME)).thenReturn(service);
doCallRealMethod().when(serviceManager).checkServiceIsNull(eq(null), anyString(), anyString());
} }
@Test @Test
@ -141,19 +119,4 @@ public class CatalogControllerTest {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
} }
@Test
public void testRt4Service() {
MockHttpServletRequest request = new MockHttpServletRequest();
request.addParameter(CommonParams.SERVICE_NAME,
TEST_GROUP_NAME + Constants.SERVICE_INFO_SPLITER + TEST_SERVICE_NAME);
try {
ObjectNode objectNode = catalogController.rt4Service(request);
String result = objectNode.toString();
assertTrue(result.contains("\"checkRTWorst\":1"));
} catch (NacosException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
} }

View File

@ -1,128 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.selector.NoneSelector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ServiceTest extends BaseTest {
private Service service;
@Before
public void before() {
super.before();
service = new Service("test-service");
mockInjectPushServer();
mockInjectHealthCheckProcessor();
mockInjectDistroMapper();
mockInjectSwitchDomain();
}
@After
public void tearDown() throws Exception {
service.destroy();
}
@Test
public void testUpdateIPs() {
List<Instance> instances = new ArrayList<>();
Instance instance = new Instance("1.1.1.1", 1, "test-instance1");
instances.add(instance);
service.updateIPs(instances, true);
Assert.assertEquals(instances, service.allIPs(true));
instances = new ArrayList<>();
instance = new Instance();
instance.setIp("2.2.2.2");
instance.setPort(2);
instances.add(instance);
instances.add(null);
service.updateIPs(instances, true);
instances.remove(null);
Assert.assertEquals(instances, service.allIPs(true));
}
@Test
public void testSerialize() throws Exception {
String actual = new Service("test-service").toJson();
System.out.println(actual);
assertTrue(actual.contains("\"checksum\":\"394c845e1160bb880e7f26fb2149ed6d\""));
assertTrue(actual.contains("\"clusterMap\":{}"));
assertTrue(actual.contains("\"empty\":true"));
assertTrue(actual.contains("\"enabled\":true"));
assertTrue(actual.contains("\"finalizeCount\":0"));
assertTrue(actual.contains("\"ipDeleteTimeout\":30000"));
assertTrue(actual.contains("\"lastModifiedMillis\":0"));
assertTrue(actual.contains("\"metadata\":{}"));
assertTrue(actual.contains("\"name\":\"test-service\""));
assertTrue(actual.contains("\"owners\":[]"));
assertTrue(actual.contains("\"protectThreshold\":0.0"));
assertTrue(actual.contains("\"resetWeight\":false"));
assertFalse(actual.contains("clientBeatCheckTask"));
assertFalse(actual.contains("serviceString"));
assertFalse(actual.contains("pushService"));
}
@Test
@SuppressWarnings("checkstyle:linelength")
public void testDeserialize() throws Exception {
JacksonUtils.registerSubtype(NoneSelector.class, SelectorType.none.name());
String example = "{\"checksum\":\"394c845e1160bb880e7f26fb2149ed6d\",\"clusterMap\":{},\"empty\":true,\"enabled\":true,\"finalizeCount\":0,\"ipDeleteTimeout\":30000,\"lastModifiedMillis\":0,\"metadata\":{},\"name\":\"test-service\",\"owners\":[],\"protectThreshold\":0.0,\"resetWeight\":false,\"selector\":{\"type\":\"none\"}}";
Service actual = JacksonUtils.toObj(example, Service.class);
assertEquals("394c845e1160bb880e7f26fb2149ed6d", actual.getChecksum());
assertEquals("test-service", actual.getName());
assertTrue(actual.getClusterMap().isEmpty());
assertTrue(actual.isEmpty());
assertTrue(actual.getEnabled());
assertTrue(actual.getMetadata().isEmpty());
assertTrue(actual.getOwners().isEmpty());
assertEquals(0, actual.getFinalizeCount());
assertEquals(30000, actual.getIpDeleteTimeout());
assertEquals(0, actual.getLastModifiedMillis());
assertEquals(0, actual.getLastModifiedMillis());
assertEquals(0.0, actual.getProtectThreshold(), 0);
assertFalse(actual.getResetWeight());
assertThat(actual.getSelector(), instanceOf(NoneSelector.class));
}
@Test
public void testGetServiceString() {
String actual = service.getServiceString();
assertTrue(actual.contains("\"invalidIPCount\":0"));
assertTrue(actual.contains("\"name\":\"test-service\""));
assertTrue(actual.contains("\"ipCount\":0"));
assertTrue(actual.contains("\"owners\":[]"));
assertTrue(actual.contains("\"protectThreshold\":0.0"));
assertTrue(actual.contains("\"clusters\":[]"));
}
}

View File

@ -1,151 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ClientBeatCheckTaskTest {
@InjectMocks
@Spy
private ClientBeatCheckTask clientBeatCheckTask;
@Mock
private DistroMapper distroMapperSpy;
@Mock
private Service serviceSpy;
@Mock
private GlobalConfig globalConfig;
@Mock
private UdpPushService pushService;
@Mock
private SwitchDomain switchDomain;
@Mock
private UpgradeJudgement upgradeJudgement;
@Mock
private ConfigurableApplicationContext context;
@Before
public void init() {
ReflectionTestUtils.setField(clientBeatCheckTask, "service", serviceSpy);
Mockito.doReturn(distroMapperSpy).when(clientBeatCheckTask).getDistroMapper();
Mockito.doReturn(globalConfig).when(clientBeatCheckTask).getGlobalConfig();
Mockito.doReturn(pushService).when(clientBeatCheckTask).getPushService();
Mockito.doReturn(switchDomain).when(clientBeatCheckTask).getSwitchDomain();
ApplicationUtils.injectContext(context);
when(context.getBean(UpgradeJudgement.class)).thenReturn(upgradeJudgement);
}
@Test
public void testHeartBeatNotTimeout() {
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis());
instance.setMarked(false);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "1000000000");
instance.setMetadata(metadata);
Mockito.doReturn("test").when(serviceSpy).getName();
Mockito.doReturn(true).when(distroMapperSpy).responsible(Mockito.anyString());
clientBeatCheckTask.run();
Assert.assertTrue(instance.isHealthy());
}
@Test
public void testHeartBeatTimeout() {
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis() - 1000);
instance.setMarked(false);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "10");
instance.setMetadata(metadata);
List<Instance> instances = new ArrayList<>();
instances.add(instance);
Mockito.doReturn("test").when(serviceSpy).getName();
Mockito.doReturn(true).when(distroMapperSpy).responsible(Mockito.anyString());
Mockito.doReturn(true).when(switchDomain).isHealthCheckEnabled();
when(serviceSpy.allIPs(true)).thenReturn(instances);
clientBeatCheckTask.run();
Assert.assertFalse(instance.isHealthy());
}
@Test
public void testIpDeleteTimeOut() {
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis());
instance.setMarked(true);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "10");
instance.setMetadata(metadata);
Mockito.doReturn(true).when(distroMapperSpy).responsible(null);
clientBeatCheckTask.run();
}
@Test
public void testIpDeleteNotTimeOut() {
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis());
instance.setMarked(true);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "10000");
instance.setMetadata(metadata);
Mockito.doReturn(true).when(distroMapperSpy).responsible(null);
clientBeatCheckTask.run();
}
}

View File

@ -1,66 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck.extend;
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessor;
import com.alibaba.nacos.naming.healthcheck.MysqlHealthCheckProcessor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.beans.factory.config.SingletonBeanRegistry;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class HealthCheckProcessorExtendV1Test {
@Mock
private SingletonBeanRegistry registry;
private HealthCheckProcessorExtendV1 healthCheckProcessorExtendV1;
private HealthCheckProcessor mysqlProcessor;
@Before
public void setUp() {
healthCheckProcessorExtendV1 = new HealthCheckProcessorExtendV1();
Collection<HealthCheckProcessor> processors = new ArrayList<>();
mysqlProcessor = new MysqlHealthCheckProcessor();
processors.add(mysqlProcessor);
ReflectionTestUtils.setField(healthCheckProcessorExtendV1, "processors", processors);
ReflectionTestUtils.setField(healthCheckProcessorExtendV1, "registry", registry);
}
@Test
public void addProcessor() {
Set<String> origin = new HashSet<>();
origin.add("HTTP");
healthCheckProcessorExtendV1.addProcessor(origin);
verify(registry).registerSingleton(healthCheckProcessorExtendV1
.lowerFirstChar(mysqlProcessor.getClass().getSimpleName()), mysqlProcessor);
}
}