Merge pull request #1353 from IanCao/master

TTL and client beat interval should be customized at instance level.
This commit is contained in:
Fury Zhu 2019-06-24 10:16:12 +08:00 committed by GitHub
commit 042b7a2268
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 282 additions and 33 deletions

View File

@ -15,6 +15,8 @@
*/
package com.alibaba.nacos.api.common;
import java.util.concurrent.TimeUnit;
/**
* Constant
*
@ -143,6 +145,12 @@ public class Constants {
public static final String DEFAULT_CLUSTER_NAME = "DEFAULT";
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
public static final String DEFAULT_NAMESPACE_ID = "public";
public static final boolean DEFAULT_USE_CLOUD_NAMESPACE_PARSING = true;
@ -152,4 +160,6 @@ public class Constants {
public static final String SERVICE_INFO_SPLITER = "@@";
public static final String NULL_STRING = "null";
public static final String NUMBER_PATTERN = "^\\d+$";
}

View File

@ -27,4 +27,10 @@ public class PreservedMetadataKeys {
* The key to indicate the registry source of service instance, such as Dubbo, SpringCloud, etc.
*/
public static final String REGISTER_SOURCE = "preserved.register.source";
public static final String HEART_BEAT_TIMEOUT = "preserved.heart.beat.timeout";
public static final String IP_DELETE_TIMEOUT = "preserved.ip.delete.timeout";
public static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
}

View File

@ -16,10 +16,15 @@
package com.alibaba.nacos.api.naming.pojo;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import static com.alibaba.nacos.api.common.Constants.NUMBER_PATTERN;
/**
* Instance
*
@ -192,4 +197,27 @@ public class Instance {
return str1 == null ? str2 == null : str1.equals(str2);
}
public long getInstanceHeartBeatInterval() {
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
public long getInstanceHeartBeatTimeOut() {
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, Constants.DEFAULT_HEART_BEAT_TIMEOUT);
}
public long getIpDeleteTimeout() {
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.IP_DELETE_TIMEOUT, Constants.DEFAULT_IP_DELETE_TIMEOUT);
}
private long getMetaDataByKeyWithDefault( String key, long defaultValue) {
if (getMetadata() == null || getMetadata().isEmpty()) {
return defaultValue;
}
String value = getMetadata().get(key);
if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {
return Long.valueOf(value);
}
return defaultValue;
}
}

View File

@ -111,6 +111,11 @@
<artifactId>simpleclient</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -19,6 +19,7 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
@ -39,6 +40,7 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author nkorange
@ -46,6 +48,7 @@ import java.util.*;
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {
private static final String DEFAULT_PORT = "8080";
private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
/**
* Each Naming instance should have different namespace.
@ -148,7 +151,7 @@ public class NacosNamingService implements NamingService {
cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
}
}
@Override
public void registerInstance(String serviceName, String ip, int port) throws NacosException {
registerInstance(serviceName, ip, port, Constants.DEFAULT_CLUSTER_NAME);
@ -193,6 +196,8 @@ public class NacosNamingService implements NamingService {
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
@ -200,6 +205,7 @@ public class NacosNamingService implements NamingService {
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
@Override
public void deregisterInstance(String serviceName, String ip, int port) throws NacosException {
deregisterInstance(serviceName, ip, port, Constants.DEFAULT_CLUSTER_NAME);

View File

@ -31,6 +31,8 @@ public class BeatInfo {
private String cluster;
private Map<String, String> metadata;
private volatile boolean scheduled;
private volatile long period;
private volatile boolean stopped;
@Override
public String toString() {
@ -92,4 +94,20 @@ public class BeatInfo {
public void setScheduled(boolean scheduled) {
this.scheduled = scheduled;
}
public long getPeriod() {
return period;
}
public void setPeriod(long period) {
this.period = period;
}
public boolean isStopped() {
return stopped;
}
public void setStopped(boolean stopped) {
this.stopped = stopped;
}
}

View File

@ -54,48 +54,27 @@ public class BeatReactor {
return thread;
}
});
executorService.schedule(new BeatProcessor(), 0, TimeUnit.MILLISECONDS);
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
public void removeBeatInfo(String serviceName, String ip, int port) {
NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);
dom2Beat.remove(buildKey(serviceName, ip, port));
BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
beatInfo.setStopped(true);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
public String buildKey(String serviceName, String ip, int port) {
private String buildKey(String serviceName, String ip, int port) {
return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER
+ ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
}
class BeatProcessor implements Runnable {
@Override
public void run() {
try {
for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
BeatInfo beatInfo = entry.getValue();
if (beatInfo.isScheduled()) {
continue;
}
beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
} finally {
executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
}
}
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
@ -107,10 +86,11 @@ public class BeatReactor {
@Override
public void run() {
long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
if (result > 0) {
clientBeatInterval = result;
if (beatInfo.isStopped()) {
return;
}
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -0,0 +1,57 @@
package com.alibaba.nacos.client;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* @author caoyixiong
*/
@RunWith(MockitoJUnitRunner.class)
public class BeatReactorTest {
@Mock
private NamingProxy namingProxy;
@Test
public void test() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
BeatReactor beatReactor = new BeatReactor(namingProxy);
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName("test");
beatInfo.setIp("11.11.11.11");
beatInfo.setPort(1234);
beatInfo.setCluster("clusterName");
beatInfo.setWeight(1);
beatInfo.setMetadata(new HashMap<String, String>());
beatInfo.setScheduled(false);
beatInfo.setPeriod(1000L);
Mockito.doReturn(0L).when(namingProxy).sendBeat(beatInfo);
beatReactor.addBeatInfo("testService", beatInfo);
Assert.assertEquals(1, getActiveThread(beatReactor));
Thread.sleep(1100L);
beatReactor.removeBeatInfo("testService", beatInfo.getIp(), beatInfo.getPort());
Thread.sleep(3100L);
Assert.assertEquals(0, getActiveThread(beatReactor));
}
private int getActiveThread(BeatReactor beatReactor) throws NoSuchFieldException, IllegalAccessException {
Field field = BeatReactor.class.getDeclaredField("executorService");
field.setAccessible(true);
ScheduledThreadPoolExecutor scheduledExecutorService = (ScheduledThreadPoolExecutor) field.get(beatReactor);
return scheduledExecutorService.getQueue().size();
}
}

