From 315a6c36c50f77b0d05ca11da5cad7234d99d9bb Mon Sep 17 00:00:00 2001 From: horizonzy <1060026287@qq.com> Date: Thu, 11 Jun 2020 21:44:33 +0800 Subject: [PATCH] 1.update beatinfo when received instance modify info 2.add the unit test of this case --- .../client/naming/NacosNamingService.java | 19 ++----- .../nacos/client/naming/beat/BeatReactor.java | 15 ++++- .../nacos/client/naming/core/HostReactor.java | 26 +++++++-- .../client/naming/core/HostReactorTest.java | 57 +++++++++++++++++-- 4 files changed, 89 insertions(+), 28 deletions(-) diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index eefa75bce..0077776d4 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -95,8 +95,8 @@ public class NacosNamingService implements NamingService { this.eventDispatcher = new EventDispatcher(); this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); - this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.cacheDir, isLoadCacheAtStart(properties), - initPollingThreadCount(properties)); + this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir, + isLoadCacheAtStart(properties), initPollingThreadCount(properties)); } private int initClientBeatThreadCount(Properties properties) { @@ -189,21 +189,10 @@ public class NacosNamingService implements NamingService { @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { - if (instance.isEphemeral()) { - BeatInfo beatInfo = new BeatInfo(); - beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); - beatInfo.setIp(instance.getIp()); - beatInfo.setPort(instance.getPort()); - beatInfo.setCluster(instance.getClusterName()); - beatInfo.setWeight(instance.getWeight()); - beatInfo.setMetadata(instance.getMetadata()); - beatInfo.setScheduled(false); - beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); - + BeatInfo beatInfo = beatReactor.buildBeatInfo(instance); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } - serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); } @@ -489,7 +478,7 @@ public class NacosNamingService implements NamingService { } @Override - public void shutDown() throws NacosException{ + public void shutDown() throws NacosException { beatReactor.shutdown(); eventDispatcher.shutdown(); hostReactor.shutdown(); diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java index 47dcdaa97..c339afb3f 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java @@ -87,7 +87,20 @@ public class BeatReactor implements Closeable { MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } - private String buildKey(String serviceName, String ip, int port) { + public BeatInfo buildBeatInfo(Instance instance) { + BeatInfo beatInfo = new BeatInfo(); + beatInfo.setServiceName(instance.getServiceName()); + beatInfo.setIp(instance.getIp()); + beatInfo.setPort(instance.getPort()); + beatInfo.setCluster(instance.getClusterName()); + beatInfo.setWeight(instance.getWeight()); + beatInfo.setMetadata(instance.getMetadata()); + beatInfo.setScheduled(false); + beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); + return beatInfo; + } + + public String buildKey(String serviceName, String ip, int port) { return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port; } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java index b6d1ccef4..036517a6a 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java @@ -20,6 +20,8 @@ import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.client.monitor.MetricsMonitor; import com.alibaba.nacos.client.naming.backups.FailoverReactor; +import com.alibaba.nacos.client.naming.beat.BeatInfo; +import com.alibaba.nacos.client.naming.beat.BeatReactor; import com.alibaba.nacos.client.naming.cache.DiskCache; import com.alibaba.nacos.client.naming.net.NamingProxy; import com.alibaba.nacos.client.naming.utils.UtilAndComs; @@ -64,6 +66,8 @@ public class HostReactor implements Closeable { private EventDispatcher eventDispatcher; + private BeatReactor beatReactor; + private NamingProxy serverProxy; private FailoverReactor failoverReactor; @@ -72,11 +76,11 @@ public class HostReactor implements Closeable { private ScheduledExecutorService executor; - public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) { - this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); + public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) { + this(eventDispatcher, serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); } - public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir, + public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) { // init executorService this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() { @@ -88,8 +92,8 @@ public class HostReactor implements Closeable { return thread; } }); - this.eventDispatcher = eventDispatcher; + this.beatReactor = beatReactor; this.serverProxy = serverProxy; this.cacheDir = cacheDir; if (loadCacheAtStart) { @@ -187,6 +191,7 @@ public class HostReactor implements Closeable { if (modHosts.size() > 0) { changed = true; + updateBeatInfo(modHosts); NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); } @@ -218,6 +223,16 @@ public class HostReactor implements Closeable { return serviceInfo; } + private void updateBeatInfo(Set modHosts) { + for (Instance instance : modHosts) { + String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort()); + if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) { + BeatInfo beatInfo = beatReactor.buildBeatInfo(instance); + beatReactor.addBeatInfo(instance.getServiceName(), beatInfo); + } + } + } + private ServiceInfo getServiceInfo0(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); @@ -272,7 +287,6 @@ public class HostReactor implements Closeable { } - public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; @@ -317,7 +331,7 @@ public class HostReactor implements Closeable { } @Override - public void shutdown() throws NacosException{ + public void shutdown() throws NacosException { String className = this.getClass().getName(); NAMING_LOGGER.info("{} do shutdown begin", className); ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER); diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/core/HostReactorTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/core/HostReactorTest.java index 2462a32b3..2df457e8e 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/core/HostReactorTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/core/HostReactorTest.java @@ -16,20 +16,23 @@ package com.alibaba.nacos.client.naming.core; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.client.naming.beat.BeatInfo; +import com.alibaba.nacos.client.naming.beat.BeatReactor; +import com.alibaba.nacos.client.naming.net.NamingProxy; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.util.HashMap; + import static org.junit.Assert.*; import static org.mockito.Mockito.when; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.pojo.Instance; -import com.alibaba.nacos.api.naming.pojo.ServiceInfo; -import com.alibaba.nacos.client.naming.net.NamingProxy; - @RunWith(MockitoJUnitRunner.class) public class HostReactorTest { @@ -43,15 +46,31 @@ public class HostReactorTest { private HostReactor hostReactor; + private BeatReactor beatReactor; + @Before public void setUp() throws Exception { - hostReactor = new HostReactor(eventDispatcher, namingProxy, CACHE_DIR); + beatReactor = new BeatReactor(namingProxy); + BeatInfo beatInfo = new BeatInfo(); + beatInfo.setServiceName("testName"); + beatInfo.setIp("1.1.1.1"); + beatInfo.setPort(1234); + beatInfo.setCluster("clusterName"); + beatInfo.setWeight(1); + beatInfo.setMetadata(new HashMap()); + beatInfo.setScheduled(false); + beatInfo.setPeriod(1000L); + beatReactor.addBeatInfo("testName", beatInfo); + hostReactor = new HostReactor(eventDispatcher, namingProxy, beatReactor, CACHE_DIR); } @Test public void testProcessServiceJSON() { ServiceInfo actual = hostReactor.processServiceJSON(EXAMPLE); assertServiceInfo(actual); + hostReactor.processServiceJSON(CHANGE_DATA_EXAMPLE); + BeatInfo actualBeatInfo = beatReactor.dom2Beat.get(beatReactor.buildKey("testName", "1.1.1.1", 1234)); + assertEquals(2.0, actualBeatInfo.getWeight(), 0.0); } @Test @@ -105,4 +124,30 @@ public class HostReactorTest { + "\t\"allIPs\": false,\n" + "\t\"valid\": true\n" + "}"; + + //the weight changed from 1.0 to 2.0 + private static final String CHANGE_DATA_EXAMPLE = "{\n" + + "\t\"name\": \"testName\",\n" + + "\t\"clusters\": \"testClusters\",\n" + + "\t\"cacheMillis\": 1000,\n" + + "\t\"hosts\": [{\n" + + "\t\t\"ip\": \"1.1.1.1\",\n" + + "\t\t\"port\": 1234,\n" + + "\t\t\"weight\": 2.0,\n" + + "\t\t\"healthy\": true,\n" + + "\t\t\"enabled\": true,\n" + + "\t\t\"ephemeral\": true,\n" + + "\t\t\"clusterName\": \"testClusters\",\n" + + "\t\t\"serviceName\": \"testName\",\n" + + "\t\t\"metadata\": {},\n" + + "\t\t\"instanceHeartBeatInterval\": 5000,\n" + + "\t\t\"instanceHeartBeatTimeOut\": 15000,\n" + + "\t\t\"ipDeleteTimeout\": 30000,\n" + + "\t\t\"instanceIdGenerator\": \"simple\"\n" + + "\t}],\n" + + "\t\"lastRefTime\": 0,\n" + + "\t\"checksum\": \"\",\n" + + "\t\"allIPs\": false,\n" + + "\t\"valid\": true\n" + + "}"; }