[Refactor] Remove KvStorage and ConsistencyService. (#12489)

* Remove KvStorage and ConsistencyService.

* Fix switch manager dead lock until timeout.

* Ignore env vote leader effect in IT.

* Wait leader voted before start do test case.
This commit is contained in:
杨翊 SionYang 2024-08-15 17:10:05 +08:00 committed by GitHub
parent 7133411bc2
commit bbed5a4eb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 393 additions and 2497 deletions

View File

@ -1,51 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.storage.kv.FileKvStorage;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.storage.kv.MemoryKvStorage;
/**
* Ket-value Storage factory.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public final class StorageFactory {
/**
* Create {@link KvStorage} implementation.
*
* @param type type of {@link KvStorage}
* @param label label for {@code RocksStorage}
* @param baseDir base dir of storage file.
* @return implementation of {@link KvStorage}
* @throws Exception exception during creating {@link KvStorage}
*/
public static KvStorage createKvStorage(KvStorage.KvType type, final String label, final String baseDir)
throws Exception {
switch (type) {
case File:
return new FileKvStorage(baseDir);
case Memory:
return new MemoryKvStorage();
default:
throw new IllegalArgumentException("this kv type : [" + type.name() + "] not support");
}
}
}

View File

@ -1,195 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage.kv;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.sys.utils.DiskUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Kv storage based on file system. // TODO 写文件的方式需要优化
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class FileKvStorage implements KvStorage {
private final String baseDir;
/**
* Ensure that a consistent view exists when implementing file copies.
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
public FileKvStorage(String baseDir) throws IOException {
this.baseDir = baseDir;
DiskUtils.forceMkdir(baseDir);
}
@Override
public byte[] get(byte[] key) throws KvStorageException {
readLock.lock();
try {
final String fileName = new String(key);
File file = Paths.get(baseDir, fileName).toFile();
if (file.exists()) {
return DiskUtils.readFileBytes(file);
}
return null;
} finally {
readLock.unlock();
}
}
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KvStorageException {
readLock.lock();
try {
Map<byte[], byte[]> result = new HashMap<>(keys.size());
for (byte[] key : keys) {
byte[] val = get(key);
if (val != null) {
result.put(key, val);
}
}
return result;
} finally {
readLock.unlock();
}
}
@Override
public void put(byte[] key, byte[] value) throws KvStorageException {
readLock.lock();
try {
final String fileName = new String(key);
File file = Paths.get(baseDir, fileName).toFile();
try {
DiskUtils.touch(file);
DiskUtils.writeFile(file, value, false);
} catch (IOException e) {
throw new KvStorageException(ErrorCode.KVStorageWriteError, e);
}
} finally {
readLock.unlock();
}
}
@Override
public void batchPut(List<byte[]> keys, List<byte[]> values) throws KvStorageException {
readLock.lock();
try {
if (keys.size() != values.size()) {
throw new KvStorageException(ErrorCode.KVStorageBatchWriteError,
"key's size must be equal to value's size");
}
int size = keys.size();
for (int i = 0; i < size; i++) {
put(keys.get(i), values.get(i));
}
} finally {
readLock.unlock();
}
}
@Override
public void delete(byte[] key) throws KvStorageException {
readLock.lock();
try {
final String fileName = new String(key);
DiskUtils.deleteFile(baseDir, fileName);
} finally {
readLock.unlock();
}
}
@Override
public void batchDelete(List<byte[]> keys) throws KvStorageException {
readLock.lock();
try {
for (byte[] key : keys) {
delete(key);
}
} finally {
readLock.unlock();
}
}
@Override
public void doSnapshot(String backupPath) throws KvStorageException {
writeLock.lock();
try {
File srcDir = Paths.get(baseDir).toFile();
File descDir = Paths.get(backupPath).toFile();
DiskUtils.copyDirectory(srcDir, descDir);
} catch (IOException e) {
throw new KvStorageException(ErrorCode.IOCopyDirError, e);
} finally {
writeLock.unlock();
}
}
@Override
public void snapshotLoad(String path) throws KvStorageException {
writeLock.lock();
try {
File srcDir = Paths.get(path).toFile();
// If snapshot path is non-exist, means snapshot is empty
if (srcDir.exists()) {
// First clean up the local file information, before the file copy
DiskUtils.deleteDirThenMkdir(baseDir);
File descDir = Paths.get(baseDir).toFile();
DiskUtils.copyDirectory(srcDir, descDir);
}
} catch (IOException e) {
throw new KvStorageException(ErrorCode.IOCopyDirError, e);
} finally {
writeLock.unlock();
}
}
@Override
public List<byte[]> allKeys() throws KvStorageException {
List<byte[]> result = new LinkedList<>();
File[] files = new File(baseDir).listFiles();
if (null != files) {
for (File each : files) {
if (each.isFile()) {
result.add(ByteUtils.toBytes(each.getName()));
}
}
}
return result;
}
@Override
public void shutdown() {
}
}

View File

@ -1,130 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage.kv;
import com.alibaba.nacos.core.exception.KvStorageException;
import java.util.List;
import java.util.Map;
/**
* Universal KV storage interface.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public interface KvStorage {
enum KvType {
/**
* Local file storage.
*/
File,
/**
* Local memory storage.
*/
Memory,
/**
* RocksDB storage.
*/
RocksDB,
}
/**
* get data by key.
*
* @param key byte[]
* @return byte[]
* @throws KvStorageException KVStorageException
*/
byte[] get(byte[] key) throws KvStorageException;
/**
* batch get by List byte[].
*
* @param keys List byte[]
* @return Map byte[], byte[]
* @throws KvStorageException KvStorageException
*/
Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KvStorageException;
/**
* write data.
*
* @param key byte[]
* @param value byte[]
* @throws KvStorageException KvStorageException
*/
void put(byte[] key, byte[] value) throws KvStorageException;
/**
* batch write.
*
* @param keys List byte[]
* @param values List byte[]
* @throws KvStorageException KvStorageException
*/
void batchPut(List<byte[]> keys, List<byte[]> values) throws KvStorageException;
/**
* delete with key.
*
* @param key byte[]
* @throws KvStorageException KvStorageException
*/
void delete(byte[] key) throws KvStorageException;
/**
* batch delete with keys.
*
* @param keys List byte[]
* @throws KvStorageException KvStorageException
*/
void batchDelete(List<byte[]> keys) throws KvStorageException;
/**
* do snapshot.
*
* @param backupPath snapshot file save path
* @throws KvStorageException KVStorageException
*/
void doSnapshot(final String backupPath) throws KvStorageException;
/**
* load snapshot.
*
* @param path The path to the snapshot file
* @throws KvStorageException KVStorageException
*/
void snapshotLoad(String path) throws KvStorageException;
/**
* Get all keys.
*
* @return all keys
* @throws KvStorageException KVStorageException
*/
List<byte[]> allKeys() throws KvStorageException;
/**
* shutdown.
*/
void shutdown();
}

View File

@ -1,140 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage.kv;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alipay.sofa.jraft.util.BytesUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* Realization of KV storage based on memory.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class MemoryKvStorage implements KvStorage {
private final Map<Key, byte[]> storage = new ConcurrentSkipListMap<>();
@Override
public byte[] get(byte[] key) throws KvStorageException {
return storage.get(new Key(key));
}
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KvStorageException {
Map<byte[], byte[]> result = new HashMap<>(keys.size());
for (byte[] key : keys) {
byte[] val = storage.get(new Key(key));
if (val != null) {
result.put(key, val);
}
}
return result;
}
@Override
public void put(byte[] key, byte[] value) throws KvStorageException {
storage.put(new Key(key), value);
}
@Override
public void batchPut(List<byte[]> keys, List<byte[]> values) throws KvStorageException {
if (keys.size() != values.size()) {
throw new KvStorageException(ErrorCode.KVStorageBatchWriteError.getCode(),
"key's size must be equal to value's size");
}
int size = keys.size();
for (int i = 0; i < size; i++) {
storage.put(new Key(keys.get(i)), values.get(i));
}
}
@Override
public void delete(byte[] key) throws KvStorageException {
storage.remove(new Key(key));
}
@Override
public void batchDelete(List<byte[]> keys) throws KvStorageException {
for (byte[] key : keys) {
storage.remove(new Key(key));
}
}
@Override
public void doSnapshot(String backupPath) throws KvStorageException {
throw new UnsupportedOperationException();
}
@Override
public void snapshotLoad(String path) throws KvStorageException {
throw new UnsupportedOperationException();
}
@Override
public List<byte[]> allKeys() throws KvStorageException {
List<byte[]> result = new LinkedList<>();
for (Key each : storage.keySet()) {
result.add(each.origin);
}
return result;
}
@Override
public void shutdown() {
storage.clear();
}
private static class Key implements Comparable<Key> {
private final byte[] origin;
private Key(byte[] origin) {
this.origin = origin;
}
@Override
public int compareTo(Key o) {
return BytesUtil.compare(origin, o.origin);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Key key = (Key) o;
return Arrays.equals(origin, key.origin);
}
@Override
public int hashCode() {
return Arrays.hashCode(origin);
}
}
}

