diff --git a/common/src/test/java/com/alibaba/nacos/common/utils/ConcurrentHashSetTest.java b/common/src/test/java/com/alibaba/nacos/common/utils/ConcurrentHashSetTest.java index 4a24f2216..752390de8 100644 --- a/common/src/test/java/com/alibaba/nacos/common/utils/ConcurrentHashSetTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/utils/ConcurrentHashSetTest.java @@ -17,12 +17,12 @@ package com.alibaba.nacos.common.utils; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * ConcurrentHashSet Test. @@ -32,108 +32,200 @@ import java.util.Set; */ public class ConcurrentHashSetTest { - Set concurrentHashSet; - - @Before - public void setUp() { - concurrentHashSet = new ConcurrentHashSet<>(); - concurrentHashSet.add(1); - concurrentHashSet.add(2); - concurrentHashSet.add(3); - concurrentHashSet.add(4); - concurrentHashSet.add(5); + @Test + public void testBasicOps() { + Set set = new ConcurrentHashSet<>(); + + // addition + Assert.assertTrue(set.add(0)); + Assert.assertTrue(set.add(1)); + Assert.assertTrue(set.contains(0)); + Assert.assertTrue(set.contains(1)); + Assert.assertFalse(set.contains(-1)); + Assert.assertEquals(2, set.size()); + + // iter + for (int i : set) { + Assert.assertTrue(i == 0 || i == 1); + } + + // removal + Assert.assertTrue(set.remove(0)); + Assert.assertFalse(set.remove(0)); + Assert.assertFalse(set.contains(0)); + Assert.assertTrue(set.contains(1)); + Assert.assertEquals(1, set.size()); + + // clear + Assert.assertFalse(set.isEmpty()); + set.clear(); + Assert.assertEquals(0, set.size()); + Assert.assertTrue(set.isEmpty()); + } + + @Test + public void testMultiThread() throws Exception { + int count = 5; + SetMultiThreadChecker hashSetChecker = new SetMultiThreadChecker(new HashSet<>()); + hashSetChecker.start(); + while (!hashSetChecker.hasConcurrentError() && hashSetChecker.isRunning()) { + TimeUnit.SECONDS.sleep(1); + if (count <= 0) { + hashSetChecker.stop(); + } + count--; + } + Assert.assertTrue(hashSetChecker.hasConcurrentError()); + + count = 5; + SetMultiThreadChecker concurrentSetChecker = new SetMultiThreadChecker(new ConcurrentHashSet<>()); + concurrentSetChecker.start(); + while (!concurrentSetChecker.hasConcurrentError() && concurrentSetChecker.isRunning()) { + TimeUnit.SECONDS.sleep(1); + if (count == 0) { + concurrentSetChecker.stop(); + } + count--; + } + Assert.assertFalse(concurrentSetChecker.hasConcurrentError()); + } + + static class SetMultiThreadChecker { + + private final AddDataThread addThread; + + private final DeleteDataThread deleteThread; + + private final IteratorThread iteratorThread; + + public SetMultiThreadChecker(Set setToCheck) { + for (int i = 0; i < 1000; i++) { + setToCheck.add(i); + } + this.addThread = new AddDataThread(setToCheck); + this.deleteThread = new DeleteDataThread(setToCheck); + this.iteratorThread = new IteratorThread(setToCheck); + } + + public void start() { + new Thread(addThread).start(); + new Thread(deleteThread).start(); + new Thread(iteratorThread).start(); + } + + public boolean hasConcurrentError() { + return addThread.hasConcurrentError() || deleteThread.hasConcurrentError() || iteratorThread.hasConcurrentError(); + } + + public boolean isRunning() { + return addThread.isRunning() || deleteThread.isRunning() || iteratorThread.isRunning(); + } + + public void stop() { + addThread.stop(); + deleteThread.stop(); + iteratorThread.stop(); + } } - @Test - public void size() { - Assert.assertEquals(concurrentHashSet.size(), 5); - } + abstract static class ConcurrentCheckThread implements Runnable { - @Test - public void contains() { - Assert.assertTrue(concurrentHashSet.contains(1)); - } + protected final Set hashSet; - @Test - public void testMultithreaded() { - try { - concurrentHashSet = new HashSet<>(); - executeThread(); - } catch (Exception e) { - Assert.assertTrue(e instanceof ConcurrentModificationException); + protected boolean concurrentError = false; + + protected boolean finish = false; + + public ConcurrentCheckThread(Set hashSet) { + this.hashSet = hashSet; + } + + public boolean hasConcurrentError() { + return concurrentError; } - try { - concurrentHashSet = new ConcurrentHashSet<>(); - executeThread(); - } catch (Exception e) { - Assert.assertNull(e); - } - } - - /** - * execute muti thread. - */ - public void executeThread() throws Exception { - for (int i = 0; i < 1000; i++) { - concurrentHashSet.add(i); + public void stop() { + finish = true; } - new Thread(new AddDataThread(concurrentHashSet)).start(); - new Thread(new DeleteDataThread(concurrentHashSet)).start(); - new Thread(new IteratorThread(concurrentHashSet)).start(); + public boolean isRunning() { + return !finish; + } + + @Override + public void run() { + try { + while (isRunning()) { + process(); + } + } catch (ConcurrentModificationException e) { + concurrentError = true; + } finally { + finish = true; + } + } + + protected abstract void process(); } //add data thread - static class AddDataThread implements Runnable { - Set hashSet; + static class AddDataThread extends ConcurrentCheckThread implements Runnable { public AddDataThread(Set hashSet) { - this.hashSet = hashSet; + super(hashSet); } @Override - public void run() { - while (true) { - int random = new Random().nextInt(); - hashSet.add(random); - } + protected void process() { + int random = new Random().nextInt(1000); + hashSet.add(random); } + } // delete data thread - static class DeleteDataThread implements Runnable { - Set hashSet; + static class DeleteDataThread extends ConcurrentCheckThread implements Runnable { public DeleteDataThread(Set hashSet) { - this.hashSet = hashSet; + super(hashSet); } @Override - public void run() { + protected void process() { int random = new Random().nextInt(1000); - while (true) { - hashSet.remove(random); - } + hashSet.remove(random); } + } - static class IteratorThread implements Runnable { - - Set hashSet; + static class IteratorThread extends ConcurrentCheckThread implements Runnable { public IteratorThread(Set hashSet) { - this.hashSet = hashSet; + super(hashSet); } @Override public void run() { System.out.println("start -- hashSet.size() : " + hashSet.size()); - for (Integer str : hashSet) { - System.out.println("value : " + str); + Integer f = null; + try { + while (isRunning()) { + for (Integer i : hashSet) { + f = i; + } + } + } catch (ConcurrentModificationException e) { + concurrentError = true; + } finally { + finish = true; } + System.out.println("finished at " + f); System.out.println("end -- hashSet.size() : " + hashSet.size()); } + + @Override + protected void process() { + } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java index 980f42a44..770432d0f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java @@ -26,6 +26,7 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroDataStorage; import com.alibaba.nacos.core.distributed.distro.entity.DistroData; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.naming.cluster.transport.Serializer; +import com.alibaba.nacos.naming.constants.ClientConstants; import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.client.Client; import com.alibaba.nacos.naming.core.v2.client.ClientSyncData; @@ -158,7 +159,9 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro } private void handlerClientSyncData(ClientSyncData clientSyncData) { - Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId()); + Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}", + clientSyncData.getClientId(), + clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L)); clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes()); Client client = clientManager.getClient(clientSyncData.getClientId()); upgradeClient(client, clientSyncData); @@ -220,13 +223,19 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId())); } } + client.setRevision( + clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0)); } @Override public boolean processVerifyData(DistroData distroData, String sourceAddress) { DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class) .deserialize(distroData.getContent(), DistroClientVerifyInfo.class); - if (clientManager.verifyClient(verifyData.getClientId())) { + // If not upgraded to 2.0.X, just renew client and return. + if (!upgradeJudgement.isUseGrpcFeatures()) { + verifyData.setRevision(0L); + } + if (clientManager.verifyClient(verifyData)) { return true; } Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress); @@ -271,19 +280,22 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro @Override public List getVerifyData() { - List result = new LinkedList<>(); + List result = null; for (String each : clientManager.allClientId()) { Client client = clientManager.getClient(each); if (null == client || !client.isEphemeral()) { continue; } if (clientManager.isResponsibleClient(client)) { - // TODO add revision for client. - DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0); + DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), + client.recalculateRevision()); DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); data.setType(DataOperation.VERIFY); + if (result == null) { + result = new LinkedList<>(); + } result.add(data); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java b/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java index 1a0724ff5..cab996df6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java @@ -35,6 +35,8 @@ public class ClientConstants { public static final String PERSISTENT_IP_PORT = "persistentIpPort"; + public static final String REVISION = "revision"; + public static final String PERSISTENT_SUFFIX = "false"; public static final String CLIENT_EXPIRED_TIME_CONFIG_KEY = "nacos.naming.client.expired.time"; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java index a51e6ac01..d6f402af1 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java @@ -25,12 +25,16 @@ import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.monitor.MetricsMonitor; import com.alibaba.nacos.naming.pojo.Subscriber; +import com.alibaba.nacos.naming.utils.DistroUtils; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION; /** * Abstract implementation of {@code Client}. @@ -45,8 +49,11 @@ public abstract class AbstractClient implements Client { protected volatile long lastUpdatedTime; - public AbstractClient() { + private final AtomicLong revision; + + public AbstractClient(Long revision) { lastUpdatedTime = System.currentTimeMillis(); + this.revision = new AtomicLong(revision == null ? 0 : revision); } @Override @@ -151,7 +158,9 @@ public abstract class AbstractClient implements Client { instances.add(entry.getValue()); } } - return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData); + ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData); + data.getAttributes().addClientAttribute(REVISION, getRevision()); + return data; } private static BatchInstanceData buildBatchInstanceData(BatchInstanceData batchInstanceData, List batchNamespaces, @@ -178,4 +187,22 @@ public abstract class AbstractClient implements Client { } MetricsMonitor.getIpCountMonitor().addAndGet(-1 * subscribers.size()); } + + @Override + public long recalculateRevision() { + int hash = DistroUtils.hash(this); + revision.set(hash); + return hash; + } + + @Override + public long getRevision() { + return revision.get(); + } + + @Override + public void setRevision(long revision) { + this.revision.set(revision); + } + } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java index 58f3b2189..6cbe0112d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java @@ -141,4 +141,23 @@ public interface Client { * Release current client and release resources if neccessary. */ void release(); + + /** + * Recalculate client revision and get its value. + * @return recalculated revision value + */ + long recalculateRevision(); + + /** + * Get client revision. + * @return current revision without recalculation + */ + long getRevision(); + + /** + * Set client revision. + * @param revision revision of this client to update + */ + void setRevision(long revision); + } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java index f38db8a95..67f544f24 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java @@ -21,6 +21,8 @@ import com.alibaba.nacos.naming.core.v2.client.ClientAttributes; import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory; import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient; +import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION; + /** * Client factory for {@link ConnectionBasedClient}. * @@ -35,11 +37,13 @@ public class ConnectionBasedClientFactory implements ClientFactory { + InstancePublishInfo ip = client.getInstancePublishInfo(s); + double weight = getWeight(ip); + Boolean enabled = getEnabled(ip); + String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME); + return Objects.hash( + s.getNamespace(), + s.getGroup(), + s.getName(), + s.isEphemeral(), + ip.getIp(), + ip.getPort(), + weight, + ip.isHealthy(), + enabled, + cluster, + ip.getExtendDatum() + ); + }) + .collect(Collectors.toSet())); + } + + /** + * Calculate checksum for client. + */ + public static String checksum(Client client) { + String s = buildUniqueString(client); + if (s == null) { + return "0"; + } + return MD5Utils.md5Hex(s, Constants.ENCODE); + } + + /** + * Calculate unique string for client. + */ + public static String buildUniqueString(Client client) { + if (!(client instanceof IpPortBasedClient)) { + return null; + } + StringBuilder sb = new StringBuilder(); + sb.append(client.getClientId()).append('|'); + client.getAllPublishedService().stream() + .sorted(Comparator.comparing(DistroUtils::serviceKey)) + .forEach(s -> { + InstancePublishInfo ip = client.getInstancePublishInfo(s); + double weight = getWeight(ip); + Boolean enabled = getEnabled(ip); + String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME); + sb.append(serviceKey(s)).append('_') + .append(ip.getIp()).append(':').append(ip.getPort()).append('_') + .append(weight).append('_') + .append(ip.isHealthy()).append('_') + .append(enabled).append('_') + .append(cluster).append('_') + .append(convertMap2String(ip.getExtendDatum())) + .append(','); + }); + return sb.toString(); + } + + private static boolean getEnabled(InstancePublishInfo ip) { + Object enabled0 = ip.getExtendDatum().get(PUBLISH_INSTANCE_ENABLE); + if (!(enabled0 instanceof Boolean)) { + return true; + } else { + return (Boolean) enabled0; + } + } + + private static double getWeight(InstancePublishInfo ip) { + Object weight0 = ip.getExtendDatum().get(PUBLISH_INSTANCE_WEIGHT); + if (!(weight0 instanceof Number)) { + return DEFAULT_INSTANCE_WEIGHT; + } else { + return ((Number) weight0).doubleValue(); + } + } + + /** + * Convert Map to KV string with ':'. + * + * @param map map need to be converted + * @return KV string with ':' + */ + private static String convertMap2String(Map map) { + if (map == null || map.isEmpty()) { + return StringUtils.EMPTY; + } + StringBuilder sb = new StringBuilder(); + List keys = new ArrayList<>(map.keySet()); + Collections.sort(keys); + for (String key : keys) { + sb.append(key); + sb.append(':'); + sb.append(map.get(key)); + sb.append(','); + } + return sb.toString(); + } + +} diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java index a66188264..4544809c1 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java @@ -47,7 +47,7 @@ public class AbstractClientTest { @Before public void setUp() { - abstractClient = new MockAbstractClient(); + abstractClient = new MockAbstractClient(0L); service = Service.newService("ns1", "group1", "serviceName001"); instancePublishInfo = new InstancePublishInfo("127.0.0.1", 8890); subscriber = new Subscriber("127.0.0.1:8848", "agent1", "appName", "127.0.0.1", diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java index 7bfd557ed..e38baf9fc 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java @@ -25,6 +25,10 @@ package com.alibaba.nacos.naming.core.v2.client; */ public class MockAbstractClient extends AbstractClient { + public MockAbstractClient(Long revision) { + super(revision); + } + @Override public String getClientId() { return "-1"; diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java index d5a61f7b7..3492fa3bd 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java @@ -32,7 +32,7 @@ public class ConnectionBasedClientTest { @Before public void setUp() throws Exception { - connectionBasedClient = new ConnectionBasedClient(connectionId, isNative); + connectionBasedClient = new ConnectionBasedClient(connectionId, isNative, null); } @Test diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java index 28b1d9f94..15ad549b9 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java @@ -53,7 +53,7 @@ public class IpPortBasedClientTest { @Before public void setUp() throws Exception { - ipPortBasedClient = new IpPortBasedClient(clientId, true); + ipPortBasedClient = new IpPortBasedClient(clientId, true, 123L); ipPortBasedClient.init(); instancePublishInfo = new InstancePublishInfo(); } @@ -84,6 +84,18 @@ public class IpPortBasedClientTest { assertEquals(allInstancePublishInfo.iterator().next(), instancePublishInfo); } + @Test + public void testRecalculateRevision() { + assertEquals(123L, ipPortBasedClient.getRevision()); + assertEquals(-1531701243L, ipPortBasedClient.recalculateRevision()); + } + + @Test + public void testConstructor0() { + IpPortBasedClient client = new IpPortBasedClient(clientId, true); + assertEquals(0, client.getRevision()); + } + @After public void tearDown() { ipPortBasedClient.release(); diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/ClientManagerDelegateTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/ClientManagerDelegateTest.java index f2e13e819..f06993f79 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/ClientManagerDelegateTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/ClientManagerDelegateTest.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.core.v2.client.manager; +import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo; import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager; import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager; import com.alibaba.nacos.naming.core.v2.client.manager.impl.PersistentIpPortClientManager; @@ -86,18 +87,20 @@ public class ClientManagerDelegateTest { @Test public void testChooseEphemeralIpPortClient() { - delegate.verifyClient(ephemeralIpPortId); - verify(connectionBasedClientManager, never()).verifyClient(ephemeralIpPortId); - verify(ephemeralIpPortClientManager).verifyClient(ephemeralIpPortId); - verify(persistentIpPortClientManager, never()).verifyClient(ephemeralIpPortId); + DistroClientVerifyInfo verify = new DistroClientVerifyInfo(ephemeralIpPortId, 0); + delegate.verifyClient(verify); + verify(connectionBasedClientManager, never()).verifyClient(verify); + verify(ephemeralIpPortClientManager).verifyClient(verify); + verify(persistentIpPortClientManager, never()).verifyClient(verify); } @Test public void testChoosePersistentIpPortClient() { - delegate.verifyClient(persistentIpPortId); - verify(connectionBasedClientManager, never()).verifyClient(persistentIpPortId); - verify(ephemeralIpPortClientManager, never()).verifyClient(persistentIpPortId); - verify(persistentIpPortClientManager).verifyClient(persistentIpPortId); + DistroClientVerifyInfo verify = new DistroClientVerifyInfo(persistentIpPortId, 0); + delegate.verifyClient(verify); + verify(connectionBasedClientManager, never()).verifyClient(verify); + verify(ephemeralIpPortClientManager, never()).verifyClient(verify); + verify(persistentIpPortClientManager).verifyClient(verify); } @Test diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManagerTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManagerTest.java index fa03fa937..c1d198595 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManagerTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManagerTest.java @@ -19,14 +19,19 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl; import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionMeta; +import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo; +import com.alibaba.nacos.naming.constants.ClientConstants; import com.alibaba.nacos.naming.core.v2.client.ClientAttributes; import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient; +import com.alibaba.nacos.sys.env.EnvUtil; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.mock.env.MockEnvironment; import java.util.Collection; @@ -54,6 +59,11 @@ public class ConnectionBasedClientManagerTest { @Mock private ClientAttributes clientAttributes; + @BeforeClass + public static void setUpBeforeClass() { + EnvUtil.setEnvironment(new MockEnvironment()); + } + @Before public void setUp() throws Exception { connectionBasedClientManager = new ConnectionBasedClientManager(); @@ -62,8 +72,9 @@ public class ConnectionBasedClientManagerTest { when(connection.getMetaInfo()).thenReturn(connectionMeta); when(connectionMeta.getLabel(RemoteConstants.LABEL_MODULE)).thenReturn(RemoteConstants.LABEL_MODULE_NAMING); + when(clientAttributes.getClientAttribute(ClientConstants.REVISION, 0)).thenReturn(0); assertTrue(connectionBasedClientManager.syncClientConnected(connectionId, clientAttributes)); - assertTrue(connectionBasedClientManager.verifyClient(connectionId)); + assertTrue(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(connectionId, 0))); connectionBasedClientManager.clientConnected(connection); } @@ -72,16 +83,16 @@ public class ConnectionBasedClientManagerTest { public void testAllClientId() { Collection allClientIds = connectionBasedClientManager.allClientId(); assertEquals(1, allClientIds.size()); - assertTrue(connectionBasedClientManager.verifyClient(connectionId)); + assertTrue(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(connectionId, 0))); assertTrue(allClientIds.contains(connectionId)); } @Test public void testContainsConnectionId() { - assertTrue(connectionBasedClientManager.verifyClient(connectionId)); + assertTrue(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(connectionId, 0))); assertTrue(connectionBasedClientManager.contains(connectionId)); String unUsedClientId = "127.0.0.1:8888#true"; - assertFalse(connectionBasedClientManager.verifyClient(unUsedClientId)); + assertFalse(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(unUsedClientId, 0))); assertFalse(connectionBasedClientManager.contains(unUsedClientId)); } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManagerTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManagerTest.java index 4fa786951..62c8b7ad1 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManagerTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManagerTest.java @@ -16,6 +16,8 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl; +import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo; +import com.alibaba.nacos.naming.constants.ClientConstants; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.v2.client.Client; import com.alibaba.nacos.naming.core.v2.client.ClientAttributes; @@ -67,7 +69,9 @@ public class EphemeralIpPortClientManagerTest { public void setUp() throws Exception { ephemeralIpPortClientManager = new EphemeralIpPortClientManager(distroMapper, switchDomain); when(client.getClientId()).thenReturn(ephemeralIpPortId); + when(client.getRevision()).thenReturn(1320L); ephemeralIpPortClientManager.clientConnected(client); + when(attributes.getClientAttribute(ClientConstants.REVISION, 0)).thenReturn(5120); ephemeralIpPortClientManager.syncClientConnected(syncedClientId, attributes); } @@ -92,4 +96,18 @@ public class EphemeralIpPortClientManagerTest { String unUsedClientId = "127.0.0.1:8888#true"; assertFalse(ephemeralIpPortClientManager.contains(unUsedClientId)); } + + @Test + public void testVerifyClient0() { + assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(ephemeralIpPortId, 0))); + assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(syncedClientId, 0))); + } + + @Test + public void testVerifyClient() { + assertFalse(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(ephemeralIpPortId, 1))); + assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(ephemeralIpPortId, 1320))); + assertFalse(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(syncedClientId, 1))); + assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(syncedClientId, 5120))); + } } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManagerTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManagerTest.java index 0533276bd..30d5931dc 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManagerTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManagerTest.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl; +import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo; import com.alibaba.nacos.naming.core.v2.client.ClientAttributes; import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; import org.junit.After; @@ -85,7 +86,7 @@ public class PersistentIpPortClientManagerTest { @Test(expected = UnsupportedOperationException.class) public void makeSureNoVerify() { - persistentIpPortClientManager.verifyClient(clientId); + persistentIpPortClientManager.verifyClient(new DistroClientVerifyInfo(clientId, 0)); } @Test diff --git a/naming/src/test/java/com/alibaba/nacos/naming/utils/DistroUtilsTest.java b/naming/src/test/java/com/alibaba/nacos/naming/utils/DistroUtilsTest.java new file mode 100644 index 000000000..2930b3db4 --- /dev/null +++ b/naming/src/test/java/com/alibaba/nacos/naming/utils/DistroUtilsTest.java @@ -0,0 +1,173 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.utils; + +import com.alibaba.nacos.naming.constants.Constants; +import com.alibaba.nacos.naming.core.v2.client.Client; +import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; +import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; + +import static com.alibaba.nacos.api.common.Constants.DEFAULT_CLUSTER_NAME; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link DistroUtils}. + * + * @author Pixy Yuan + * on 2021/10/9 + */ +public class DistroUtilsTest { + + private static final String NAMESPACE = "testNamespace-"; + + private static final String GROUP = "testGroup-"; + + private static final String SERVICE = "testName-"; + + private static final int N = 100000; + + private IpPortBasedClient client0; + + private IpPortBasedClient client1; + + @Before + public void setUp() throws Exception { + client0 = buildClient("127.0.0.1", 8848, false, true, DEFAULT_CLUSTER_NAME, + null); + HashMap metadata = new HashMap<>(); + metadata.put(Constants.PUBLISH_INSTANCE_WEIGHT, 2L); + metadata.put(Constants.PUBLISH_INSTANCE_ENABLE, false); + metadata.put("Custom.metadataId1", "abc"); + metadata.put("Custom.metadataId2", 123); + metadata.put("Custom.metadataId3", null); + client1 = buildClient("127.0.0.2", 8848, true, true, "cluster1", + metadata, 20); + } + + private IpPortBasedClient buildClient(String ip, int port, boolean ephemeral, boolean healthy, String cluster, + HashMap extendDatum) { + return buildClient(ip, port, ephemeral, healthy, cluster, extendDatum, 1); + } + + private IpPortBasedClient buildClient(String ip, int port, boolean ephemeral, boolean healthy, String cluster, + HashMap extendDatum, int serviceCount) { + InstancePublishInfo instance = new InstancePublishInfo(ip, port); + instance.setCluster(cluster); + instance.setHealthy(healthy); + IpPortBasedClient client = new IpPortBasedClient(String.format("%s:%s#%s", ip, port, ephemeral), ephemeral); + if (extendDatum != null) { + instance.setExtendDatum(extendDatum); + } + for (int i = 1; i < serviceCount + 1; i++) { + client.putServiceInstance(Service.newService(DistroUtilsTest.NAMESPACE + i, + DistroUtilsTest.GROUP + i, DistroUtilsTest.SERVICE + i, ephemeral), + instance); + } + return client; + } + + @Test + public void testHash0() { + assertEquals(-1320954445, DistroUtils.hash(client0)); + } + + @Test + public void testRevision0() { + assertEquals(-1713189600L, DistroUtils.stringHash(client0)); + } + + @Test + public void testChecksum0() { + for (int i = 0; i < 3; i++) { + assertEquals("2a3f62f84a4b6f2a979434276d546ac1", DistroUtils.checksum(client0)); + } + } + + @Test + public void testBuildUniqueString0() { + assertEquals("127.0.0.1:8848#false|testNamespace-1##testGroup-1@@testName-1##false_127.0.0.1:8848_1.0_true_true_DEFAULT_,", + DistroUtils.buildUniqueString(client0)); + } + + @Test + public void testBuildUniqueString1() { + HashMap metadata = new HashMap<>(); + metadata.put(Constants.PUBLISH_INSTANCE_WEIGHT, 2L); + metadata.put(Constants.PUBLISH_INSTANCE_ENABLE, false); + metadata.put("Custom.metadataId1", "abc"); + metadata.put("Custom.metadataId2", 123); + metadata.put("Custom.metadataId3", null); + Client client = buildClient("128.0.0.1", 8848, false, false, DEFAULT_CLUSTER_NAME, + metadata); + assertEquals("128.0.0.1:8848#false|" + + "testNamespace-1##testGroup-1@@testName-1##false_128.0.0.1:8848_2.0_false_false_DEFAULT_" + + "Custom.metadataId1:abc,Custom.metadataId2:123,Custom.metadataId3:null," + + "publishInstanceEnable:false,publishInstanceWeight:2,,", + DistroUtils.buildUniqueString(client)); + assertEquals(2128732271L, DistroUtils.stringHash(client)); + assertEquals("ac9bf94dc4bd6a35e5ff9734868eafea", DistroUtils.checksum(client)); + } + + @Test + public void testBuildUniqueString2() { + HashMap metadata = new HashMap<>(); + metadata.put(Constants.PUBLISH_INSTANCE_WEIGHT, 2L); + metadata.put(Constants.PUBLISH_INSTANCE_ENABLE, true); + metadata.put("Custom.metadataId1", "abc"); + Client client = buildClient("128.0.0.2", 7001, true, false, "cluster1", + metadata); + assertEquals("128.0.0.2:7001#true|" + + "testNamespace-1##testGroup-1@@testName-1##true_128.0.0.2:7001_2.0_false_true_cluster1_" + + "Custom.metadataId1:abc,publishInstanceEnable:true,publishInstanceWeight:2,,", + DistroUtils.buildUniqueString(client)); + assertEquals(775352583L, DistroUtils.stringHash(client)); + assertEquals("82d8e086a880f088320349b895b22948", DistroUtils.checksum(client)); + } + + @Test + public void performanceTestOfChecksum() { + long start = System.nanoTime(); + for (int i = 0; i < N; i++) { + DistroUtils.checksum(client1); + } + System.out.printf("Distro Verify Checksum Performance: %.2f ivk/ns\n", ((double) System.nanoTime() - start) / N); + } + + @Test + public void performanceTestOfStringHash() { + long start = System.nanoTime(); + for (int i = 0; i < N; i++) { + DistroUtils.stringHash(client1); + } + System.out.printf("Distro Verify Revision Performance: %.2f ivk/ns\n", ((double) System.nanoTime() - start) / N); + } + + @Test + public void performanceTestOfHash() { + long start = System.nanoTime(); + for (int i = 0; i < N; i++) { + DistroUtils.hash(client1); + } + System.out.printf("Distro Verify Hash Performance: %.2f ivk/ns\n", ((double) System.nanoTime() - start) / N); + } + +} \ No newline at end of file