1.update beatinfo when received instance modify info

2.add the unit test of this case
This commit is contained in:
horizonzy 2020-06-11 21:44:33 +08:00
parent 65c6c8cb27
commit 315a6c36c5
4 changed files with 89 additions and 28 deletions

View File

@ -95,8 +95,8 @@ public class NacosNamingService implements NamingService {
this.eventDispatcher = new EventDispatcher(); this.eventDispatcher = new EventDispatcher();
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.cacheDir, isLoadCacheAtStart(properties), this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
initPollingThreadCount(properties)); isLoadCacheAtStart(properties), initPollingThreadCount(properties));
} }
private int initClientBeatThreadCount(Properties properties) { private int initClientBeatThreadCount(Properties properties) {
@ -189,21 +189,10 @@ public class NacosNamingService implements NamingService {
@Override @Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) { if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo(); BeatInfo beatInfo = beatReactor.buildBeatInfo(instance);
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
} }
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
} }
@ -489,7 +478,7 @@ public class NacosNamingService implements NamingService {
} }
@Override @Override
public void shutDown() throws NacosException{ public void shutDown() throws NacosException {
beatReactor.shutdown(); beatReactor.shutdown();
eventDispatcher.shutdown(); eventDispatcher.shutdown();
hostReactor.shutdown(); hostReactor.shutdown();

View File

@ -87,7 +87,20 @@ public class BeatReactor implements Closeable {
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
} }
private String buildKey(String serviceName, String ip, int port) { public BeatInfo buildBeatInfo(Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(instance.getServiceName());
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
public String buildKey(String serviceName, String ip, int port) {
return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER
+ ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port; + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
} }

View File

@ -20,6 +20,8 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.monitor.MetricsMonitor; import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor; import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.cache.DiskCache; import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.net.NamingProxy; import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.UtilAndComs; import com.alibaba.nacos.client.naming.utils.UtilAndComs;
@ -64,6 +66,8 @@ public class HostReactor implements Closeable {
private EventDispatcher eventDispatcher; private EventDispatcher eventDispatcher;
private BeatReactor beatReactor;
private NamingProxy serverProxy; private NamingProxy serverProxy;
private FailoverReactor failoverReactor; private FailoverReactor failoverReactor;
@ -72,11 +76,11 @@ public class HostReactor implements Closeable {
private ScheduledExecutorService executor; private ScheduledExecutorService executor;
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) { public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); this(eventDispatcher, serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
} }
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir, public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir,
boolean loadCacheAtStart, int pollingThreadCount) { boolean loadCacheAtStart, int pollingThreadCount) {
// init executorService // init executorService
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() { this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@ -88,8 +92,8 @@ public class HostReactor implements Closeable {
return thread; return thread;
} }
}); });
this.eventDispatcher = eventDispatcher; this.eventDispatcher = eventDispatcher;
this.beatReactor = beatReactor;
this.serverProxy = serverProxy; this.serverProxy = serverProxy;
this.cacheDir = cacheDir; this.cacheDir = cacheDir;
if (loadCacheAtStart) { if (loadCacheAtStart) {
@ -187,6 +191,7 @@ public class HostReactor implements Closeable {
if (modHosts.size() > 0) { if (modHosts.size() > 0) {
changed = true; changed = true;
updateBeatInfo(modHosts);
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
+ serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts));
} }
@ -218,6 +223,16 @@ public class HostReactor implements Closeable {
return serviceInfo; return serviceInfo;
} }
private void updateBeatInfo(Set<Instance> modHosts) {
for (Instance instance : modHosts) {
String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(instance);
beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
}
}
}
private ServiceInfo getServiceInfo0(String serviceName, String clusters) { private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters); String key = ServiceInfo.getKey(serviceName, clusters);
@ -272,7 +287,6 @@ public class HostReactor implements Closeable {
} }
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return; return;
@ -317,7 +331,7 @@ public class HostReactor implements Closeable {
} }
@Override @Override
public void shutdown() throws NacosException{ public void shutdown() throws NacosException {
String className = this.getClass().getName(); String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className); NAMING_LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER); ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);

View File

@ -16,20 +16,23 @@
package com.alibaba.nacos.client.naming.core; package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.net.NamingProxy;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class HostReactorTest { public class HostReactorTest {
@ -43,15 +46,31 @@ public class HostReactorTest {
private HostReactor hostReactor; private HostReactor hostReactor;
private BeatReactor beatReactor;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
hostReactor = new HostReactor(eventDispatcher, namingProxy, CACHE_DIR); beatReactor = new BeatReactor(namingProxy);
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName("testName");
beatInfo.setIp("1.1.1.1");
beatInfo.setPort(1234);
beatInfo.setCluster("clusterName");
beatInfo.setWeight(1);
beatInfo.setMetadata(new HashMap<String, String>());
beatInfo.setScheduled(false);
beatInfo.setPeriod(1000L);
beatReactor.addBeatInfo("testName", beatInfo);
hostReactor = new HostReactor(eventDispatcher, namingProxy, beatReactor, CACHE_DIR);
} }
@Test @Test
public void testProcessServiceJSON() { public void testProcessServiceJSON() {
ServiceInfo actual = hostReactor.processServiceJSON(EXAMPLE); ServiceInfo actual = hostReactor.processServiceJSON(EXAMPLE);
assertServiceInfo(actual); assertServiceInfo(actual);
hostReactor.processServiceJSON(CHANGE_DATA_EXAMPLE);
BeatInfo actualBeatInfo = beatReactor.dom2Beat.get(beatReactor.buildKey("testName", "1.1.1.1", 1234));
assertEquals(2.0, actualBeatInfo.getWeight(), 0.0);
} }
@Test @Test
@ -105,4 +124,30 @@ public class HostReactorTest {
+ "\t\"allIPs\": false,\n" + "\t\"allIPs\": false,\n"
+ "\t\"valid\": true\n" + "\t\"valid\": true\n"
+ "}"; + "}";
//the weight changed from 1.0 to 2.0
private static final String CHANGE_DATA_EXAMPLE = "{\n"
+ "\t\"name\": \"testName\",\n"
+ "\t\"clusters\": \"testClusters\",\n"
+ "\t\"cacheMillis\": 1000,\n"
+ "\t\"hosts\": [{\n"
+ "\t\t\"ip\": \"1.1.1.1\",\n"
+ "\t\t\"port\": 1234,\n"
+ "\t\t\"weight\": 2.0,\n"
+ "\t\t\"healthy\": true,\n"
+ "\t\t\"enabled\": true,\n"
+ "\t\t\"ephemeral\": true,\n"
+ "\t\t\"clusterName\": \"testClusters\",\n"
+ "\t\t\"serviceName\": \"testName\",\n"
+ "\t\t\"metadata\": {},\n"
+ "\t\t\"instanceHeartBeatInterval\": 5000,\n"
+ "\t\t\"instanceHeartBeatTimeOut\": 15000,\n"
+ "\t\t\"ipDeleteTimeout\": 30000,\n"
+ "\t\t\"instanceIdGenerator\": \"simple\"\n"
+ "\t}],\n"
+ "\t\"lastRefTime\": 0,\n"
+ "\t\"checksum\": \"\",\n"
+ "\t\"allIPs\": false,\n"
+ "\t\"valid\": true\n"
+ "}";
} }