[#359] Use SPI to make health scalable

This commit is contained in:
XCXCXCXCX 2019-04-27 13:55:04 +08:00
parent fceff72272
commit 6615ae8106
6 changed files with 164 additions and 57 deletions

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.api.naming.pojo; package com.alibaba.nacos.api.naming.pojo;
import com.alibaba.fastjson.annotation.JSONField; import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.serializer.SerializeWriter;
import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.common.Constants;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -45,8 +46,16 @@ public abstract class AbstractHealthChecker implements Cloneable {
* @return Another instance with exactly the same fields. * @return Another instance with exactly the same fields.
* @throws CloneNotSupportedException * @throws CloneNotSupportedException
*/ */
@Override
public abstract AbstractHealthChecker clone() throws CloneNotSupportedException; public abstract AbstractHealthChecker clone() throws CloneNotSupportedException;
/**
* used to JsonAdapter
*/
public void jsonAdapterCallback(SerializeWriter writer){
// do nothing
}
public static class None extends AbstractHealthChecker { public static class None extends AbstractHealthChecker {
public static final String TYPE = "NONE"; public static final String TYPE = "NONE";
@ -116,6 +125,17 @@ public abstract class AbstractHealthChecker implements Cloneable {
return headerMap; return headerMap;
} }
/**
* used to JsonAdapter
*
* @param writer
*/
@Override
public void jsonAdapterCallback(SerializeWriter writer) {
writer.writeFieldValue(',', "path", getPath());
writer.writeFieldValue(',', "headers", getHeaders());
}
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(path, headers, expectedResponseCode); return Objects.hash(path, headers, expectedResponseCode);
@ -215,6 +235,18 @@ public abstract class AbstractHealthChecker implements Cloneable {
this.pwd = pwd; this.pwd = pwd;
} }
/**
* used to JsonAdapter
*
* @param writer
*/
@Override
public void jsonAdapterCallback(SerializeWriter writer) {
writer.writeFieldValue(',', "user", getUser());
writer.writeFieldValue(',', "pwd", getPwd());
writer.writeFieldValue(',', "cmd", getCmd());
}
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(user, pwd, cmd); return Objects.hash(user, pwd, cmd);

View File

@ -15,48 +15,45 @@
*/ */
package com.alibaba.nacos.naming.healthcheck; package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckExtendProvider;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
* @author nacos * @author nacos
*/ */
@Component("healthCheckDelegate") @Component("healthCheckDelegate")
public class HealthCheckProcessorDelegate implements HealthCheckProcessor { public class HealthCheckProcessorDelegate implements HealthCheckProcessor {
@Autowired private Map<String, HealthCheckProcessor> healthCheckProcessorMap
private HttpHealthCheckProcessor httpProcessor; = new HashMap<>();
public HealthCheckProcessorDelegate(HealthCheckExtendProvider provider) {
provider.init();
}
@Autowired @Autowired
private TcpSuperSenseProcessor tcpProcessor; public void addProcessor(Collection<HealthCheckProcessor> processors){
healthCheckProcessorMap.putAll(processors.stream()
@Autowired .filter(processor -> processor.getType() != null)
private MysqlHealthCheckProcessor mysqlProcessor; .collect(Collectors.toMap(HealthCheckProcessor::getType, processor -> processor)));
}
@Autowired
private NoneHealthCheckProcessor noneProcessor;
@Override @Override
public void process(HealthCheckTask task) { public void process(HealthCheckTask task) {
String type = task.getCluster().getHealthChecker().getType(); String type = task.getCluster().getHealthChecker().getType();
HealthCheckProcessor processor = healthCheckProcessorMap.get(type);
if (type.equals(httpProcessor.getType())) { if(processor == null){
httpProcessor.process(task); processor = healthCheckProcessorMap.get("none");
return;
} }
if (type.equals(tcpProcessor.getType())) { processor.process(task);
tcpProcessor.process(task);
return;
}
if (type.equals(mysqlProcessor.getType())) {
mysqlProcessor.process(task);
return;
}
noneProcessor.process(task);
} }
@Override @Override

View File

@ -15,6 +15,11 @@
*/ */
package com.alibaba.nacos.naming.healthcheck; package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* @author nkorange * @author nkorange
*/ */
@ -22,17 +27,37 @@ public enum HealthCheckType {
/** /**
* TCP type * TCP type
*/ */
TCP, TCP("tcp", AbstractHealthChecker.Tcp.class),
/** /**
* HTTP type * HTTP type
*/ */
HTTP, HTTP("http", AbstractHealthChecker.Http.class),
/** /**
* MySQL type * MySQL type
*/ */
MYSQL, MYSQL("mysql", AbstractHealthChecker.Mysql.class),
/** /**
* No check * No check
*/ */
NONE NONE("none", AbstractHealthChecker.None.class);
private String name;
private Class healthCheckerClass;
private static Map<String, Class> EXTEND =
new ConcurrentHashMap<>();
HealthCheckType(String name, Class healthCheckerClass) {
this.name = name;
this.healthCheckerClass = healthCheckerClass;
}
public static void registerHealthChecker(String type, Class healthCheckerClass){
EXTEND.putIfAbsent(type, healthCheckerClass);
}
public static Class ofHealthCheckerClass(String type){
return valueOf(type) == null ? EXTEND.get(type) : valueOf(type).healthCheckerClass;
}
} }

View File

@ -42,25 +42,16 @@ public class JsonAdapter implements ObjectDeserializer, ObjectSerializer {
return INSTANCE; return INSTANCE;
} }
@SuppressWarnings("unchecked")
@Override @Override
public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) { public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) {
JSONObject jsonObj = (JSONObject) parser.parse(); JSONObject jsonObj = (JSONObject) parser.parse();
String checkType = jsonObj.getString("type"); String checkType = jsonObj.getString("type");
if (StringUtils.equals(checkType, AbstractHealthChecker.Http.TYPE)) { Class target = HealthCheckType.ofHealthCheckerClass(checkType);
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.Http.class);
}
if (StringUtils.equals(checkType, AbstractHealthChecker.Tcp.TYPE)) { if(target != null){
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.Tcp.class); return (T) JSON.parseObject(jsonObj.toJSONString(), target);
}
if (StringUtils.equals(checkType, AbstractHealthChecker.None.TYPE)) {
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.None.class);
}
if (StringUtils.equals(checkType, AbstractHealthChecker.Mysql.TYPE)) {
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.Mysql.class);
} }
return null; return null;
@ -83,21 +74,6 @@ public class JsonAdapter implements ObjectDeserializer, ObjectSerializer {
writer.writeFieldValue(',', "type", config.getType()); writer.writeFieldValue(',', "type", config.getType());
if (StringUtils.equals(config.getType(), HealthCheckType.HTTP.name())) { config.jsonAdapterCallback(writer);
AbstractHealthChecker.Http httpCheckConfig = (AbstractHealthChecker.Http) config;
writer.writeFieldValue(',', "path", httpCheckConfig.getPath());
writer.writeFieldValue(',', "headers", httpCheckConfig.getHeaders());
}
if (StringUtils.equals(config.getType(), HealthCheckType.TCP.name())) {
// nothing sepcial to handle
}
if (StringUtils.equals(config.getType(), HealthCheckType.MYSQL.name())) {
AbstractHealthChecker.Mysql mysqlCheckConfig = (AbstractHealthChecker.Mysql) config;
writer.writeFieldValue(',', "user", mysqlCheckConfig.getUser());
writer.writeFieldValue(',', "pwd", mysqlCheckConfig.getPwd());
writer.writeFieldValue(',', "cmd", mysqlCheckConfig.getCmd());
}
} }
} }

