Support client hash revision for DISTRO (#9214)

* [ISSUE #9210] Using hash as ephemeral clients' revision for DISTRO verification.

* [ISSUE #9210] Improve performance of ephemeral client hash calc.

* [ISSUE #9210] Remove calculation for ephemeral client's subscribers.

* [ISSUE #9210] Persist and sync revision for clients.

* [ISSUE #9210] Improve hash of ephemeral client.

* [ISSUE #9210] Improve performance of hash calc for clients.

* [ISSUE #9210] Make distro verification compatible with zero revision from old servers.

* [ISSUE #9210] Log clientId when VERIFY-FAILED.

* [ISSUE #9210] Fix long class cast exception to REVISION.

* [ISSUE #9210] Just renew clients when verify v2 clients on v1 mode.

* [ISSUE #9210] Add UTs for ephemeral client verification.

* [ISSUE #9210] Fix codestyle.

* [ISSUE #9210] Fix IpPortBasedClientTest.

* [ISSUE #9210] Fix codestyle.

* [ISSUE #9210] Fix license.

* [ISSUE #9210] Fix ConcurrentHashSetTest cases to verify concurrent modification properly.

* [ISSUE #9210] Fix codestyle.
This commit is contained in:
Pixy Yuan 2022-09-28 13:56:33 +08:00 committed by GitHub
parent 2d84c0aea7
commit 04fe7eab2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 693 additions and 106 deletions

View File

@ -17,12 +17,12 @@
package com.alibaba.nacos.common.utils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* ConcurrentHashSet Test.
@ -32,108 +32,200 @@ import java.util.Set;
*/
public class ConcurrentHashSetTest {
Set<Integer> concurrentHashSet;
@Before
public void setUp() {
concurrentHashSet = new ConcurrentHashSet<>();
concurrentHashSet.add(1);
concurrentHashSet.add(2);
concurrentHashSet.add(3);
concurrentHashSet.add(4);
concurrentHashSet.add(5);
@Test
public void testBasicOps() {
Set<Integer> set = new ConcurrentHashSet<>();
// addition
Assert.assertTrue(set.add(0));
Assert.assertTrue(set.add(1));
Assert.assertTrue(set.contains(0));
Assert.assertTrue(set.contains(1));
Assert.assertFalse(set.contains(-1));
Assert.assertEquals(2, set.size());
// iter
for (int i : set) {
Assert.assertTrue(i == 0 || i == 1);
}
// removal
Assert.assertTrue(set.remove(0));
Assert.assertFalse(set.remove(0));
Assert.assertFalse(set.contains(0));
Assert.assertTrue(set.contains(1));
Assert.assertEquals(1, set.size());
// clear
Assert.assertFalse(set.isEmpty());
set.clear();
Assert.assertEquals(0, set.size());
Assert.assertTrue(set.isEmpty());
}
@Test
public void testMultiThread() throws Exception {
int count = 5;
SetMultiThreadChecker hashSetChecker = new SetMultiThreadChecker(new HashSet<>());
hashSetChecker.start();
while (!hashSetChecker.hasConcurrentError() && hashSetChecker.isRunning()) {
TimeUnit.SECONDS.sleep(1);
if (count <= 0) {
hashSetChecker.stop();
}
count--;
}
Assert.assertTrue(hashSetChecker.hasConcurrentError());
count = 5;
SetMultiThreadChecker concurrentSetChecker = new SetMultiThreadChecker(new ConcurrentHashSet<>());
concurrentSetChecker.start();
while (!concurrentSetChecker.hasConcurrentError() && concurrentSetChecker.isRunning()) {
TimeUnit.SECONDS.sleep(1);
if (count == 0) {
concurrentSetChecker.stop();
}
count--;
}
Assert.assertFalse(concurrentSetChecker.hasConcurrentError());
}
static class SetMultiThreadChecker {
private final AddDataThread addThread;
private final DeleteDataThread deleteThread;
private final IteratorThread iteratorThread;
public SetMultiThreadChecker(Set<Integer> setToCheck) {
for (int i = 0; i < 1000; i++) {
setToCheck.add(i);
}
this.addThread = new AddDataThread(setToCheck);
this.deleteThread = new DeleteDataThread(setToCheck);
this.iteratorThread = new IteratorThread(setToCheck);
}
public void start() {
new Thread(addThread).start();
new Thread(deleteThread).start();
new Thread(iteratorThread).start();
}
public boolean hasConcurrentError() {
return addThread.hasConcurrentError() || deleteThread.hasConcurrentError() || iteratorThread.hasConcurrentError();
}
public boolean isRunning() {
return addThread.isRunning() || deleteThread.isRunning() || iteratorThread.isRunning();
}
public void stop() {
addThread.stop();
deleteThread.stop();
iteratorThread.stop();
}
}
@Test
public void size() {
Assert.assertEquals(concurrentHashSet.size(), 5);
}
abstract static class ConcurrentCheckThread implements Runnable {
@Test
public void contains() {
Assert.assertTrue(concurrentHashSet.contains(1));
}
protected final Set<Integer> hashSet;
@Test
public void testMultithreaded() {
try {
concurrentHashSet = new HashSet<>();
executeThread();
} catch (Exception e) {
Assert.assertTrue(e instanceof ConcurrentModificationException);
protected boolean concurrentError = false;
protected boolean finish = false;
public ConcurrentCheckThread(Set<Integer> hashSet) {
this.hashSet = hashSet;
}
public boolean hasConcurrentError() {
return concurrentError;
}
try {
concurrentHashSet = new ConcurrentHashSet<>();
executeThread();
} catch (Exception e) {
Assert.assertNull(e);
}
}
/**
* execute muti thread.
*/
public void executeThread() throws Exception {
for (int i = 0; i < 1000; i++) {
concurrentHashSet.add(i);
public void stop() {
finish = true;
}
new Thread(new AddDataThread(concurrentHashSet)).start();
new Thread(new DeleteDataThread(concurrentHashSet)).start();
new Thread(new IteratorThread(concurrentHashSet)).start();
public boolean isRunning() {
return !finish;
}
@Override
public void run() {
try {
while (isRunning()) {
process();
}
} catch (ConcurrentModificationException e) {
concurrentError = true;
} finally {
finish = true;
}
}
protected abstract void process();
}
//add data thread
static class AddDataThread implements Runnable {
Set<Integer> hashSet;
static class AddDataThread extends ConcurrentCheckThread implements Runnable {
public AddDataThread(Set<Integer> hashSet) {
this.hashSet = hashSet;
super(hashSet);
}
@Override
public void run() {
while (true) {
int random = new Random().nextInt();
hashSet.add(random);
}
protected void process() {
int random = new Random().nextInt(1000);
hashSet.add(random);
}
}
// delete data thread
static class DeleteDataThread implements Runnable {
Set<Integer> hashSet;
static class DeleteDataThread extends ConcurrentCheckThread implements Runnable {
public DeleteDataThread(Set<Integer> hashSet) {
this.hashSet = hashSet;
super(hashSet);
}
@Override
public void run() {
protected void process() {
int random = new Random().nextInt(1000);
while (true) {
hashSet.remove(random);
}
hashSet.remove(random);
}
}
static class IteratorThread implements Runnable {
Set<Integer> hashSet;
static class IteratorThread extends ConcurrentCheckThread implements Runnable {
public IteratorThread(Set<Integer> hashSet) {
this.hashSet = hashSet;
super(hashSet);
}
@Override
public void run() {
System.out.println("start -- hashSet.size() : " + hashSet.size());
for (Integer str : hashSet) {
System.out.println("value : " + str);
Integer f = null;
try {
while (isRunning()) {
for (Integer i : hashSet) {
f = i;
}
}
} catch (ConcurrentModificationException e) {
concurrentError = true;
} finally {
finish = true;
}
System.out.println("finished at " + f);
System.out.println("end -- hashSet.size() : " + hashSet.size());
}
@Override
protected void process() {
}
}
}

View File

@ -26,6 +26,7 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroDataStorage;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientSyncData;
@ -158,7 +159,9 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
}
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}",
clientSyncData.getClientId(),
clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
upgradeClient(client, clientSyncData);
@ -220,13 +223,19 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
}
}
client.setRevision(
clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}
@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
if (clientManager.verifyClient(verifyData.getClientId())) {
// If not upgraded to 2.0.X, just renew client and return.
if (!upgradeJudgement.isUseGrpcFeatures()) {
verifyData.setRevision(0L);
}
if (clientManager.verifyClient(verifyData)) {
return true;
}
Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
@ -271,19 +280,22 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
@Override
public List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>();
List<DistroData> result = null;
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
if (clientManager.isResponsibleClient(client)) {
// TODO add revision for client.
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
client.recalculateRevision());
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
data.setType(DataOperation.VERIFY);
if (result == null) {
result = new LinkedList<>();
}
result.add(data);
}
}

View File

@ -35,6 +35,8 @@ public class ClientConstants {
public static final String PERSISTENT_IP_PORT = "persistentIpPort";
public static final String REVISION = "revision";
public static final String PERSISTENT_SUFFIX = "false";
public static final String CLIENT_EXPIRED_TIME_CONFIG_KEY = "nacos.naming.client.expired.time";

View File

@ -25,12 +25,16 @@ import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.DistroUtils;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION;
/**
* Abstract implementation of {@code Client}.
@ -45,8 +49,11 @@ public abstract class AbstractClient implements Client {
protected volatile long lastUpdatedTime;
public AbstractClient() {
private final AtomicLong revision;
public AbstractClient(Long revision) {
lastUpdatedTime = System.currentTimeMillis();
this.revision = new AtomicLong(revision == null ? 0 : revision);
}
@Override
@ -151,7 +158,9 @@ public abstract class AbstractClient implements Client {
instances.add(entry.getValue());
}
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
data.getAttributes().addClientAttribute(REVISION, getRevision());
return data;
}
private static BatchInstanceData buildBatchInstanceData(BatchInstanceData batchInstanceData, List<String> batchNamespaces,
@ -178,4 +187,22 @@ public abstract class AbstractClient implements Client {
}
MetricsMonitor.getIpCountMonitor().addAndGet(-1 * subscribers.size());
}
@Override
public long recalculateRevision() {
int hash = DistroUtils.hash(this);
revision.set(hash);
return hash;
}
@Override
public long getRevision() {
return revision.get();
}
@Override
public void setRevision(long revision) {
this.revision.set(revision);
}
}

View File

@ -141,4 +141,23 @@ public interface Client {
* Release current client and release resources if neccessary.
*/
void release();
/**
* Recalculate client revision and get its value.
* @return recalculated revision value
*/
long recalculateRevision();
/**
* Get client revision.
* @return current revision without recalculation
*/
long getRevision();
/**
* Set client revision.
* @param revision revision of this client to update
*/
void setRevision(long revision);
}

View File

@ -21,6 +21,8 @@ import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient;
import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION;
/**
* Client factory for {@link ConnectionBasedClient}.
*
@ -35,11 +37,13 @@ public class ConnectionBasedClientFactory implements ClientFactory<ConnectionBas
@Override
public ConnectionBasedClient newClient(String clientId, ClientAttributes attributes) {
return new ConnectionBasedClient(clientId, true);
long revision = attributes.getClientAttribute(REVISION, 0);
return new ConnectionBasedClient(clientId, true, revision);
}
@Override
public ConnectionBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
return new ConnectionBasedClient(clientId, false);
long revision = attributes.getClientAttribute(REVISION, 0);
return new ConnectionBasedClient(clientId, false, revision);
}
}

View File

@ -21,6 +21,8 @@ import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION;
/**
* Client factory for ephemeral {@link IpPortBasedClient}.
*
@ -35,11 +37,13 @@ public class EphemeralIpPortClientFactory implements ClientFactory<IpPortBasedCl
@Override
public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) {
return new IpPortBasedClient(clientId, true);
long revision = attributes.getClientAttribute(REVISION, 0);
return new IpPortBasedClient(clientId, true, revision);
}
@Override
public IpPortBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
return new IpPortBasedClient(clientId, true);
long revision = attributes.getClientAttribute(REVISION, 0);
return new IpPortBasedClient(clientId, true, revision);
}
}

View File

@ -41,8 +41,8 @@ public class ConnectionBasedClient extends AbstractClient {
*/
private volatile long lastRenewTime;
public ConnectionBasedClient(String connectionId, boolean isNative) {
super();
public ConnectionBasedClient(String connectionId, boolean isNative, Long revision) {
super(revision);
this.connectionId = connectionId;
this.isNative = isNative;
lastRenewTime = getLastUpdatedTime();

View File

@ -51,6 +51,11 @@ public class IpPortBasedClient extends AbstractClient {
private HealthCheckTaskV2 healthCheckTaskV2;
public IpPortBasedClient(String clientId, boolean ephemeral) {
this(clientId, ephemeral, null);
}
public IpPortBasedClient(String clientId, boolean ephemeral, Long revision) {
super(revision);
this.ephemeral = ephemeral;
this.clientId = clientId;
this.responsibleId = getResponsibleTagFromId();

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.core.v2.client.manager;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
@ -96,8 +97,8 @@ public interface ClientManager {
/**
* verify client.
*
* @param clientId client id
* @param verifyData verify data from remote responsible server
* @return true if client is valid, otherwise is false.
*/
boolean verifyClient(String clientId);
boolean verifyClient(DistroClientVerifyInfo verifyData);
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.core.v2.client.manager;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
@ -96,8 +97,8 @@ public class ClientManagerDelegate implements ClientManager {
}
@Override
public boolean verifyClient(String clientId) {
return getClientManagerById(clientId).verifyClient(clientId);
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
return getClientManagerById(verifyData.getClientId()).verifyClient(verifyData);
}
private ClientManager getClientManagerById(String clientId) {

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
@ -126,11 +127,17 @@ public class ConnectionBasedClientManager extends ClientConnectionEventListener
}
@Override
public boolean verifyClient(String clientId) {
ConnectionBasedClient client = clients.get(clientId);
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
ConnectionBasedClient client = clients.get(verifyData.getClientId());
if (null != client) {
client.setLastRenewTime();
return true;
// remote node of old version will always verify with zero revision
if (0 == verifyData.getRevision() || client.getRevision() == verifyData.getRevision()) {
client.setLastRenewTime();
return true;
} else {
Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] ConnectionBasedClient[{}] revision local={}, remote={}",
client.getClientId(), client.getRevision(), verifyData.getRevision());
}
}
return false;
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.v2.client.Client;
@ -118,12 +119,19 @@ public class EphemeralIpPortClientManager implements ClientManager {
}
@Override
public boolean verifyClient(String clientId) {
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
String clientId = verifyData.getClientId();
IpPortBasedClient client = clients.get(clientId);
if (null != client) {
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
return true;
// remote node of old version will always verify with zero revision
if (0 == verifyData.getRevision() || client.getRevision() == verifyData.getRevision()) {
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
return true;
} else {
Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient[{}] revision local={}, remote={}",
client.getClientId(), client.getRevision(), verifyData.getRevision());
}
}
return false;
}

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.core.v2.client.manager.impl;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
@ -114,7 +115,7 @@ public class PersistentIpPortClientManager implements ClientManager {
}
@Override
public boolean verifyClient(String clientId) {
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
throw new UnsupportedOperationException("");
}

View File

@ -69,6 +69,7 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
@ -113,6 +114,7 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi
}
InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
client.setLastUpdatedTime();
client.recalculateRevision();
if (null != removedInstance) {
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(

View File

@ -0,0 +1,180 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.utils;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.alibaba.nacos.naming.constants.Constants.DEFAULT_INSTANCE_WEIGHT;
import static com.alibaba.nacos.naming.constants.Constants.PUBLISH_INSTANCE_ENABLE;
import static com.alibaba.nacos.naming.constants.Constants.PUBLISH_INSTANCE_WEIGHT;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DEFAULT_CLUSTER_NAME;
/**
* Utils to generate revision/checksum of distro clients.
*
* @author Pixy Yuan
* on 2021/10/9
*/
public class DistroUtils {
/**
* Build service key.
*/
public static String serviceKey(Service service) {
return service.getNamespace()
+ "##"
+ service.getGroupedServiceName()
+ "##"
+ service.isEphemeral();
}
/**
* Calculate hash of unique string built by client's metadata.
*/
public static int stringHash(Client client) {
String s = buildUniqueString(client);
if (s == null) {
return 0;
}
return s.hashCode();
}
/**
* Calculate hash for client. Reduce strings in memory and cpu costs.
*/
public static int hash(Client client) {
if (!(client instanceof IpPortBasedClient)) {
return 0;
}
return Objects.hash(client.getClientId(),
client.getAllPublishedService().stream()
.map(s -> {
InstancePublishInfo ip = client.getInstancePublishInfo(s);
double weight = getWeight(ip);
Boolean enabled = getEnabled(ip);
String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME);
return Objects.hash(
s.getNamespace(),
s.getGroup(),
s.getName(),
s.isEphemeral(),
ip.getIp(),
ip.getPort(),
weight,
ip.isHealthy(),
enabled,
cluster,
ip.getExtendDatum()
);
})
.collect(Collectors.toSet()));
}
/**
* Calculate checksum for client.
*/
public static String checksum(Client client) {
String s = buildUniqueString(client);
if (s == null) {
return "0";
}
return MD5Utils.md5Hex(s, Constants.ENCODE);
}
/**
* Calculate unique string for client.
*/
public static String buildUniqueString(Client client) {
if (!(client instanceof IpPortBasedClient)) {
return null;
}
StringBuilder sb = new StringBuilder();
sb.append(client.getClientId()).append('|');
client.getAllPublishedService().stream()
.sorted(Comparator.comparing(DistroUtils::serviceKey))
.forEach(s -> {
InstancePublishInfo ip = client.getInstancePublishInfo(s);
double weight = getWeight(ip);
Boolean enabled = getEnabled(ip);
String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME);
sb.append(serviceKey(s)).append('_')
.append(ip.getIp()).append(':').append(ip.getPort()).append('_')
.append(weight).append('_')
.append(ip.isHealthy()).append('_')
.append(enabled).append('_')
.append(cluster).append('_')
.append(convertMap2String(ip.getExtendDatum()))
.append(',');
});
return sb.toString();
}
private static boolean getEnabled(InstancePublishInfo ip) {
Object enabled0 = ip.getExtendDatum().get(PUBLISH_INSTANCE_ENABLE);
if (!(enabled0 instanceof Boolean)) {
return true;
} else {
return (Boolean) enabled0;
}
}
private static double getWeight(InstancePublishInfo ip) {
Object weight0 = ip.getExtendDatum().get(PUBLISH_INSTANCE_WEIGHT);
if (!(weight0 instanceof Number)) {
return DEFAULT_INSTANCE_WEIGHT;
} else {
return ((Number) weight0).doubleValue();
}
}
/**
* Convert Map to KV string with ':'.
*
* @param map map need to be converted
* @return KV string with ':'
*/
private static String convertMap2String(Map<String, Object> map) {
if (map == null || map.isEmpty()) {
return StringUtils.EMPTY;
}
StringBuilder sb = new StringBuilder();
List<String> keys = new ArrayList<>(map.keySet());
Collections.sort(keys);
for (String key : keys) {
sb.append(key);
sb.append(':');
sb.append(map.get(key));
sb.append(',');
}
return sb.toString();
}
}

View File

@ -47,7 +47,7 @@ public class AbstractClientTest {
@Before
public void setUp() {
abstractClient = new MockAbstractClient();
abstractClient = new MockAbstractClient(0L);
service = Service.newService("ns1", "group1", "serviceName001");
instancePublishInfo = new InstancePublishInfo("127.0.0.1", 8890);
subscriber = new Subscriber("127.0.0.1:8848", "agent1", "appName", "127.0.0.1",

View File

@ -25,6 +25,10 @@ package com.alibaba.nacos.naming.core.v2.client;
*/
public class MockAbstractClient extends AbstractClient {
public MockAbstractClient(Long revision) {
super(revision);
}
@Override
public String getClientId() {
return "-1";

View File

@ -32,7 +32,7 @@ public class ConnectionBasedClientTest {
@Before
public void setUp() throws Exception {
connectionBasedClient = new ConnectionBasedClient(connectionId, isNative);
connectionBasedClient = new ConnectionBasedClient(connectionId, isNative, null);
}
@Test

View File

@ -53,7 +53,7 @@ public class IpPortBasedClientTest {
@Before
public void setUp() throws Exception {
ipPortBasedClient = new IpPortBasedClient(clientId, true);
ipPortBasedClient = new IpPortBasedClient(clientId, true, 123L);
ipPortBasedClient.init();
instancePublishInfo = new InstancePublishInfo();
}
@ -84,6 +84,18 @@ public class IpPortBasedClientTest {
assertEquals(allInstancePublishInfo.iterator().next(), instancePublishInfo);
}
@Test
public void testRecalculateRevision() {
assertEquals(123L, ipPortBasedClient.getRevision());
assertEquals(-1531701243L, ipPortBasedClient.recalculateRevision());
}
@Test
public void testConstructor0() {
IpPortBasedClient client = new IpPortBasedClient(clientId, true);
assertEquals(0, client.getRevision());
}
@After
public void tearDown() {
ipPortBasedClient.release();

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.core.v2.client.manager;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.PersistentIpPortClientManager;
@ -86,18 +87,20 @@ public class ClientManagerDelegateTest {
@Test
public void testChooseEphemeralIpPortClient() {
delegate.verifyClient(ephemeralIpPortId);
verify(connectionBasedClientManager, never()).verifyClient(ephemeralIpPortId);
verify(ephemeralIpPortClientManager).verifyClient(ephemeralIpPortId);
verify(persistentIpPortClientManager, never()).verifyClient(ephemeralIpPortId);
DistroClientVerifyInfo verify = new DistroClientVerifyInfo(ephemeralIpPortId, 0);
delegate.verifyClient(verify);
verify(connectionBasedClientManager, never()).verifyClient(verify);
verify(ephemeralIpPortClientManager).verifyClient(verify);
verify(persistentIpPortClientManager, never()).verifyClient(verify);
}
@Test
public void testChoosePersistentIpPortClient() {
delegate.verifyClient(persistentIpPortId);
verify(connectionBasedClientManager, never()).verifyClient(persistentIpPortId);
verify(ephemeralIpPortClientManager, never()).verifyClient(persistentIpPortId);
verify(persistentIpPortClientManager).verifyClient(persistentIpPortId);
DistroClientVerifyInfo verify = new DistroClientVerifyInfo(persistentIpPortId, 0);
delegate.verifyClient(verify);
verify(connectionBasedClientManager, never()).verifyClient(verify);
verify(ephemeralIpPortClientManager, never()).verifyClient(verify);
verify(persistentIpPortClientManager).verifyClient(verify);
}
@Test

View File

@ -19,14 +19,19 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMeta;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.env.MockEnvironment;
import java.util.Collection;
@ -54,6 +59,11 @@ public class ConnectionBasedClientManagerTest {
@Mock
private ClientAttributes clientAttributes;
@BeforeClass
public static void setUpBeforeClass() {
EnvUtil.setEnvironment(new MockEnvironment());
}
@Before
public void setUp() throws Exception {
connectionBasedClientManager = new ConnectionBasedClientManager();
@ -62,8 +72,9 @@ public class ConnectionBasedClientManagerTest {
when(connection.getMetaInfo()).thenReturn(connectionMeta);
when(connectionMeta.getLabel(RemoteConstants.LABEL_MODULE)).thenReturn(RemoteConstants.LABEL_MODULE_NAMING);
when(clientAttributes.getClientAttribute(ClientConstants.REVISION, 0)).thenReturn(0);
assertTrue(connectionBasedClientManager.syncClientConnected(connectionId, clientAttributes));
assertTrue(connectionBasedClientManager.verifyClient(connectionId));
assertTrue(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(connectionId, 0)));
connectionBasedClientManager.clientConnected(connection);
}
@ -72,16 +83,16 @@ public class ConnectionBasedClientManagerTest {
public void testAllClientId() {
Collection<String> allClientIds = connectionBasedClientManager.allClientId();
assertEquals(1, allClientIds.size());
assertTrue(connectionBasedClientManager.verifyClient(connectionId));
assertTrue(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(connectionId, 0)));
assertTrue(allClientIds.contains(connectionId));
}
@Test
public void testContainsConnectionId() {
assertTrue(connectionBasedClientManager.verifyClient(connectionId));
assertTrue(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(connectionId, 0)));
assertTrue(connectionBasedClientManager.contains(connectionId));
String unUsedClientId = "127.0.0.1:8888#true";
assertFalse(connectionBasedClientManager.verifyClient(unUsedClientId));
assertFalse(connectionBasedClientManager.verifyClient(new DistroClientVerifyInfo(unUsedClientId, 0)));
assertFalse(connectionBasedClientManager.contains(unUsedClientId));
}

View File

@ -16,6 +16,8 @@
package com.alibaba.nacos.naming.core.v2.client.manager.impl;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
@ -67,7 +69,9 @@ public class EphemeralIpPortClientManagerTest {
public void setUp() throws Exception {
ephemeralIpPortClientManager = new EphemeralIpPortClientManager(distroMapper, switchDomain);
when(client.getClientId()).thenReturn(ephemeralIpPortId);
when(client.getRevision()).thenReturn(1320L);
ephemeralIpPortClientManager.clientConnected(client);
when(attributes.getClientAttribute(ClientConstants.REVISION, 0)).thenReturn(5120);
ephemeralIpPortClientManager.syncClientConnected(syncedClientId, attributes);
}
@ -92,4 +96,18 @@ public class EphemeralIpPortClientManagerTest {
String unUsedClientId = "127.0.0.1:8888#true";
assertFalse(ephemeralIpPortClientManager.contains(unUsedClientId));
}
@Test
public void testVerifyClient0() {
assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(ephemeralIpPortId, 0)));
assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(syncedClientId, 0)));
}
@Test
public void testVerifyClient() {
assertFalse(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(ephemeralIpPortId, 1)));
assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(ephemeralIpPortId, 1320)));
assertFalse(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(syncedClientId, 1)));
assertTrue(ephemeralIpPortClientManager.verifyClient(new DistroClientVerifyInfo(syncedClientId, 5120)));
}
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.core.v2.client.manager.impl;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import org.junit.After;
@ -85,7 +86,7 @@ public class PersistentIpPortClientManagerTest {
@Test(expected = UnsupportedOperationException.class)
public void makeSureNoVerify() {
persistentIpPortClientManager.verifyClient(clientId);
persistentIpPortClientManager.verifyClient(new DistroClientVerifyInfo(clientId, 0));
}
@Test

View File

@ -0,0 +1,173 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.utils;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import static com.alibaba.nacos.api.common.Constants.DEFAULT_CLUSTER_NAME;
import static org.junit.Assert.assertEquals;
/**
* Tests for {@link DistroUtils}.
*
* @author Pixy Yuan
* on 2021/10/9
*/
public class DistroUtilsTest {
private static final String NAMESPACE = "testNamespace-";
private static final String GROUP = "testGroup-";
private static final String SERVICE = "testName-";
private static final int N = 100000;
private IpPortBasedClient client0;
private IpPortBasedClient client1;
@Before
public void setUp() throws Exception {
client0 = buildClient("127.0.0.1", 8848, false, true, DEFAULT_CLUSTER_NAME,
null);
HashMap<String, Object> metadata = new HashMap<>();
metadata.put(Constants.PUBLISH_INSTANCE_WEIGHT, 2L);
metadata.put(Constants.PUBLISH_INSTANCE_ENABLE, false);
metadata.put("Custom.metadataId1", "abc");
metadata.put("Custom.metadataId2", 123);
metadata.put("Custom.metadataId3", null);
client1 = buildClient("127.0.0.2", 8848, true, true, "cluster1",
metadata, 20);
}
private IpPortBasedClient buildClient(String ip, int port, boolean ephemeral, boolean healthy, String cluster,
HashMap<String, Object> extendDatum) {
return buildClient(ip, port, ephemeral, healthy, cluster, extendDatum, 1);
}
private IpPortBasedClient buildClient(String ip, int port, boolean ephemeral, boolean healthy, String cluster,
HashMap<String, Object> extendDatum, int serviceCount) {
InstancePublishInfo instance = new InstancePublishInfo(ip, port);
instance.setCluster(cluster);
instance.setHealthy(healthy);
IpPortBasedClient client = new IpPortBasedClient(String.format("%s:%s#%s", ip, port, ephemeral), ephemeral);
if (extendDatum != null) {
instance.setExtendDatum(extendDatum);
}
for (int i = 1; i < serviceCount + 1; i++) {
client.putServiceInstance(Service.newService(DistroUtilsTest.NAMESPACE + i,
DistroUtilsTest.GROUP + i, DistroUtilsTest.SERVICE + i, ephemeral),
instance);
}
return client;
}
@Test
public void testHash0() {
assertEquals(-1320954445, DistroUtils.hash(client0));
}
@Test
public void testRevision0() {
assertEquals(-1713189600L, DistroUtils.stringHash(client0));
}
@Test
public void testChecksum0() {
for (int i = 0; i < 3; i++) {
assertEquals("2a3f62f84a4b6f2a979434276d546ac1", DistroUtils.checksum(client0));
}
}
@Test
public void testBuildUniqueString0() {
assertEquals("127.0.0.1:8848#false|testNamespace-1##testGroup-1@@testName-1##false_127.0.0.1:8848_1.0_true_true_DEFAULT_,",
DistroUtils.buildUniqueString(client0));
}
@Test
public void testBuildUniqueString1() {
HashMap<String, Object> metadata = new HashMap<>();
metadata.put(Constants.PUBLISH_INSTANCE_WEIGHT, 2L);
metadata.put(Constants.PUBLISH_INSTANCE_ENABLE, false);
metadata.put("Custom.metadataId1", "abc");
metadata.put("Custom.metadataId2", 123);
metadata.put("Custom.metadataId3", null);
Client client = buildClient("128.0.0.1", 8848, false, false, DEFAULT_CLUSTER_NAME,
metadata);
assertEquals("128.0.0.1:8848#false|"
+ "testNamespace-1##testGroup-1@@testName-1##false_128.0.0.1:8848_2.0_false_false_DEFAULT_"
+ "Custom.metadataId1:abc,Custom.metadataId2:123,Custom.metadataId3:null,"
+ "publishInstanceEnable:false,publishInstanceWeight:2,,",
DistroUtils.buildUniqueString(client));
assertEquals(2128732271L, DistroUtils.stringHash(client));
assertEquals("ac9bf94dc4bd6a35e5ff9734868eafea", DistroUtils.checksum(client));
}
@Test
public void testBuildUniqueString2() {
HashMap<String, Object> metadata = new HashMap<>();
metadata.put(Constants.PUBLISH_INSTANCE_WEIGHT, 2L);
metadata.put(Constants.PUBLISH_INSTANCE_ENABLE, true);
metadata.put("Custom.metadataId1", "abc");
Client client = buildClient("128.0.0.2", 7001, true, false, "cluster1",
metadata);
assertEquals("128.0.0.2:7001#true|"
+ "testNamespace-1##testGroup-1@@testName-1##true_128.0.0.2:7001_2.0_false_true_cluster1_"
+ "Custom.metadataId1:abc,publishInstanceEnable:true,publishInstanceWeight:2,,",
DistroUtils.buildUniqueString(client));
assertEquals(775352583L, DistroUtils.stringHash(client));
assertEquals("82d8e086a880f088320349b895b22948", DistroUtils.checksum(client));
}
@Test
public void performanceTestOfChecksum() {
long start = System.nanoTime();
for (int i = 0; i < N; i++) {
DistroUtils.checksum(client1);
}
System.out.printf("Distro Verify Checksum Performance: %.2f ivk/ns\n", ((double) System.nanoTime() - start) / N);
}
@Test
public void performanceTestOfStringHash() {
long start = System.nanoTime();
for (int i = 0; i < N; i++) {
DistroUtils.stringHash(client1);
}
System.out.printf("Distro Verify Revision Performance: %.2f ivk/ns\n", ((double) System.nanoTime() - start) / N);
}
@Test
public void performanceTestOfHash() {
long start = System.nanoTime();
for (int i = 0; i < N; i++) {
DistroUtils.hash(client1);
}
System.out.printf("Distro Verify Hash Performance: %.2f ivk/ns\n", ((double) System.nanoTime() - start) / N);
}
}