View File

@ -1,154 +0,0 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.FileKvStorage;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
/**
* {@link FileKvStorage} unit tests.
*
* @author chenglu
* @date 2021-06-10 18:27
*/
class FileKvStorageTest {
private KvStorage kvStorage;
private String baseDir;
@AfterAll
static void clean() {
String dir = System.getProperty("user.home") + File.separator + "nacos_file_kv_storage_test_brotherluxcq";
String backupDir = System.getProperty("user.home") + File.separator + "nacos_file_kv_storage_test_backup_brotherluxcq";
try {
FileUtils.deleteDirectory(new File(dir));
FileUtils.deleteDirectory(new File(backupDir));
} catch (IOException e) {
e.printStackTrace();
}
}
@BeforeEach
void init() {
try {
baseDir = System.getProperty("user.home");
String dir = baseDir + File.separator + "nacos_file_kv_storage_test_brotherluxcq";
kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, null, dir);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
void testPutAndGetAndDelete() {
try {
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
kvStorage.put(key, value);
byte[] value1 = kvStorage.get(key);
assertArrayEquals(value, value1);
assertNotNull(kvStorage.allKeys());
kvStorage.delete(key);
assertNull(kvStorage.get(key));
kvStorage.put(key, value);
kvStorage.shutdown();
assertEquals(kvStorage.allKeys().size(), kvStorage.allKeys().size());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
void testBatchPutAndGet() {
try {
List<byte[]> keys = Arrays.asList("key1".getBytes(), "key2".getBytes());
List<byte[]> values = Arrays.asList("value1".getBytes(), "value2".getBytes());
kvStorage.batchPut(keys, values);
Map<byte[], byte[]> res = kvStorage.batchGet(keys);
assertNotNull(res);
res.forEach((key, value) -> {
if (Arrays.equals(key, "key1".getBytes())) {
assertArrayEquals("value1".getBytes(), value);
} else if (Arrays.equals(key, "key2".getBytes())) {
assertArrayEquals("value2".getBytes(), value);
} else {
fail();
}
});
kvStorage.batchDelete(keys);
assertEquals(0, kvStorage.batchGet(values).size());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
void testSnapshot() {
String backupDir = baseDir + File.separator + "nacos_file_kv_storage_test_backup_brotherluxcq";
try {
File file = new File(backupDir);
if (!file.exists()) {
boolean dirResult = file.mkdirs();
if (!dirResult) {
return;
}
}
kvStorage.doSnapshot(backupDir);
} catch (KvStorageException e) {
e.printStackTrace();
fail();
}
try {
kvStorage.snapshotLoad(backupDir);
byte[] key = "key".getBytes();
byte[] value = kvStorage.get(key);
assertArrayEquals("value".getBytes(), value);
} catch (KvStorageException e) {
e.printStackTrace();
fail();
}
}
}

View File

@ -1,120 +0,0 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.storage.kv.MemoryKvStorage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* {@link MemoryKvStorage} unit tests.
*
* @author chenglu
* @date 2021-06-10 18:02
*/
class MemoryKvStorageTest {
private KvStorage kvStorage;
@BeforeEach
void init() {
try {
kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.Memory, null, null);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
void testPutAndGetAndDelete() {
try {
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
kvStorage.put(key, value);
byte[] value1 = kvStorage.get(key);
assertArrayEquals(value, value1);
assertNotNull(kvStorage.allKeys());
kvStorage.delete(key);
assertNull(kvStorage.get(key));
kvStorage.put(key, value);
kvStorage.shutdown();
assertEquals(0, kvStorage.allKeys().size());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
void testBatchPutAndGet() {
try {
List<byte[]> keys = Arrays.asList("key1".getBytes(), "key2".getBytes());
List<byte[]> values = Arrays.asList("value1".getBytes(), "value2".getBytes());
kvStorage.batchPut(keys, values);
Map<byte[], byte[]> res = kvStorage.batchGet(keys);
assertNotNull(res);
res.forEach((key, value) -> {
if (Arrays.equals(key, "key1".getBytes())) {
assertArrayEquals("value1".getBytes(), value);
} else if (Arrays.equals(key, "key2".getBytes())) {
assertArrayEquals("value2".getBytes(), value);
} else {
fail();
}
});
kvStorage.batchDelete(keys);
assertEquals(0, kvStorage.batchGet(values).size());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
void testSnapshot() {
try {
kvStorage.doSnapshot("/");
} catch (Exception e) {
assertTrue(e instanceof UnsupportedOperationException);
}
try {
kvStorage.snapshotLoad("/");
} catch (Exception e) {
assertTrue(e instanceof UnsupportedOperationException);
}
}
}

View File

@ -1,59 +0,0 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.storage.kv.FileKvStorage;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.storage.kv.MemoryKvStorage;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* {@link StorageFactory} unit tests.
*
* @author chenglu
* @date 2021-06-10 17:55
*/
class StorageFactoryTest {
@Test
void testCreateKvStorage() {
try {
KvStorage kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.Memory, "", "/");
assertTrue(kvStorage instanceof MemoryKvStorage);
} catch (Exception e) {
e.printStackTrace();
fail();
}
try {
KvStorage kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "", "/");
assertTrue(kvStorage instanceof FileKvStorage);
} catch (Exception e) {
e.printStackTrace();
fail();
}
try {
StorageFactory.createKvStorage(KvStorage.KvType.RocksDB, "", "/");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}
}

View File

@ -16,14 +16,15 @@
package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alipay.sofa.jraft.RouteTable;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Optional;
/**
@ -35,14 +36,14 @@ import java.util.Optional;
@Service
public class ServerStatusManager {
@Resource(name = "persistentConsistencyServiceDelegate")
private ConsistencyService consistencyService;
private final ProtocolManager protocolManager;
private final SwitchDomain switchDomain;
private ServerStatus serverStatus = ServerStatus.STARTING;
public ServerStatusManager(SwitchDomain switchDomain) {
public ServerStatusManager(ProtocolManager protocolManager, SwitchDomain switchDomain) {
this.protocolManager = protocolManager;
this.switchDomain = switchDomain;
}
@ -58,19 +59,30 @@ public class ServerStatusManager {
return;
}
if (consistencyService.isAvailable()) {
if (hasLeader()) {
serverStatus = ServerStatus.UP;
} else {
serverStatus = ServerStatus.DOWN;
}
}
private boolean hasLeader() {
if (protocolManager.getCpProtocol() == null) {
return false;
}
return null != RouteTable.getInstance().selectLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP);
}
public ServerStatus getServerStatus() {
return serverStatus;
}
public Optional<String> getErrorMsg() {
return consistencyService.getErrorMsg();
if (hasLeader()) {
return Optional.empty();
}
return Optional.of("No leader for raft group " + Constants.NAMING_PERSISTENT_SERVICE_GROUP
+ ", please see logs `alipay-jraft.log` or `naming-raft.log` to see details.");
}
public class ServerStatusUpdater implements Runnable {

View File

@ -1,95 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.pojo.Record;
import java.util.Optional;
/**
* Consistence service for all implementations to derive.
*
* <p>We announce this consistency service to decouple the specific consistency implementation with business logic.
* User should not be aware of what consistency protocol is being used.
*
* <p>In this way, we also provide space for user to extend the underlying consistency protocols, as long as they obey
* our consistency baseline.
*
* @author nkorange
* @since 1.0.0
*/
public interface ConsistencyService {
/**
* Put a data related to a key to Nacos cluster.
*
* @param key key of data, this key should be globally unique
* @param value value of data
* @throws NacosException nacos exception
*/
void put(String key, Record value) throws NacosException;
/**
* Remove a data from Nacos cluster.
*
* @param key key of data
* @throws NacosException nacos exception
*/
void remove(String key) throws NacosException;
/**
* Get a data from Nacos cluster.
*
* @param key key of data
* @return data related to the key
* @throws NacosException nacos exception
*/
Datum get(String key) throws NacosException;
/**
* Listen for changes of a data.
*
* @param key key of data
* @param listener callback of data change
* @throws NacosException nacos exception
*/
void listen(String key, RecordListener listener) throws NacosException;
/**
* Cancel listening of a data.
*
* @param key key of data
* @param listener callback of data change
* @throws NacosException nacos exception
*/
void unListen(String key, RecordListener listener) throws NacosException;
/**
* Get the error message of the consistency protocol.
*
* @return the consistency protocol error message.
*/
Optional<String> getErrorMsg();
/**
* Tell the status of this consistency service.
*
* @return true if available
*/
boolean isAvailable();
}

View File

@ -1,60 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.naming.pojo.Record;
/**
* Data listener public interface.
*
* @author nacos
*/
public interface RecordListener<T extends Record> {
/**
* Determine if the listener was registered with this key.
*
* @param key candidate key
* @return true if the listener was registered with this key
*/
boolean interests(String key);
/**
* Determine if the listener is to be removed by matching the 'key'.
*
* @param key key to match
* @return true if match success
*/
boolean matchUnlistenKey(String key);
/**
* Action to do if data of target key has changed.
*
* @param key target key
* @param value data of the key
* @throws Exception exception
*/
void onChange(String key, T value) throws Exception;
/**
* Action to do if data of target key has been removed.
*
* @param key target key
* @throws Exception exception
*/
void onDelete(String key) throws Exception;
}

View File

@ -1,88 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.pojo.Record;
/**
* The value changes events. //TODO Recipients need to implement the ability to receive batch events
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class ValueChangeEvent extends Event {
private final String key;
private final Record value;
private final DataOperation action;
public ValueChangeEvent(String key, Record value, DataOperation action) {
this.key = key;
this.value = value;
this.action = action;
}
public String getKey() {
return key;
}
public Record getValue() {
return value;
}
public DataOperation getAction() {
return action;
}
public static ValueChangeEventBuilder builder() {
return new ValueChangeEventBuilder();
}
public static final class ValueChangeEventBuilder {
private String key;
private Record value;
private DataOperation action;
private ValueChangeEventBuilder() {
}
public ValueChangeEventBuilder key(String key) {
this.key = key;
return this;
}
public ValueChangeEventBuilder value(Record value) {
this.value = value;
return this;
}
public ValueChangeEventBuilder action(DataOperation action) {
this.action = action;
return this;
}
public ValueChangeEvent build() {
return new ValueChangeEvent(key, value, action);
}
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
/**
* A consistency service that guarantee CP consistency for the published data.
*
* <p>CP consistency is hereby defined as follows:
*
* <p>Once the writing operation returned client a success, the data within the operation is guaranteed to be
* successfully written to the cluster. And the data should be consistent between servers after some time without any
* outside interfere.
*
* @author nkorange
* @since 1.0.0
*/
public interface PersistentConsistencyService extends ConsistencyService {
}

View File

@ -1,91 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.impl.StandalonePersistentServiceProcessor;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* Persistent consistency service delegate.
*
* @author xiweng.yy
*/
@DependsOn("ProtocolManager")
@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {
private final BasePersistentServiceProcessor persistentServiceProcessor;
public PersistentConsistencyServiceDelegateImpl(ProtocolManager protocolManager) throws Exception {
this.persistentServiceProcessor = createPersistentServiceProcessor(protocolManager);
}
@Override
public void put(String key, Record value) throws NacosException {
persistentServiceProcessor.put(key, value);
}
@Override
public void remove(String key) throws NacosException {
persistentServiceProcessor.remove(key);
}
@Override
public Datum get(String key) throws NacosException {
return persistentServiceProcessor.get(key);
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
persistentServiceProcessor.listen(key, listener);
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
persistentServiceProcessor.unListen(key, listener);
}
@Override
public boolean isAvailable() {
return persistentServiceProcessor.isAvailable();
}
@Override
public Optional<String> getErrorMsg() {
return persistentServiceProcessor.getErrorMsg();
}
private BasePersistentServiceProcessor createPersistentServiceProcessor(ProtocolManager protocolManager)
throws Exception {
final BasePersistentServiceProcessor processor =
EnvUtil.getStandaloneMode() ? new StandalonePersistentServiceProcessor()
: new PersistentServiceProcessor(protocolManager);
processor.afterConstruct();
return processor;
}
}

View File

@ -1,121 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Record;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
* persistent notifier, It is responsible for notifying interested listeners of all write changes to the data.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
private final Map<String, ConcurrentHashSet<RecordListener>> listenerMap = new ConcurrentHashMap<>(32);
private final Function<String, Record> find;
public PersistentNotifier(Function<String, Record> find) {
this.find = find;
}
/**
* register listener with key.
*
* @param key key
* @param listener {@link RecordListener}
*/
public void registerListener(final String key, final RecordListener listener) {
listenerMap.computeIfAbsent(key, s -> new ConcurrentHashSet<>()).add(listener);
}
/**
* deregister listener by key.
*
* @param key key
* @param listener {@link RecordListener}
*/
public void deregisterListener(final String key, final RecordListener listener) {
if (!listenerMap.containsKey(key)) {
return;
}
listenerMap.get(key).remove(listener);
}
/**
* deregister all listener by key.
*
* @param key key
*/
public void deregisterAllListener(final String key) {
listenerMap.remove(key);
}
public Map<String, ConcurrentHashSet<RecordListener>> getListeners() {
return listenerMap;
}
/**
* notify value to listener with {@link DataOperation} and key.
*
* @param key key
* @param action {@link DataOperation}
* @param value value
* @param <T> type
*/
public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
if (!listenerMap.containsKey(key)) {
return;
}
for (RecordListener listener : listenerMap.get(key)) {
try {
if (action == DataOperation.CHANGE) {
listener.onChange(key, value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(key);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
}
}
}
@Override
public void onEvent(ValueChangeEvent event) {
notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}
@Override
public Class<? extends Event> subscribeType() {
return ValueChangeEvent.class;
}
}

View File

@ -1,265 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.TypeUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
import com.google.protobuf.ByteString;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* New service data persistence handler.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP
implements PersistentConsistencyService {
enum Op {
/**
* write ops.
*/
Write("Write"),
/**
* read ops.
*/
Read("Read"),
/**
* delete ops.
*/
Delete("Delete");
protected final String desc;
Op(String desc) {
this.desc = desc;
}
}
protected final KvStorage kvStorage;
protected final Serializer serializer;
/**
* Whether an unrecoverable error occurred.
*/
protected volatile boolean hasError = false;
protected volatile String jRaftErrorMsg;
/**
* If use old raft, should not notify listener even new listener add.
*/
protected volatile boolean startNotify = false;
/**
* During snapshot processing, the processing of other requests needs to be paused.
*/
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
protected final PersistentNotifier notifier;
protected final int queueMaxSize = 16384;
protected final int priority = 10;
public BasePersistentServiceProcessor() throws Exception {
this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
this.serializer = SerializeFactory.getSerializer("JSON");
this.notifier = new PersistentNotifier(key -> {
try {
byte[] data = kvStorage.get(ByteUtils.toBytes(key));
Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key));
return null != datum ? datum.value : null;
} catch (KvStorageException ex) {
throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.getMessage());
}
});
}
@SuppressWarnings("unchecked")
public void afterConstruct() {
NotifyCenter.registerToPublisher(ValueChangeEvent.class, queueMaxSize);
}
@Override
public Response onRequest(ReadRequest request) {
final Lock lock = readLock;
lock.lock();
try {
final List<byte[]> keys = serializer
.deserialize(request.getData().toByteArray(), TypeUtils.parameterize(List.class, byte[].class));
final Map<byte[], byte[]> result = kvStorage.batchGet(keys);
final BatchReadResponse response = new BatchReadResponse();
result.forEach(response::append);
return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response)))
.build();
} catch (KvStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} catch (Exception e) {
Loggers.RAFT.warn("On read request failed, ", e);
return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build();
} finally {
lock.unlock();
}
}
@Override
public Response onApply(WriteRequest request) {
final byte[] data = request.getData().toByteArray();
final Lock lock = readLock;
lock.lock();
try {
final BatchWriteRequest bwRequest = serializer.deserialize(data, BatchWriteRequest.class);
final Op op = Op.valueOf(request.getOperation());
switch (op) {
case Write:
kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues());
break;
case Delete:
kvStorage.batchDelete(bwRequest.getKeys());
break;
default:
return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build();
}
publishValueChangeEvent(op, bwRequest);
return Response.newBuilder().setSuccess(true).build();
} catch (KvStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} catch (Exception e) {
Loggers.RAFT.warn("On apply write request failed, ", e);
return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build();
} finally {
lock.unlock();
}
}
private void publishValueChangeEvent(final Op op, final BatchWriteRequest request) {
final List<byte[]> keys = request.getKeys();
final List<byte[]> values = request.getValues();
for (int i = 0; i < keys.size(); i++) {
final String key = new String(keys.get(i));
// Ignore old 1.x version data
if (!KeyBuilder.matchSwitchKey(key)) {
continue;
}
final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key));
final Record value = null != datum ? datum.value : null;
final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event);
}
}
@Override
public String group() {
return Constants.NAMING_PERSISTENT_SERVICE_GROUP;
}
@Override
public List<SnapshotOperation> loadSnapshotOperate() {
return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock, serializer));
}
@Override
public void onError(Throwable error) {
super.onError(error);
hasError = true;
jRaftErrorMsg = error.getMessage();
}
protected Type getDatumTypeFromKey(String key) {
return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key));
}
protected Class<? extends Record> getClassOfRecordFromKey(String key) {
if (KeyBuilder.matchSwitchKey(key)) {
return com.alibaba.nacos.naming.misc.SwitchDomain.class;
}
return Record.class;
}
protected void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException {
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
notifierAllServiceMeta(listener);
} else {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
/**
* This notify should only notify once during startup.
*/
private void notifierAllServiceMeta(RecordListener listener) throws NacosException {
for (byte[] each : kvStorage.allKeys()) {
String key = new String(each);
if (listener.interests(key)) {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
}
private void notifierDatum(String key, Datum datum, RecordListener listener) {
try {
listener.onChange(key, datum.value);
} catch (Exception e) {
Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e);
}
}
}

View File

@ -1,166 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.StorageFactory;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.storage.kv.MemoryKvStorage;
import com.alibaba.nacos.sys.utils.TimerContext;
import com.alibaba.nacos.naming.misc.Loggers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Kv storage implementation for naming.
*
* @author xiweng.yy
*/
public class NamingKvStorage extends MemoryKvStorage {
private static final String LOAD_SNAPSHOT = NamingKvStorage.class.getSimpleName() + ".snapshotLoad";
private static final String LABEL = "naming-persistent";
private final String baseDir;
private final KvStorage baseDirStorage;
public NamingKvStorage(final String baseDir) throws Exception {
this.baseDir = baseDir;
this.baseDirStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, LABEL, baseDir);
}
@Override
public byte[] get(byte[] key) throws KvStorageException {
// First get the data from the memory Cache
byte[] result = super.get(key);
if (null == result) {
try {
KvStorage storage = getStorage();
result = null == storage ? null : storage.get(key);
if (null != result) {
super.put(key, result);
}
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(),
"Get data failed, key: " + new String(key) + ", detail: " + e.getMessage(), e);
}
}
return result;
}
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KvStorageException {
Map<byte[], byte[]> result = new HashMap<>(keys.size());
for (byte[] key : keys) {
byte[] val = get(key);
if (val != null) {
result.put(key, val);
}
}
return result;
}
@Override
public void put(byte[] key, byte[] value) throws KvStorageException {
try {
KvStorage storage = getStorage();
storage.put(key, value);
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(),
"Put data failed, key: " + new String(key) + ", detail: " + e.getMessage(), e);
}
// after actual storage put success, put it in memory, memory put should success all the time
super.put(key, value);
}
@Override
public void batchPut(List<byte[]> keys, List<byte[]> values) throws KvStorageException {
if (keys.size() != values.size()) {
throw new KvStorageException(ErrorCode.KVStorageBatchWriteError,
"key's size must be equal to value's size");
}
int size = keys.size();
for (int i = 0; i < size; i++) {
put(keys.get(i), values.get(i));
}
}
@Override
public void delete(byte[] key) throws KvStorageException {
try {
KvStorage storage = getStorage();
if (null != storage) {
storage.delete(key);
}
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageDeleteError.getCode(),
"Delete data failed, key: " + new String(key) + ", detail: " + e.getMessage(), e);
}
// after actual storage delete success, put it in memory, memory delete should success all the time
super.delete(key);
}
@Override
public void batchDelete(List<byte[]> keys) throws KvStorageException {
for (byte[] each : keys) {
delete(each);
}
}
@Override
public void doSnapshot(String backupPath) throws KvStorageException {
baseDirStorage.doSnapshot(backupPath);
}
@Override
public void snapshotLoad(String path) throws KvStorageException {
TimerContext.start(LOAD_SNAPSHOT);
try {
baseDirStorage.snapshotLoad(path);
loadSnapshotFromActualStorage(baseDirStorage);
} finally {
TimerContext.end(LOAD_SNAPSHOT, Loggers.RAFT);
}
}
private void loadSnapshotFromActualStorage(KvStorage actualStorage) throws KvStorageException {
for (byte[] each : actualStorage.allKeys()) {
byte[] datum = actualStorage.get(each);
super.put(each, datum);
}
}
@Override
public List<byte[]> allKeys() throws KvStorageException {
return super.allKeys();
}
@Override
public void shutdown() {
baseDirStorage.shutdown();
super.shutdown();
}
private KvStorage getStorage() throws Exception {
return baseDirStorage;
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
/**
* Adapter old version data operation {@link com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor.Op}.
*
* @author xiweng.yy
*/
public enum OldDataOperation {
/**
* write ops.
*/
Write("Write"),
/**
* read ops.
*/
Read("Read"),
/**
* delete ops.
*/
Delete("Delete");
private final String desc;
OldDataOperation(String desc) {
this.desc = desc;
}
public String getDesc() {
return desc;
}
}

View File

@ -1,174 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.consistency.ProtocolMetaData;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.MetadataKey;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* In cluster mode, start the Raft protocol.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class PersistentServiceProcessor extends BasePersistentServiceProcessor {
private final CPProtocol protocol;
/**
* Is there a leader node currently.
*/
private volatile boolean hasLeader = false;
public PersistentServiceProcessor(ProtocolManager protocolManager) throws Exception {
this.protocol = protocolManager.getCpProtocol();
}
@Override
public void afterConstruct() {
super.afterConstruct();
String raftGroup = Constants.NAMING_PERSISTENT_SERVICE_GROUP;
this.protocol.protocolMetaData().subscribe(raftGroup, MetadataKey.LEADER_META_DATA, o -> {
if (!(o instanceof ProtocolMetaData.ValueItem)) {
return;
}
Object leader = ((ProtocolMetaData.ValueItem) o).getData();
hasLeader = StringUtils.isNotBlank(String.valueOf(leader));
Loggers.RAFT.info("Raft group {} has leader {}", raftGroup, leader);
});
this.protocol.addRequestProcessors(Collections.singletonList(this));
// If you choose to use the new RAFT protocol directly, there will be no compatible logical execution
if (EnvUtil.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) {
NotifyCenter.registerSubscriber(notifier);
waitLeader();
startNotify = true;
}
}
private void waitLeader() {
while (!hasLeader && !hasError) {
Loggers.RAFT.info("Waiting Jraft leader vote ...");
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ignored) {
}
}
}
@Override
public void put(String key, Record value) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
Datum datum = Datum.createDatum(key, value);
req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
}
@Override
public void remove(String key) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
req.append(ByteUtils.toBytes(key), ByteUtils.EMPTY);
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build();
try {
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
}
@Override
public Datum get(String key) throws NacosException {
final List<byte[]> keys = new ArrayList<>(1);
keys.add(ByteUtils.toBytes(key));
final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
.setData(ByteString.copyFrom(serializer.serialize(keys))).build();
try {
Response resp = protocol.getData(req);
if (resp.getSuccess()) {
BatchReadResponse response = serializer
.deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
final List<byte[]> rValues = response.getValues();
return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
}
throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
} catch (Throwable e) {
throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage());
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
notifier.registerListener(key, listener);
if (startNotify) {
notifierDatumIfAbsent(key, listener);
}
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
notifier.deregisterListener(key, listener);
}
@Override
public boolean isAvailable() {
return hasLeader && !hasError;
}
@Override
public Optional<String> getErrorMsg() {
String errorMsg;
if (hasLeader && hasError) {
errorMsg = "The raft peer is in error: " + jRaftErrorMsg;
} else if (hasLeader && !hasError) {
errorMsg = null;
} else if (!hasLeader && hasError) {
errorMsg = "Could not find leader! And the raft peer is in error: " + jRaftErrorMsg;
} else {
errorMsg = "Could not find leader!";
}
return Optional.ofNullable(errorMsg);
}
}