View File

@ -262,7 +262,7 @@ public class InstanceController {
}
service.processClientBeat(clientBeat);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
return result;
}

View File

@ -17,6 +17,8 @@ package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.boot.SpringContext;
import com.alibaba.nacos.naming.core.DistroMapper;
@ -26,9 +28,14 @@ import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.push.PushService;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.Response;
import org.springframework.util.StringUtils;
import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.NUMBER_PATTERN;
/**
* Check and update statues of ephemeral instances, remove them if they have been expired.
@ -73,13 +80,13 @@ public class ClientBeatCheckTask implements Runnable {
// first set health status of instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
UtilsAndCommons.LOCALHOST_SITE, ClientBeatProcessor.CLIENT_BEAT_TIMEOUT, instance.getLastBeat());
UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service.getNamespaceId(), service.getName());
}
}
@ -92,7 +99,7 @@ public class ClientBeatCheckTask implements Runnable {
// then remove obsolete instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
deleteIP(instance);
@ -105,6 +112,7 @@ public class ClientBeatCheckTask implements Runnable {
}
private void deleteIP(Instance instance) {
try {

View File

@ -118,6 +118,8 @@ public class UtilsAndCommons {
public static final String DATA_BASE_DIR = NACOS_HOME + File.separator + "data" + File.separator + "naming";
public static final String NUMBER_PATTERN = "^\\d+$";
public static final ScheduledExecutorService SERVICE_SYNCHRONIZATION_EXECUTOR;
public static final ScheduledExecutorService SERVICE_UPDATE_EXECUTOR;

View File

@ -0,0 +1,129 @@
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.core.utils.SystemUtils;
import com.alibaba.nacos.naming.core.*;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.push.PushService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author caoyixiong
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class ClientBeatCheckTaskTest {
@InjectMocks
@Spy
private ClientBeatCheckTask clientBeatCheckTask;
@Mock
private DistroMapper distroMapperSpy;
@Mock
private Service serviceSpy;
@Mock
private GlobalConfig globalConfig;
@Mock
private PushService pushService;
@Before
public void init() {
ReflectionTestUtils.setField(clientBeatCheckTask, "service", serviceSpy);
Mockito.doReturn(distroMapperSpy).when(clientBeatCheckTask).getDistroMapper();
Mockito.doReturn(globalConfig).when(clientBeatCheckTask).getGlobalConfig();
Mockito.doReturn(pushService).when(clientBeatCheckTask).getPushService();
}
@Test
public void testHeartBeatNotTimeout() {
List<Instance> instances = new ArrayList<>();
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis());
instance.setMarked(false);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "1000000000");
instance.setMetadata(metadata);
instances.add(instance);
Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances);
Mockito.doReturn("test").when(serviceSpy).getName();
Mockito.doReturn(true).when(distroMapperSpy).responsible(Mockito.anyString());
clientBeatCheckTask.run();
Assert.assertTrue(instance.isHealthy());
}
@Test
public void testHeartBeatTimeout() {
List<Instance> instances = new ArrayList<>();
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis() - 1000);
instance.setMarked(false);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "10");
instance.setMetadata(metadata);
instances.add(instance);
Mockito.doReturn("test").when(serviceSpy).getName();
Mockito.doReturn(true).when(distroMapperSpy).responsible(Mockito.anyString());
Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances);
clientBeatCheckTask.run();
Assert.assertFalse(instance.isHealthy());
}
@Test
public void testIpDeleteTimeOut() {
List<Instance> instances = new ArrayList<>();
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis());
instance.setMarked(true);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "10");
instance.setMetadata(metadata);
instances.add(instance);
Mockito.doReturn(true).when(distroMapperSpy).responsible(null);
Mockito.doReturn(true).when(globalConfig).isExpireInstance();
Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances);
clientBeatCheckTask.run();
}
@Test
public void testIpDeleteNotTimeOut() {
List<Instance> instances = new ArrayList<>();
Instance instance = new Instance();
instance.setLastBeat(System.currentTimeMillis());
instance.setMarked(true);
instance.setHealthy(true);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "10000");
instance.setMetadata(metadata);
instances.add(instance);
Mockito.doReturn(true).when(distroMapperSpy).responsible(null);
Mockito.doReturn(true).when(globalConfig).isExpireInstance();
Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances);
clientBeatCheckTask.run();
}
}