diff --git a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java index 901f6f869..e5d61a3fd 100644 --- a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java +++ b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java @@ -15,6 +15,8 @@ */ package com.alibaba.nacos.api.common; +import java.util.concurrent.TimeUnit; + /** * Constant * @@ -143,6 +145,12 @@ public class Constants { public static final String DEFAULT_CLUSTER_NAME = "DEFAULT"; + public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15); + + public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30); + + public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5); + public static final String DEFAULT_NAMESPACE_ID = "public"; public static final boolean DEFAULT_USE_CLOUD_NAMESPACE_PARSING = true; @@ -152,4 +160,6 @@ public class Constants { public static final String SERVICE_INFO_SPLITER = "@@"; public static final String NULL_STRING = "null"; + + public static final String NUMBER_PATTERN = "^\\d+$"; } diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/PreservedMetadataKeys.java b/api/src/main/java/com/alibaba/nacos/api/naming/PreservedMetadataKeys.java index 21dc8ccd2..d2a90adc5 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/PreservedMetadataKeys.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/PreservedMetadataKeys.java @@ -27,4 +27,10 @@ public class PreservedMetadataKeys { * The key to indicate the registry source of service instance, such as Dubbo, SpringCloud, etc. */ public static final String REGISTER_SOURCE = "preserved.register.source"; + + public static final String HEART_BEAT_TIMEOUT = "preserved.heart.beat.timeout"; + + public static final String IP_DELETE_TIMEOUT = "preserved.ip.delete.timeout"; + + public static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval"; } diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java b/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java index ea097cb90..2a31ecc44 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/pojo/Instance.java @@ -16,10 +16,15 @@ package com.alibaba.nacos.api.naming.pojo; import com.alibaba.fastjson.JSON; +import com.alibaba.nacos.api.common.Constants; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; +import org.apache.commons.lang3.StringUtils; import java.util.HashMap; import java.util.Map; +import static com.alibaba.nacos.api.common.Constants.NUMBER_PATTERN; + /** * Instance * @@ -192,4 +197,27 @@ public class Instance { return str1 == null ? str2 == null : str1.equals(str2); } + public long getInstanceHeartBeatInterval() { + return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL); + } + + public long getInstanceHeartBeatTimeOut() { + return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, Constants.DEFAULT_HEART_BEAT_TIMEOUT); + } + + public long getIpDeleteTimeout() { + return getMetaDataByKeyWithDefault(PreservedMetadataKeys.IP_DELETE_TIMEOUT, Constants.DEFAULT_IP_DELETE_TIMEOUT); + } + + private long getMetaDataByKeyWithDefault( String key, long defaultValue) { + if (getMetadata() == null || getMetadata().isEmpty()) { + return defaultValue; + } + String value = getMetadata().get(key); + if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) { + return Long.valueOf(value); + } + return defaultValue; + } + } diff --git a/client/pom.xml b/client/pom.xml index 2e3fcac5d..dec0c14df 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -111,6 +111,11 @@ simpleclient 0.5.0 + + org.mockito + mockito-core + test + diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index d92492d3a..d37971441 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -19,6 +19,7 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; @@ -39,6 +40,7 @@ import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.math.NumberUtils; import java.util.*; +import java.util.concurrent.TimeUnit; /** * @author nkorange @@ -46,6 +48,7 @@ import java.util.*; @SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") public class NacosNamingService implements NamingService { private static final String DEFAULT_PORT = "8080"; + private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5); /** * Each Naming instance should have different namespace. @@ -148,7 +151,7 @@ public class NacosNamingService implements NamingService { cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace; } } - + @Override public void registerInstance(String serviceName, String ip, int port) throws NacosException { registerInstance(serviceName, ip, port, Constants.DEFAULT_CLUSTER_NAME); @@ -193,6 +196,8 @@ public class NacosNamingService implements NamingService { beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); + long instanceInterval = instance.getInstanceHeartBeatInterval(); + beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } @@ -200,6 +205,7 @@ public class NacosNamingService implements NamingService { serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); } + @Override public void deregisterInstance(String serviceName, String ip, int port) throws NacosException { deregisterInstance(serviceName, ip, port, Constants.DEFAULT_CLUSTER_NAME); diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java index 98d6d40b2..d405838f7 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java @@ -31,6 +31,8 @@ public class BeatInfo { private String cluster; private Map metadata; private volatile boolean scheduled; + private volatile long period; + private volatile boolean stopped; @Override public String toString() { @@ -92,4 +94,20 @@ public class BeatInfo { public void setScheduled(boolean scheduled) { this.scheduled = scheduled; } + + public long getPeriod() { + return period; + } + + public void setPeriod(long period) { + this.period = period; + } + + public boolean isStopped() { + return stopped; + } + + public void setStopped(boolean stopped) { + this.stopped = stopped; + } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java index b0792ffdf..700555538 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java @@ -54,48 +54,27 @@ public class BeatReactor { return thread; } }); - - executorService.schedule(new BeatProcessor(), 0, TimeUnit.MILLISECONDS); } public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo); + executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } public void removeBeatInfo(String serviceName, String ip, int port) { NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port); - dom2Beat.remove(buildKey(serviceName, ip, port)); + BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port)); + beatInfo.setStopped(true); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } - public String buildKey(String serviceName, String ip, int port) { + private String buildKey(String serviceName, String ip, int port) { return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port; } - class BeatProcessor implements Runnable { - - @Override - public void run() { - try { - for (Map.Entry entry : dom2Beat.entrySet()) { - BeatInfo beatInfo = entry.getValue(); - if (beatInfo.isScheduled()) { - continue; - } - beatInfo.setScheduled(true); - executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); - } - } catch (Exception e) { - NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e); - } finally { - executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS); - } - } - } - class BeatTask implements Runnable { BeatInfo beatInfo; @@ -107,10 +86,11 @@ public class BeatReactor { @Override public void run() { long result = serverProxy.sendBeat(beatInfo); - beatInfo.setScheduled(false); - if (result > 0) { - clientBeatInterval = result; + if (beatInfo.isStopped()) { + return; } + long nextTime = result > 0 ? result : beatInfo.getPeriod(); + executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } } diff --git a/client/src/test/java/com/alibaba/nacos/client/BeatReactorTest.java b/client/src/test/java/com/alibaba/nacos/client/BeatReactorTest.java new file mode 100644 index 000000000..b9043d529 --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/BeatReactorTest.java @@ -0,0 +1,57 @@ +package com.alibaba.nacos.client; + +import com.alibaba.nacos.client.naming.beat.BeatInfo; +import com.alibaba.nacos.client.naming.beat.BeatReactor; +import com.alibaba.nacos.client.naming.net.NamingProxy; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * @author caoyixiong + */ +@RunWith(MockitoJUnitRunner.class) +public class BeatReactorTest { + + @Mock + private NamingProxy namingProxy; + + @Test + public void test() throws NoSuchFieldException, IllegalAccessException, InterruptedException { + BeatReactor beatReactor = new BeatReactor(namingProxy); + + BeatInfo beatInfo = new BeatInfo(); + beatInfo.setServiceName("test"); + beatInfo.setIp("11.11.11.11"); + beatInfo.setPort(1234); + beatInfo.setCluster("clusterName"); + beatInfo.setWeight(1); + beatInfo.setMetadata(new HashMap()); + beatInfo.setScheduled(false); + beatInfo.setPeriod(1000L); + + Mockito.doReturn(0L).when(namingProxy).sendBeat(beatInfo); + beatReactor.addBeatInfo("testService", beatInfo); + + Assert.assertEquals(1, getActiveThread(beatReactor)); + Thread.sleep(1100L); + beatReactor.removeBeatInfo("testService", beatInfo.getIp(), beatInfo.getPort()); + Thread.sleep(3100L); + Assert.assertEquals(0, getActiveThread(beatReactor)); + } + + private int getActiveThread(BeatReactor beatReactor) throws NoSuchFieldException, IllegalAccessException { + Field field = BeatReactor.class.getDeclaredField("executorService"); + field.setAccessible(true); + ScheduledThreadPoolExecutor scheduledExecutorService = (ScheduledThreadPoolExecutor) field.get(beatReactor); + return scheduledExecutorService.getQueue().size(); + } + +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java index af4b38e2f..67623d468 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java @@ -262,7 +262,7 @@ public class InstanceController { } service.processClientBeat(clientBeat); - + result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval()); return result; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java index 50fa3a628..9b6e609ac 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java @@ -17,6 +17,8 @@ package com.alibaba.nacos.naming.healthcheck; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.nacos.api.common.Constants; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.SpringContext; import com.alibaba.nacos.naming.core.DistroMapper; @@ -26,9 +28,14 @@ import com.alibaba.nacos.naming.misc.*; import com.alibaba.nacos.naming.push.PushService; import com.ning.http.client.AsyncCompletionHandler; import com.ning.http.client.Response; +import org.springframework.util.StringUtils; import java.net.HttpURLConnection; import java.util.List; +import java.util.Map; + +import static com.alibaba.nacos.naming.misc.UtilsAndCommons.NUMBER_PATTERN; + /** * Check and update statues of ephemeral instances, remove them if they have been expired. @@ -73,13 +80,13 @@ public class ClientBeatCheckTask implements Runnable { // first set health status of instances: for (Instance instance : instances) { - if (System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT) { + if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), - UtilsAndCommons.LOCALHOST_SITE, ClientBeatProcessor.CLIENT_BEAT_TIMEOUT, instance.getLastBeat()); + UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service.getNamespaceId(), service.getName()); } } @@ -92,7 +99,7 @@ public class ClientBeatCheckTask implements Runnable { // then remove obsolete instances: for (Instance instance : instances) { - if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) { + if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance)); deleteIP(instance); @@ -105,6 +112,7 @@ public class ClientBeatCheckTask implements Runnable { } + private void deleteIP(Instance instance) { try { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java index 7e115a62f..c77b672a5 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java @@ -118,6 +118,8 @@ public class UtilsAndCommons { public static final String DATA_BASE_DIR = NACOS_HOME + File.separator + "data" + File.separator + "naming"; + public static final String NUMBER_PATTERN = "^\\d+$"; + public static final ScheduledExecutorService SERVICE_SYNCHRONIZATION_EXECUTOR; public static final ScheduledExecutorService SERVICE_UPDATE_EXECUTOR; diff --git a/naming/src/test/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTaskTest.java b/naming/src/test/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTaskTest.java new file mode 100644 index 000000000..5012360de --- /dev/null +++ b/naming/src/test/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTaskTest.java @@ -0,0 +1,129 @@ +package com.alibaba.nacos.naming.healthcheck; + +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; +import com.alibaba.nacos.core.utils.SystemUtils; +import com.alibaba.nacos.naming.core.*; +import com.alibaba.nacos.naming.misc.GlobalConfig; +import com.alibaba.nacos.naming.push.PushService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.util.ReflectionTestUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author caoyixiong + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class ClientBeatCheckTaskTest { + + @InjectMocks + @Spy + private ClientBeatCheckTask clientBeatCheckTask; + @Mock + private DistroMapper distroMapperSpy; + @Mock + private Service serviceSpy; + @Mock + private GlobalConfig globalConfig; + @Mock + private PushService pushService; + + + @Before + public void init() { + ReflectionTestUtils.setField(clientBeatCheckTask, "service", serviceSpy); + Mockito.doReturn(distroMapperSpy).when(clientBeatCheckTask).getDistroMapper(); + Mockito.doReturn(globalConfig).when(clientBeatCheckTask).getGlobalConfig(); + Mockito.doReturn(pushService).when(clientBeatCheckTask).getPushService(); + } + + @Test + public void testHeartBeatNotTimeout() { + List instances = new ArrayList<>(); + Instance instance = new Instance(); + instance.setLastBeat(System.currentTimeMillis()); + instance.setMarked(false); + instance.setHealthy(true); + Map metadata = new HashMap<>(); + metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "1000000000"); + instance.setMetadata(metadata); + instances.add(instance); + Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances); + + Mockito.doReturn("test").when(serviceSpy).getName(); + Mockito.doReturn(true).when(distroMapperSpy).responsible(Mockito.anyString()); + clientBeatCheckTask.run(); + Assert.assertTrue(instance.isHealthy()); + } + + @Test + public void testHeartBeatTimeout() { + List instances = new ArrayList<>(); + Instance instance = new Instance(); + instance.setLastBeat(System.currentTimeMillis() - 1000); + instance.setMarked(false); + instance.setHealthy(true); + Map metadata = new HashMap<>(); + metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "10"); + instance.setMetadata(metadata); + instances.add(instance); + Mockito.doReturn("test").when(serviceSpy).getName(); + Mockito.doReturn(true).when(distroMapperSpy).responsible(Mockito.anyString()); + + Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances); + + clientBeatCheckTask.run(); + Assert.assertFalse(instance.isHealthy()); + } + + @Test + public void testIpDeleteTimeOut() { + List instances = new ArrayList<>(); + Instance instance = new Instance(); + instance.setLastBeat(System.currentTimeMillis()); + instance.setMarked(true); + instance.setHealthy(true); + Map metadata = new HashMap<>(); + metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "10"); + instance.setMetadata(metadata); + instances.add(instance); + Mockito.doReturn(true).when(distroMapperSpy).responsible(null); + Mockito.doReturn(true).when(globalConfig).isExpireInstance(); + Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances); + + clientBeatCheckTask.run(); + } + + @Test + public void testIpDeleteNotTimeOut() { + List instances = new ArrayList<>(); + Instance instance = new Instance(); + instance.setLastBeat(System.currentTimeMillis()); + instance.setMarked(true); + instance.setHealthy(true); + Map metadata = new HashMap<>(); + metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "10000"); + instance.setMetadata(metadata); + instances.add(instance); + + Mockito.doReturn(true).when(distroMapperSpy).responsible(null); + Mockito.doReturn(true).when(globalConfig).isExpireInstance(); + Mockito.when(serviceSpy.allIPs(true)).thenReturn(instances); + + clientBeatCheckTask.run(); + } +}