Merge remote-tracking branch 'origin/0.3.0' into feature_ServiceMesh

This commit is contained in:
wyp12 2018-10-17 19:46:12 +08:00
commit 27d797ae60
4 changed files with 80 additions and 30 deletions

View File

@ -21,8 +21,15 @@ import java.util.concurrent.*;
* @author nacos * @author nacos
*/ */
public class HealthCheckReactor { public class HealthCheckReactor {
private static final ScheduledExecutorService EXECUTOR = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() / 2, new ThreadFactory() { private static final ScheduledExecutorService EXECUTOR;
static {
int processorCount = Runtime.getRuntime().availableProcessors();
EXECUTOR
= Executors
.newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2, new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread thread = new Thread(r); Thread thread = new Thread(r);
@ -31,6 +38,7 @@ public class HealthCheckReactor {
return thread; return thread;
} }
}); });
}
public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) { public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
task.setStartTime(System.currentTimeMillis()); task.setStartTime(System.currentTimeMillis());

View File

@ -48,8 +48,13 @@ public class MysqlHealthCheckProcessor extends AbstractHealthCheckProcessor {
private static ConcurrentMap<String, Connection> CONNECTION_POOL private static ConcurrentMap<String, Connection> CONNECTION_POOL
= new ConcurrentHashMap<String, Connection>(); = new ConcurrentHashMap<String, Connection>();
private static ExecutorService EXECUTOR private static ExecutorService EXECUTOR;
= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2,
static {
int processorCount = Runtime.getRuntime().availableProcessors();
EXECUTOR
= Executors.newFixedThreadPool(processorCount <= 1 ? 1 : processorCount / 2,
new ThreadFactory() { new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -60,6 +65,7 @@ public class MysqlHealthCheckProcessor extends AbstractHealthCheckProcessor {
} }
} }
); );
}
public MysqlHealthCheckProcessor() { public MysqlHealthCheckProcessor() {
} }
@ -129,7 +135,8 @@ public class MysqlHealthCheckProcessor extends AbstractHealthCheckProcessor {
Statement statement = null; Statement statement = null;
ResultSet resultSet = null; ResultSet resultSet = null;
try {; try {
;
Cluster cluster = task.getCluster(); Cluster cluster = task.getCluster();
String key = cluster.getDom().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort(); String key = cluster.getDom().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort();
Connection connection = CONNECTION_POOL.get(key); Connection connection = CONNECTION_POOL.get(key);
@ -191,7 +198,7 @@ public class MysqlHealthCheckProcessor extends AbstractHealthCheckProcessor {
reEvaluateCheckRT(Switch.getMysqlHealthParams().getMax(), task, Switch.getMysqlHealthParams()); reEvaluateCheckRT(Switch.getMysqlHealthParams().getMax(), task, Switch.getMysqlHealthParams());
} finally { } finally {
ip.setCheckRT(System.currentTimeMillis() - startTime); ip.setCheckRT(System.currentTimeMillis() - startTime);
if (statement!=null) { if (statement != null) {
try { try {
statement.close(); statement.close();
} catch (SQLException e) { } catch (SQLException e) {

View File

@ -47,7 +47,8 @@ public class TcpSuperSenseProcessor extends AbstractHealthCheckProcessor impleme
/** /**
* this value has been carefully tuned, do not modify unless you're confident * this value has been carefully tuned, do not modify unless you're confident
*/ */
public static final int NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() / 2; public static final int NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ?
1 : Runtime.getRuntime().availableProcessors() / 2;
/** /**
* because some hosts doesn't support keep-alive connections, disabled temporarily * because some hosts doesn't support keep-alive connections, disabled temporarily

View File

@ -29,7 +29,9 @@ import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.test.naming.NamingBase.*; import static com.alibaba.nacos.test.naming.NamingBase.*;
@ -49,24 +51,28 @@ public class RegisterInstance_ITCase {
private NamingService naming2; private NamingService naming2;
@LocalServerPort @LocalServerPort
private int port; private int port;
@Before @Before
public void init() throws Exception{ public void init() throws Exception {
if (naming == null) { if (naming == null) {
TimeUnit.SECONDS.sleep(10); TimeUnit.SECONDS.sleep(10);
naming = NamingFactory.createNamingService("127.0.0.1"+":"+port); naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port);
} }
} }
/** /**
* 注册一个默认cluster的Instance并验证 * 注册一个默认cluster的Instance并验证
*
* @throws Exception * @throws Exception
*/ */
@Test @Test
public void regDomTest() throws Exception{ public void regDomTest() throws Exception {
String serviceName = randomDomainName(); String serviceName = randomDomainName();
naming.registerInstance(serviceName, TEST_IP_4_DOM_1, TEST_PORT); naming.registerInstance(serviceName, TEST_IP_4_DOM_1, TEST_PORT);
System.out.println(serviceName);
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
List<Instance> instances = naming.getAllInstances(serviceName); List<Instance> instances = naming.getAllInstances(serviceName);
@ -80,10 +86,11 @@ public class RegisterInstance_ITCase {
/** /**
* 注册一个自定义cluster的Instance并验证 * 注册一个自定义cluster的Instance并验证
*
* @throws Exception * @throws Exception
*/ */
@Test @Test
public void regDomClusterTest() throws Exception{ public void regDomClusterTest() throws Exception {
String serviceName = randomDomainName(); String serviceName = randomDomainName();
naming.registerInstance(serviceName, TEST_IP_4_DOM_1, TEST_PORT, TEST_NEW_CLUSTER_4_DOM_1); naming.registerInstance(serviceName, TEST_IP_4_DOM_1, TEST_PORT, TEST_NEW_CLUSTER_4_DOM_1);
@ -111,6 +118,7 @@ public class RegisterInstance_ITCase {
/** /**
* 注册一个自定义的Instance并验证 * 注册一个自定义的Instance并验证
*
* @throws Exception * @throws Exception
*/ */
@Test @Test
@ -132,6 +140,7 @@ public class RegisterInstance_ITCase {
/** /**
* 注册一个不健康的Instance并验证 * 注册一个不健康的Instance并验证
*
* @throws Exception * @throws Exception
*/ */
@Test @Test
@ -145,9 +154,34 @@ public class RegisterInstance_ITCase {
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
List<Instance> instances = naming.selectInstances(serviceName,false); List<Instance> instances = naming.selectInstances(serviceName, false);
Assert.assertEquals(instances.size(), 1); Assert.assertEquals(instances.size(), 1);
Assert.assertEquals(instances.get(0).isHealthy(), false); Assert.assertEquals(instances.get(0).isHealthy(), false);
} }
@Test
public void regServiceWithMetadata() throws Exception {
String serviceName = randomDomainName();
System.out.println(serviceName);
Instance instance = new Instance();
instance.setIp("1.1.1.2");
instance.setPort(9999);
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("version", "1.0");
metadata.put("env", "prod");
instance.setMetadata(metadata);
naming.registerInstance(serviceName, instance);
TimeUnit.SECONDS.sleep(3);
List<Instance> instances = naming.getAllInstances(serviceName);
Assert.assertEquals(1, instances.size());
Assert.assertEquals("1.0", instances.get(0).getMetadata().get("version"));
Assert.assertEquals("prod", instances.get(0).getMetadata().get("env"));
}
} }