View File

@ -1,120 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.pojo.Record;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* Persistent service manipulation layer in stand-alone mode.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class StandalonePersistentServiceProcessor extends BasePersistentServiceProcessor {
public StandalonePersistentServiceProcessor() throws Exception {
}
@Override
public void put(String key, Record value) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
Datum datum = Datum.createDatum(key, value);
req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
onApply(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
}
@Override
public void remove(String key) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
req.append(ByteUtils.toBytes(key), ByteUtils.EMPTY);
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build();
try {
onApply(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
}
@Override
public Datum get(String key) throws NacosException {
final List<byte[]> keys = Collections.singletonList(ByteUtils.toBytes(key));
final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
.setData(ByteString.copyFrom(serializer.serialize(keys))).build();
try {
final Response resp = onRequest(req);
if (resp.getSuccess()) {
BatchReadResponse response = serializer
.deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
final List<byte[]> rValues = response.getValues();
return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
}
throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
} catch (Throwable e) {
throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage());
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
notifier.registerListener(key, listener);
if (startNotify) {
notifierDatumIfAbsent(key, listener);
}
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
notifier.deregisterListener(key, listener);
}
@Override
public boolean isAvailable() {
return !hasError;
}
@Override
public Optional<String> getErrorMsg() {
String errorMsg;
if (hasError) {
errorMsg = "The raft peer is in error: " + jRaftErrorMsg;
} else {
errorMsg = null;
}
return Optional.ofNullable(errorMsg);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,56 +14,40 @@
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
package com.alibaba.nacos.naming.misc;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.TypeUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.snapshot.LocalFileMeta;
import com.alibaba.nacos.consistency.snapshot.Reader;
import com.alibaba.nacos.consistency.snapshot.Writer;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.consistency.persistent.impl.AbstractSnapshotOperation;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.alipay.sofa.jraft.util.CRC64;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.Checksum;
/**
* Snapshot processing of persistent service data for accelerated Raft protocol recovery and data synchronization.
* Switch Domain snapshot operation.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author xiweng.yy
*/
public class NamingSnapshotOperation extends AbstractSnapshotOperation {
private static final String NAMING_SNAPSHOT_SAVE = NamingSnapshotOperation.class.getSimpleName() + ".SAVE";
private static final String NAMING_SNAPSHOT_LOAD = NamingSnapshotOperation.class.getSimpleName() + ".LOAD";
public class SwitchDomainSnapshotOperation extends AbstractSnapshotOperation {
private final String snapshotDir = "naming_persistent";
private final String snapshotArchive = "naming_persistent.zip";
private final KvStorage storage;
private final SwitchManager switchManager;
private final Serializer serializer;
public NamingSnapshotOperation(KvStorage storage, ReentrantReadWriteLock lock, Serializer serializer) {
public SwitchDomainSnapshotOperation(ReentrantReadWriteLock lock, SwitchManager switchManager,
Serializer serializer) {
super(lock);
this.storage = storage;
this.switchManager = switchManager;
this.serializer = serializer;
}
@ -74,7 +58,7 @@ public class NamingSnapshotOperation extends AbstractSnapshotOperation {
DiskUtils.deleteDirectory(parentPath);
DiskUtils.forceMkdir(parentPath);
storage.doSnapshot(parentPath);
this.switchManager.dumpSnapshot(parentPath);
final String outputFile = Paths.get(writePath, snapshotArchive).toString();
final Checksum checksum = new CRC64();
DiskUtils.compress(writePath, snapshotDir, outputFile, checksum);
@ -98,56 +82,19 @@ public class NamingSnapshotOperation extends AbstractSnapshotOperation {
}
}
final String loadPath = Paths.get(readerPath, snapshotDir).toString();
storage.snapshotLoad(loadPath);
// publish value change
publishValueChangeEvent();
Loggers.RAFT.info("snapshot load from : {}", loadPath);
this.switchManager.loadSnapshot(loadPath);
DiskUtils.deleteDirectory(loadPath);
return true;
}
/**
* publish value change event.
*
* @throws KvStorageException throw to invoker
*/
private void publishValueChangeEvent() throws KvStorageException {
List<byte[]> keys = storage.allKeys();
for (int i = 0; i < keys.size(); i++) {
String key = new String(keys.get(i));
// Ignore old 1.x version data
if (!KeyBuilder.matchSwitchKey(key)) {
continue;
}
Datum datum = serializer.deserialize(storage.get(keys.get(i)), getDatumTypeFromKey(key));
Record value = (datum != null) ? datum.value : null;
// report for refreshing <code>SwitchDomain</code> message
if (value != null) {
ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event);
}
}
}
private Type getDatumTypeFromKey(String key) {
return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key));
}
private Class<? extends Record> getClassOfRecordFromKey(String key) {
if (KeyBuilder.matchSwitchKey(key)) {
return SwitchDomain.class;
}
return Record.class;
}
@Override
protected String getSnapshotSaveTag() {
return NAMING_SNAPSHOT_SAVE;
return SwitchDomainSnapshotOperation.class.getSimpleName() + ".SAVE";
}
@Override
protected String getSnapshotLoadTag() {
return NAMING_SNAPSHOT_LOAD;
return SwitchDomainSnapshotOperation.class.getSimpleName() + ".LOAD";
}
}

View File

@ -18,23 +18,43 @@ package com.alibaba.nacos.naming.misc;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.common.utils.TypeUtils;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.nacos.naming.consistency.persistent.impl.BatchReadResponse;
import com.alibaba.nacos.naming.consistency.persistent.impl.BatchWriteRequest;
import com.alibaba.nacos.naming.consistency.persistent.impl.OldDataOperation;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.google.protobuf.ByteString;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Switch manager.
@ -43,27 +63,36 @@ import java.util.concurrent.locks.ReentrantLock;
* @since 1.0.0
*/
@Component
public class SwitchManager implements RecordListener<SwitchDomain> {
public class SwitchManager extends RequestProcessor4CP {
@Autowired
private SwitchDomain switchDomain;
private final SwitchDomain switchDomain;
@Resource(name = "persistentConsistencyServiceDelegate")
private ConsistencyService consistencyService;
private final ProtocolManager protocolManager;
ReentrantLock lock = new ReentrantLock();
private final ReentrantReadWriteLock raftLock;
/**
* Init switch manager.
*/
@PostConstruct
public void init() {
private final ReentrantLock requestLock;
private final Serializer serializer;
private final SwitchDomainSnapshotOperation snapshotOperation;
private final File dataFile;
public SwitchManager(SwitchDomain switchDomain, ProtocolManager protocolManager) {
this.switchDomain = switchDomain;
this.protocolManager = protocolManager;
this.raftLock = new ReentrantReadWriteLock();
this.requestLock = new ReentrantLock();
this.serializer = SerializeFactory.getSerializer("JSON");
this.snapshotOperation = new SwitchDomainSnapshotOperation(this.raftLock, this, this.serializer);
this.dataFile = Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data", KeyBuilder.getSwitchDomainKey()).toFile();
try {
consistencyService.listen(KeyBuilder.getSwitchDomainKey(), this);
} catch (NacosException e) {
Loggers.SRV_LOG.error("listen switch service failed.", e);
DiskUtils.forceMkdir(this.dataFile.getParent());
} catch (IOException e) {
Loggers.RAFT.error("Init Switch Domain directory failed: ", e);
}
protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this));
}
/**
@ -76,22 +105,15 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
*/
public void update(String entry, String value, boolean debug) throws Exception {
lock.lock();
this.requestLock.lock();
try {
Datum datum = consistencyService.get(KeyBuilder.getSwitchDomainKey());
SwitchDomain switchDomain;
if (datum != null && datum.value != null) {
switchDomain = (SwitchDomain) datum.value;
} else {
switchDomain = this.switchDomain.clone();
}
SwitchDomain tempSwitchDomain = this.switchDomain.clone();
if (SwitchEntry.BATCH.equals(entry)) {
//batch update
SwitchDomain dom = JacksonUtils.toObj(value, SwitchDomain.class);
dom.setEnableStandalone(switchDomain.isEnableStandalone());
dom.setEnableStandalone(tempSwitchDomain.isEnableStandalone());
if (dom.getHttpHealthParams().getMin() < SwitchDomain.HttpHealthParams.MIN_MIN
|| dom.getTcpHealthParams().getMin() < SwitchDomain.HttpHealthParams.MIN_MIN) {
@ -110,7 +132,7 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
throw new IllegalArgumentException("malformed factor");
}
switchDomain = dom;
tempSwitchDomain = dom;
}
if (entry.equals(SwitchEntry.DISTRO_THRESHOLD)) {
@ -118,12 +140,12 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
if (threshold <= 0) {
throw new IllegalArgumentException("distroThreshold can not be zero or negative: " + threshold);
}
switchDomain.setDistroThreshold(threshold);
tempSwitchDomain.setDistroThreshold(threshold);
}
if (entry.equals(SwitchEntry.CLIENT_BEAT_INTERVAL)) {
long clientBeatInterval = Long.parseLong(value);
switchDomain.setClientBeatInterval(clientBeatInterval);
tempSwitchDomain.setClientBeatInterval(clientBeatInterval);
}
if (entry.equals(SwitchEntry.PUSH_VERSION)) {
@ -137,13 +159,13 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
}
if (StringUtils.equals(SwitchEntry.CLIENT_JAVA, type)) {
switchDomain.setPushJavaVersion(version);
tempSwitchDomain.setPushJavaVersion(version);
} else if (StringUtils.equals(SwitchEntry.CLIENT_PYTHON, type)) {
switchDomain.setPushPythonVersion(version);
tempSwitchDomain.setPushPythonVersion(version);
} else if (StringUtils.equals(SwitchEntry.CLIENT_C, type)) {
switchDomain.setPushCVersion(version);
tempSwitchDomain.setPushCVersion(version);
} else if (StringUtils.equals(SwitchEntry.CLIENT_GO, type)) {
switchDomain.setPushGoVersion(version);
tempSwitchDomain.setPushGoVersion(version);
} else {
throw new IllegalArgumentException("unsupported client type: " + type);
}
@ -156,7 +178,7 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
throw new IllegalArgumentException("min cache time for http or tcp is too small(<10000)");
}
switchDomain.setDefaultPushCacheMillis(cacheMillis);
tempSwitchDomain.setDefaultPushCacheMillis(cacheMillis);
}
// extremely careful while modifying this, cause it will affect all clients without pushing enabled
@ -167,27 +189,27 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
throw new IllegalArgumentException("min default cache time is too small(<1000)");
}
switchDomain.setDefaultCacheMillis(cacheMillis);
tempSwitchDomain.setDefaultCacheMillis(cacheMillis);
}
if (entry.equals(SwitchEntry.MASTERS)) {
List<String> masters = Arrays.asList(value.split(","));
switchDomain.setMasters(masters);
tempSwitchDomain.setMasters(masters);
}
if (entry.equals(SwitchEntry.DISTRO)) {
boolean enabled = Boolean.parseBoolean(value);
switchDomain.setDistroEnabled(enabled);
tempSwitchDomain.setDistroEnabled(enabled);
}
if (entry.equals(SwitchEntry.CHECK)) {
boolean enabled = Boolean.parseBoolean(value);
switchDomain.setHealthCheckEnabled(enabled);
tempSwitchDomain.setHealthCheckEnabled(enabled);
}
if (entry.equals(SwitchEntry.PUSH_ENABLED)) {
boolean enabled = Boolean.parseBoolean(value);
switchDomain.setPushEnabled(enabled);
tempSwitchDomain.setPushEnabled(enabled);
}
if (entry.equals(SwitchEntry.SERVICE_STATUS_SYNC_PERIOD)) {
@ -197,7 +219,7 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
throw new IllegalArgumentException("serviceStatusSynchronizationPeriodMillis is too small(<5000)");
}
switchDomain.setServiceStatusSynchronizationPeriodMillis(millis);
tempSwitchDomain.setServiceStatusSynchronizationPeriodMillis(millis);
}
if (entry.equals(SwitchEntry.SERVER_STATUS_SYNC_PERIOD)) {
@ -207,25 +229,25 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
throw new IllegalArgumentException("serverStatusSynchronizationPeriodMillis is too small(<15000)");
}
switchDomain.setServerStatusSynchronizationPeriodMillis(millis);
tempSwitchDomain.setServerStatusSynchronizationPeriodMillis(millis);
}
if (entry.equals(SwitchEntry.HEALTH_CHECK_TIMES)) {
int times = Integer.parseInt(value);
switchDomain.setCheckTimes(times);
tempSwitchDomain.setCheckTimes(times);
}
if (entry.equals(SwitchEntry.DISABLE_ADD_IP)) {
boolean disableAddIp = Boolean.parseBoolean(value);
switchDomain.setDisableAddIP(disableAddIp);
tempSwitchDomain.setDisableAddIP(disableAddIp);
}
if (entry.equals(SwitchEntry.SEND_BEAT_ONLY)) {
boolean sendBeatOnly = Boolean.parseBoolean(value);
switchDomain.setSendBeatOnly(sendBeatOnly);
tempSwitchDomain.setSendBeatOnly(sendBeatOnly);
}
if (entry.equals(SwitchEntry.LIMITED_URL_MAP)) {
@ -253,14 +275,14 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
}
switchDomain.setLimitedUrlMap(limitedUrlMap);
tempSwitchDomain.setLimitedUrlMap(limitedUrlMap);
}
}
if (entry.equals(SwitchEntry.ENABLE_STANDALONE)) {
if (!StringUtils.isNotEmpty(value)) {
switchDomain.setEnableStandalone(Boolean.parseBoolean(value));
tempSwitchDomain.setEnableStandalone(Boolean.parseBoolean(value));
}
}
@ -269,33 +291,33 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
if (Constants.NULL_STRING.equals(status)) {
status = StringUtils.EMPTY;
}
switchDomain.setOverriddenServerStatus(status);
tempSwitchDomain.setOverriddenServerStatus(status);
}
if (entry.equals(SwitchEntry.DEFAULT_INSTANCE_EPHEMERAL)) {
switchDomain.setDefaultInstanceEphemeral(Boolean.parseBoolean(value));
tempSwitchDomain.setDefaultInstanceEphemeral(Boolean.parseBoolean(value));
}
if (entry.equals(SwitchEntry.DISTRO_SERVER_EXPIRED_MILLIS)) {
switchDomain.setDistroServerExpiredMillis(Long.parseLong(value));
tempSwitchDomain.setDistroServerExpiredMillis(Long.parseLong(value));
}
if (entry.equals(SwitchEntry.LIGHT_BEAT_ENABLED)) {
switchDomain.setLightBeatEnabled(ConvertUtils.toBoolean(value));
tempSwitchDomain.setLightBeatEnabled(ConvertUtils.toBoolean(value));
}
if (entry.equals(SwitchEntry.AUTO_CHANGE_HEALTH_CHECK_ENABLED)) {
switchDomain.setAutoChangeHealthCheckEnabled(ConvertUtils.toBoolean(value));
tempSwitchDomain.setAutoChangeHealthCheckEnabled(ConvertUtils.toBoolean(value));
}
if (debug) {
update(switchDomain);
update(tempSwitchDomain);
} else {
consistencyService.put(KeyBuilder.getSwitchDomainKey(), switchDomain);
updateWithConsistency(tempSwitchDomain);
}
} finally {
lock.unlock();
this.requestLock.unlock();
}
}
@ -340,27 +362,145 @@ public class SwitchManager implements RecordListener<SwitchDomain> {
switchDomain.setLightBeatEnabled(newSwitchDomain.isLightBeatEnabled());
}
private void updateWithConsistency(SwitchDomain tempSwitchDomain) throws NacosException {
try {
final BatchWriteRequest req = new BatchWriteRequest();
String switchDomainKey = KeyBuilder.getSwitchDomainKey();
Datum datum = Datum.createDatum(switchDomainKey, tempSwitchDomain);
req.append(ByteUtils.toBytes(switchDomainKey), serializer.serialize(datum));
WriteRequest operationLog = WriteRequest.newBuilder().setGroup(group())
.setOperation(OldDataOperation.Write.getDesc()).setData(ByteString.copyFrom(serializer.serialize(req)))
.build();
protocolManager.getCpProtocol().write(operationLog);
} catch (Exception e) {
Loggers.RAFT.error("Submit switch domain failed: ", e);
throw new NacosException(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage());
}
}
public SwitchDomain getSwitchDomain() {
return switchDomain;
}
@Override
public boolean interests(String key) {
return KeyBuilder.matchSwitchKey(key);
public List<SnapshotOperation> loadSnapshotOperate() {
return Collections.singletonList(snapshotOperation);
}
/**
* Load Snapshot from snapshot dir.
*
* @param snapshotPath snapshot dir
*/
public void loadSnapshot(String snapshotPath) {
this.raftLock.writeLock().lock();
try {
File srcDir = Paths.get(snapshotPath).toFile();
// If snapshot path is non-exist, means snapshot is empty
if (srcDir.exists()) {
// First clean up the local file information, before the file copy
String baseDir = this.dataFile.getParent();
DiskUtils.deleteDirThenMkdir(baseDir);
File descDir = Paths.get(baseDir).toFile();
DiskUtils.copyDirectory(srcDir, descDir);
if (!this.dataFile.exists()) {
return;
}
byte[] snapshotData = DiskUtils.readFileBytes(this.dataFile);
final Datum datum = serializer.deserialize(snapshotData, getDatumType());
final Record value = null != datum ? datum.value : null;
if (!(value instanceof SwitchDomain)) {
return;
}
update((SwitchDomain) value);
}
} catch (IOException e) {
throw new NacosRuntimeException(ErrorCode.IOCopyDirError.getCode(), e);
} finally {
this.raftLock.writeLock().unlock();
}
}
/**
* Dump data from data dir to snapshot dir.
*
* @param backupPath snapshot dir
*/
public void dumpSnapshot(String backupPath) {
this.raftLock.writeLock().lock();
try {
File srcDir = Paths.get(this.dataFile.getParent()).toFile();
File descDir = Paths.get(backupPath).toFile();
DiskUtils.copyDirectory(srcDir, descDir);
} catch (IOException e) {
throw new NacosRuntimeException(ErrorCode.IOCopyDirError.getCode(), e);
} finally {
this.raftLock.writeLock().unlock();
}
}
@Override
public boolean matchUnlistenKey(String key) {
return KeyBuilder.matchSwitchKey(key);
public Response onRequest(ReadRequest request) {
this.raftLock.readLock().lock();
try {
final List<byte[]> keys = serializer.deserialize(request.getData().toByteArray(),
TypeUtils.parameterize(List.class, byte[].class));
if (isNotSwitchDomainKey(keys)) {
return Response.newBuilder().setSuccess(false).setErrMsg("not switch domain key").build();
}
Datum datum = Datum.createDatum(KeyBuilder.getSwitchDomainKey(), switchDomain);
final BatchReadResponse response = new BatchReadResponse();
response.append(ByteUtils.toBytes(KeyBuilder.getSwitchDomainKey()), serializer.serialize(datum));
return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response)))
.build();
} catch (Exception e) {
Loggers.RAFT.warn("On read switch domain failed, ", e);
return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build();
} finally {
this.raftLock.readLock().unlock();
}
}
@Override
public void onChange(String key, SwitchDomain domain) throws Exception {
update(domain);
public Response onApply(WriteRequest log) {
this.raftLock.writeLock().lock();
try {
BatchWriteRequest bwRequest = serializer.deserialize(log.getData().toByteArray(), BatchWriteRequest.class);
if (isNotSwitchDomainKey(bwRequest.getKeys())) {
return Response.newBuilder().setSuccess(false).setErrMsg("not switch domain key").build();
}
final Datum datum = serializer.deserialize(bwRequest.getValues().get(0), getDatumType());
final Record value = null != datum ? datum.value : null;
if (!(value instanceof SwitchDomain)) {
return Response.newBuilder().setSuccess(false).setErrMsg("datum is not switch domain").build();
}
DiskUtils.touch(dataFile);
DiskUtils.writeFile(dataFile, bwRequest.getValues().get(0), false);
SwitchDomain switchDomain = (SwitchDomain) value;
update(switchDomain);
return Response.newBuilder().setSuccess(true).build();
} catch (Exception e) {
Loggers.RAFT.warn("On apply switch domain failed, ", e);
return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build();
} finally {
this.raftLock.writeLock().unlock();
}
}
@Override
public void onDelete(String key) throws Exception {
public String group() {
return com.alibaba.nacos.naming.constants.Constants.NAMING_PERSISTENT_SERVICE_GROUP;
}
private boolean isNotSwitchDomainKey(List<byte[]> keys) {
if (1 != keys.size()) {
return false;
}
String keyString = new String(keys.get(0));
return !KeyBuilder.getSwitchDomainKey().equals(keyString);
}
private Type getDatumType() {
return TypeUtils.parameterize(Datum.class, SwitchDomain.class);
}
}

View File

@ -18,27 +18,44 @@
package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.entity.PeerId;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.mock.env.MockEnvironment;
import java.lang.reflect.Field;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ServerStatusManagerTest {
@Mock
SwitchDomain switchDomain;
@Mock
ProtocolManager protocolManager;
@Mock
CPProtocol cpProtocol;
@BeforeEach
void setUp() {
EnvUtil.setEnvironment(new MockEnvironment());
@ -47,7 +64,7 @@ class ServerStatusManagerTest {
@Test
void testInit() {
try (MockedStatic mocked = mockStatic(GlobalExecutor.class)) {
ServerStatusManager serverStatusManager = new ServerStatusManager(mock(SwitchDomain.class));
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
serverStatusManager.init();
mocked.verify(() -> GlobalExecutor.registerServerStatusUpdater(any()));
}
@ -55,30 +72,24 @@ class ServerStatusManagerTest {
@Test
void testGetServerStatus() {
ServerStatusManager serverStatusManager = new ServerStatusManager(mock(SwitchDomain.class));
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
ServerStatus serverStatus = serverStatusManager.getServerStatus();
assertEquals(ServerStatus.STARTING, serverStatus);
}
@Test
void testGetErrorMsg() throws NoSuchFieldException, IllegalAccessException {
ServerStatusManager serverStatusManager = new ServerStatusManager(mock(SwitchDomain.class));
Field field = ServerStatusManager.class.getDeclaredField("consistencyService");
field.setAccessible(true);
ConsistencyService consistencyService = mock(ConsistencyService.class);
when(consistencyService.getErrorMsg()).thenReturn(Optional.empty());
field.set(serverStatusManager, consistencyService);
void testGetErrorMsg() {
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
Optional<String> errorMsg = serverStatusManager.getErrorMsg();
assertFalse(errorMsg.isPresent());
assertTrue(errorMsg.isPresent());
}
@Test
void testUpdaterFromSwitch() {
SwitchDomain switchDomain = mock(SwitchDomain.class);
String expect = ServerStatus.DOWN.toString();
when(switchDomain.getOverriddenServerStatus()).thenReturn(expect);
ServerStatusManager serverStatusManager = new ServerStatusManager(switchDomain);
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater();
//then
updater.run();
@ -88,30 +99,36 @@ class ServerStatusManagerTest {
}
@Test
void testUpdaterFromConsistency1() throws NoSuchFieldException, IllegalAccessException {
SwitchDomain switchDomain = mock(SwitchDomain.class);
ServerStatusManager serverStatusManager = new ServerStatusManager(switchDomain);
Field field = ServerStatusManager.class.getDeclaredField("consistencyService");
field.setAccessible(true);
ConsistencyService consistencyService = mock(ConsistencyService.class);
when(consistencyService.isAvailable()).thenReturn(true);
field.set(serverStatusManager, consistencyService);
void testUpdaterFromConsistency1() {
try (MockedStatic mocked = mockStatic(RouteTable.class)) {
RouteTable mockTable = mock(RouteTable.class);
when(mockTable.selectLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP)).thenReturn(new PeerId());
mocked.when(RouteTable::getInstance).thenReturn(mockTable);
when(protocolManager.getCpProtocol()).thenReturn(cpProtocol);
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater();
//then
updater.run();
//then
assertEquals(ServerStatus.UP, serverStatusManager.getServerStatus());
assertFalse(serverStatusManager.getErrorMsg().isPresent());
}
}
@Test
void testUpdaterFromConsistency2() throws NoSuchFieldException, IllegalAccessException {
SwitchDomain switchDomain = mock(SwitchDomain.class);
ServerStatusManager serverStatusManager = new ServerStatusManager(switchDomain);
Field field = ServerStatusManager.class.getDeclaredField("consistencyService");
field.setAccessible(true);
ConsistencyService consistencyService = mock(ConsistencyService.class);
when(consistencyService.isAvailable()).thenReturn(false);
field.set(serverStatusManager, consistencyService);
void testUpdaterFromConsistency2() {
when(protocolManager.getCpProtocol()).thenReturn(cpProtocol);
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater();
//then
updater.run();
//then
assertEquals(ServerStatus.DOWN, serverStatusManager.getServerStatus());
}
@Test
void testUpdaterFromConsistency3() {
ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain);
ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater();
//then
updater.run();

View File

@ -1,97 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.FileKvStorage;
import com.alibaba.nacos.sys.utils.DiskUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.io.IOException;
import java.lang.reflect.Field;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
// todo remove this
@MockitoSettings(strictness = Strictness.LENIENT)
class NamingKvStorageTest {
private final byte[] key = "fileName_test".getBytes();
private final String str = "str_test";
private NamingKvStorage namingKvStorage;
@Mock
private FileKvStorage baseDirStorageMock;
@BeforeEach
void setUp() throws Exception {
namingKvStorage = new NamingKvStorage("baseDir_test");
Field baseDirStorageField = NamingKvStorage.class.getDeclaredField("baseDirStorage");
baseDirStorageField.setAccessible(true);
baseDirStorageField.set(namingKvStorage, baseDirStorageMock);
when(baseDirStorageMock.get(key)).thenReturn(null);
}
@AfterEach
void tearDown() throws IOException {
DiskUtils.deleteDirectory("baseDir_test");
}
@Test
void testGet() throws KvStorageException {
namingKvStorage.get(key);
verify(baseDirStorageMock).get(key);
}
@Test
void testPut() throws KvStorageException {
byte[] value = "value_test".getBytes();
namingKvStorage.put(key, value);
verify(baseDirStorageMock).put(key, value);
}
@Test
void testDelete() throws KvStorageException {
namingKvStorage.delete(key);
verify(baseDirStorageMock).delete(key);
}
@Test
void testDoSnapshot() throws KvStorageException {
namingKvStorage.doSnapshot(str);
verify(baseDirStorageMock).doSnapshot(str);
}
@Test
void testSnapshotLoad() throws KvStorageException {
namingKvStorage.snapshotLoad(str);
verify(baseDirStorageMock).snapshotLoad(str);
}
}

View File

@ -1,95 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.snapshot.Reader;
import com.alibaba.nacos.consistency.snapshot.Writer;
import com.alibaba.nacos.core.distributed.raft.RaftConfig;
import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.mock.env.MockEnvironment;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@ExtendWith(MockitoExtension.class)
class NamingSnapshotOperationTest {
static {
RaftExecutor.init(new RaftConfig());
EnvUtil.setEnvironment(new MockEnvironment());
}
private final String snapshotDir = Paths.get(EnvUtil.getNacosTmpDir(), "rocks_snapshot_test").toString();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Mock
private KvStorage storage;
private boolean isSnapshoted = false;
@BeforeEach
void init() throws Exception {
doAnswer(invocationOnMock -> {
isSnapshoted = true;
return null;
}).when(storage).doSnapshot(any(String.class));
}
@AfterEach
void after() {
storage.shutdown();
}
@Test
void testNamingSnapshot() throws InterruptedException {
AtomicBoolean result = new AtomicBoolean(false);
NamingSnapshotOperation operation = new NamingSnapshotOperation(storage, lock, Mockito.mock(Serializer.class));
final Writer writer = new Writer(snapshotDir);
final CountDownLatch latch = new CountDownLatch(1);
operation.onSnapshotSave(writer, (isOk, throwable) -> {
result.set(isOk && throwable == null);
latch.countDown();
});
latch.await(10, TimeUnit.SECONDS);
assertTrue(isSnapshoted);
assertTrue(result.get());
final Reader reader = new Reader(snapshotDir, writer.listFiles());
boolean res = operation.onSnapshotLoad(reader);
assertTrue(res);
}
}

View File

@ -29,27 +29,16 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;
import java.net.URL;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.test.naming.NamingBase.NAMING_CONTROLLER_PATH;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_GROUP_1;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_NAMESPACE_1;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_NAMESPACE_2;
import static com.alibaba.nacos.test.naming.NamingBase.TEST_PORT2_4_DOM_1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -58,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
@SpringBootTest(classes = Nacos.class, properties = {
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class CPInstancesAPI_ITCase {
class CPInstancesAPI_ITCase extends NamingBase {
private NamingService naming;
@ -69,11 +58,6 @@ class CPInstancesAPI_ITCase {
@LocalServerPort
private int port;
private URL base;
@Autowired
private TestRestTemplate restTemplate;
@BeforeEach
void setUp() throws Exception {
String url = String.format("http://localhost:%d/", port);
@ -90,6 +74,7 @@ class CPInstancesAPI_ITCase {
properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE_2);
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1" + ":" + port);
naming2 = NamingFactory.createNamingService(properties);
isNamingServerReady();
}
@AfterEach
@ -182,13 +167,14 @@ class CPInstancesAPI_ITCase {
String serviceName = NamingBase.randomDomainName();
ResponseEntity<String> registerResponse = request(NamingBase.NAMING_CONTROLLER_PATH + "/instance",
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "11.11.11.11").appendParam("port", "80")
.appendParam("namespaceId", TEST_NAMESPACE_1).done(), String.class, HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "11.11.11.11")
.appendParam("port", "80").appendParam("namespaceId", TEST_NAMESPACE_1).done(), String.class,
HttpMethod.POST);
assertTrue(registerResponse.getStatusCode().is2xxSuccessful());
ResponseEntity<String> deleteServiceResponse = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1).done(),
String.class, HttpMethod.DELETE);
Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1)
.done(), String.class, HttpMethod.DELETE);
assertTrue(deleteServiceResponse.getStatusCode().is4xxClientError());
}
@ -210,8 +196,8 @@ class CPInstancesAPI_ITCase {
//get service
response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1).done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1)
.done(), String.class);
assertTrue(response.getStatusCode().is2xxSuccessful());
@ -254,8 +240,8 @@ class CPInstancesAPI_ITCase {
//get service
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/list",
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1").appendParam("pageSize", "150").done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1")
.appendParam("pageSize", "150").done(), String.class);
System.out.println("json = " + response.getBody());
assertTrue(response.getStatusCode().is2xxSuccessful());
@ -337,7 +323,8 @@ class CPInstancesAPI_ITCase {
assertEquals(1, json.get("hosts").size());
instanceDeregister(serviceName, Constants.DEFAULT_NAMESPACE_ID, "33.33.33.33", TEST_PORT2_4_DOM_1);
instanceDeregister(serviceName, Constants.DEFAULT_NAMESPACE_ID, TEST_GROUP_1, "22.22.22.22", TEST_PORT2_4_DOM_1);
instanceDeregister(serviceName, Constants.DEFAULT_NAMESPACE_ID, TEST_GROUP_1, "22.22.22.22",
TEST_PORT2_4_DOM_1);
namingServiceDelete(serviceName, Constants.DEFAULT_NAMESPACE_ID);
namingServiceDelete(serviceName, Constants.DEFAULT_NAMESPACE_ID, TEST_GROUP_1);
@ -349,17 +336,19 @@ class CPInstancesAPI_ITCase {
private void instanceDeregister(String serviceName, String namespace, String groupName, String ip, String port) {
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/instance",
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip).appendParam("port", port)
.appendParam("namespaceId", namespace).appendParam("groupName", groupName).appendParam("ephemeral", "false").done(),
String.class, HttpMethod.DELETE);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip)
.appendParam("port", port).appendParam("namespaceId", namespace)
.appendParam("groupName", groupName).appendParam("ephemeral", "false").done(), String.class,
HttpMethod.DELETE);
assertTrue(response.getStatusCode().is2xxSuccessful());
}
private void instanceRegister(String serviceName, String namespace, String groupName, String ip, String port) {
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/instance",
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip).appendParam("port", port)
.appendParam("namespaceId", namespace).appendParam("groupName", groupName).appendParam("ephemeral", "false").done(),
String.class, HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip)
.appendParam("port", port).appendParam("namespaceId", namespace)
.appendParam("groupName", groupName).appendParam("ephemeral", "false").done(), String.class,
HttpMethod.POST);
assertTrue(response.getStatusCode().is2xxSuccessful());
}
@ -374,7 +363,8 @@ class CPInstancesAPI_ITCase {
private void namingServiceCreate(String serviceName, String namespace, String groupName) {
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3")
.appendParam("namespaceId", namespace).appendParam("groupName", groupName).done(), String.class, HttpMethod.POST);
.appendParam("namespaceId", namespace).appendParam("groupName", groupName).done(), String.class,
HttpMethod.POST);
System.out.println(response);
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());
@ -393,26 +383,4 @@ class CPInstancesAPI_ITCase {
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());
}
private <T> ResponseEntity<T> request(String path, MultiValueMap<String, String> params, Class<T> clazz) {
HttpHeaders headers = new HttpHeaders();
HttpEntity<?> entity = new HttpEntity<T>(headers);
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(this.base.toString() + path).queryParams(params);
return this.restTemplate.exchange(builder.toUriString(), HttpMethod.GET, entity, clazz);
}
private <T> ResponseEntity<T> request(String path, MultiValueMap<String, String> params, Class<T> clazz, HttpMethod httpMethod) {
HttpHeaders headers = new HttpHeaders();
HttpEntity<?> entity = new HttpEntity<T>(headers);
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(this.base.toString() + path).queryParams(params);
return this.restTemplate.exchange(builder.toUriString(), httpMethod, entity, clazz);
}
}