View File

@ -0,0 +1,77 @@
package com.alibaba.nacos.naming.healthcheck.extend;
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckType;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SingletonBeanRegistry;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.Set;
/**
* @author XCXCXCXCX
*/
@Component
public class HealthCheckExtendProvider implements BeanFactoryAware{
private ServiceLoader<HealthCheckProcessor> processorLoader
= ServiceLoader.load(HealthCheckProcessor.class);
private ServiceLoader<AbstractHealthChecker> checkerLoader
= ServiceLoader.load(AbstractHealthChecker.class);
private SingletonBeanRegistry registry;
private static final char LOWER_A = 'A';
private static final char LOWER_Z = 'Z';
public void init(){
loadExtend();
}
private void loadExtend() {
Iterator<HealthCheckProcessor> processorIt = processorLoader.iterator();
Iterator<AbstractHealthChecker> healthCheckerIt = checkerLoader.iterator();
Set<String> processorType = new HashSet<>();
Set<String> healthCheckerType = new HashSet<>();
while(processorIt.hasNext()){
HealthCheckProcessor processor = processorIt.next();
processorType.add(processor.getType());
registry.registerSingleton(lowerFirstChar(processor.getClass().getSimpleName()), processor);
}
while(healthCheckerIt.hasNext()){
AbstractHealthChecker checker = healthCheckerIt.next();
healthCheckerType.add(checker.getType());
HealthCheckType.registerHealthChecker(checker.getType(), checker.getClass());
}
if(!processorType.equals(healthCheckerType)){
throw new RuntimeException("An unmatched processor and healthChecker are detected in the extension package.");
}
}
private String lowerFirstChar(String simpleName) {
if(simpleName == null || "".equals(simpleName)){
throw new IllegalArgumentException("can't find extend processor class name");
}
char[] chars = simpleName.toCharArray();
if(chars[0] >= LOWER_A && chars[0] <= LOWER_Z){
chars[0] = (char)(chars[0] + 32);
}
return String.valueOf(chars);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
if(beanFactory instanceof SingletonBeanRegistry){
this.registry = (SingletonBeanRegistry) beanFactory;
}
}
}