View File

@ -24,11 +24,14 @@ import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.test.base.HttpClient4Test;
import com.alibaba.nacos.test.base.Params;
import org.apache.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -232,4 +235,17 @@ public class NamingBase extends HttpClient4Test {
}
return contextPath.startsWith("/") ? contextPath : "/" + contextPath;
}
protected void isNamingServerReady() throws InterruptedException {
int retry = 0;
while (retry < 3) {
ResponseEntity<String> response = request("/nacos/v1/ns/operator/metrics", Params.newParams().done(),
String.class);
if (response.getStatusCode().is2xxSuccessful() && response.getBody().contains("UP")) {
break;
}
retry++;
TimeUnit.SECONDS.sleep(5);
}
}
}

View File

@ -49,6 +49,7 @@ class RestAPI_ITCase extends NamingBase {
void setUp() throws Exception {
String url = String.format("http://localhost:%d/", port);
this.base = new URL(url);
isNamingServerReady();
//prepareData();
}
@ -85,8 +86,8 @@ class RestAPI_ITCase extends NamingBase {
void createService() throws Exception {
String serviceName = NamingBase.randomDomainName();
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3")
.done(), String.class, HttpMethod.POST);
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());
@ -102,14 +103,15 @@ class RestAPI_ITCase extends NamingBase {
void getService() throws Exception {
String serviceName = NamingBase.randomDomainName();
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3")
.done(), String.class, HttpMethod.POST);
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());
//get service
response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3")
.done(), String.class);
assertTrue(response.getStatusCode().is2xxSuccessful());
@ -129,8 +131,8 @@ class RestAPI_ITCase extends NamingBase {
String serviceName = NamingBase.randomDomainName();
//get service
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/list",
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1").appendParam("pageSize", "150").done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1")
.appendParam("pageSize", "150").done(), String.class);
assertTrue(response.getStatusCode().is2xxSuccessful());
JsonNode json = JacksonUtils.toObj(response.getBody());
@ -138,14 +140,14 @@ class RestAPI_ITCase extends NamingBase {
assertTrue(count >= 0);
response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3")
.done(), String.class, HttpMethod.POST);
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());
response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/list",
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1").appendParam("pageSize", "150").done(),
String.class);
Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1")
.appendParam("pageSize", "150").done(), String.class);
assertTrue(response.getStatusCode().is2xxSuccessful());
json = JacksonUtils.toObj(response.getBody());
@ -163,8 +165,8 @@ class RestAPI_ITCase extends NamingBase {
void updateService() throws Exception {
String serviceName = NamingBase.randomDomainName();
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.6").done(), String.class,
HttpMethod.POST);
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.6")
.done(), String.class, HttpMethod.POST);
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());
@ -208,8 +210,8 @@ class RestAPI_ITCase extends NamingBase {
private void namingServiceDelete(String serviceName) {
//delete service
ResponseEntity<String> response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service",
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class,
HttpMethod.DELETE);
Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3")
.done(), String.class, HttpMethod.DELETE);
assertTrue(response.getStatusCode().is2xxSuccessful());
assertEquals("ok", response.getBody());