From bbed5a4eb70c49b3780f96389df60bad15b6e993 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E7=BF=8A=20SionYang?= Date: Thu, 15 Aug 2024 17:10:05 +0800 Subject: [PATCH] [Refactor] Remove KvStorage and ConsistencyService. (#12489) * Remove KvStorage and ConsistencyService. * Fix switch manager dead lock until timeout. * Ignore env vote leader effect in IT. * Wait leader voted before start do test case. --- .../nacos/core/storage/StorageFactory.java | 51 ---- .../nacos/core/storage/kv/FileKvStorage.java | 195 ------------- .../nacos/core/storage/kv/KvStorage.java | 130 --------- .../core/storage/kv/MemoryKvStorage.java | 140 --------- .../nacos/core/storage/FileKvStorageTest.java | 154 ---------- .../core/storage/MemoryKvStorageTest.java | 120 -------- .../core/storage/StorageFactoryTest.java | 59 ---- .../naming/cluster/ServerStatusManager.java | 28 +- .../consistency/ConsistencyService.java | 95 ------ .../naming/consistency/RecordListener.java | 60 ---- .../naming/consistency/ValueChangeEvent.java | 88 ------ .../PersistentConsistencyService.java | 35 --- ...sistentConsistencyServiceDelegateImpl.java | 91 ------ .../persistent/PersistentNotifier.java | 121 -------- .../impl/BasePersistentServiceProcessor.java | 265 ----------------- .../persistent/impl/NamingKvStorage.java | 166 ----------- .../persistent/impl/OldDataOperation.java | 50 ++++ .../impl/PersistentServiceProcessor.java | 174 ----------- .../StandalonePersistentServiceProcessor.java | 120 -------- .../SwitchDomainSnapshotOperation.java} | 81 +----- .../nacos/naming/misc/SwitchManager.java | 270 +++++++++++++----- .../cluster/ServerStatusManagerTest.java | 79 +++-- .../persistent/impl/NamingKvStorageTest.java | 97 ------- .../impl/NamingSnapshotOperationTest.java | 95 ------ .../test/naming/CPInstancesAPI_ITCase.java | 78 ++--- .../alibaba/nacos/test/naming/NamingBase.java | 16 ++ .../nacos/test/naming/RestAPI_ITCase.java | 32 ++- 27 files changed, 393 insertions(+), 2497 deletions(-) delete mode 100644 core/src/main/java/com/alibaba/nacos/core/storage/StorageFactory.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java delete mode 100644 core/src/test/java/com/alibaba/nacos/core/storage/FileKvStorageTest.java delete mode 100644 core/src/test/java/com/alibaba/nacos/core/storage/MemoryKvStorageTest.java delete mode 100644 core/src/test/java/com/alibaba/nacos/core/storage/StorageFactoryTest.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/RecordListener.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/ValueChangeEvent.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyService.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/OldDataOperation.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java rename naming/src/main/java/com/alibaba/nacos/naming/{consistency/persistent/impl/NamingSnapshotOperation.java => misc/SwitchDomainSnapshotOperation.java} (51%) delete mode 100644 naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorageTest.java delete mode 100644 naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperationTest.java diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/StorageFactory.java b/core/src/main/java/com/alibaba/nacos/core/storage/StorageFactory.java deleted file mode 100644 index 13fa26a11..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/storage/StorageFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage; - -import com.alibaba.nacos.core.storage.kv.FileKvStorage; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.core.storage.kv.MemoryKvStorage; - -/** - * Ket-value Storage factory. - * - * @author liaochuntao - */ -public final class StorageFactory { - - /** - * Create {@link KvStorage} implementation. - * - * @param type type of {@link KvStorage} - * @param label label for {@code RocksStorage} - * @param baseDir base dir of storage file. - * @return implementation of {@link KvStorage} - * @throws Exception exception during creating {@link KvStorage} - */ - public static KvStorage createKvStorage(KvStorage.KvType type, final String label, final String baseDir) - throws Exception { - switch (type) { - case File: - return new FileKvStorage(baseDir); - case Memory: - return new MemoryKvStorage(); - default: - throw new IllegalArgumentException("this kv type : [" + type.name() + "] not support"); - } - } - -} diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java b/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java deleted file mode 100644 index b730a219d..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage.kv; - -import com.alibaba.nacos.common.utils.ByteUtils; -import com.alibaba.nacos.core.exception.ErrorCode; -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.sys.utils.DiskUtils; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Kv storage based on file system. // TODO 写文件的方式需要优化 - * - * @author liaochuntao - */ -public class FileKvStorage implements KvStorage { - - private final String baseDir; - - /** - * Ensure that a consistent view exists when implementing file copies. - */ - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - - private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); - - public FileKvStorage(String baseDir) throws IOException { - this.baseDir = baseDir; - DiskUtils.forceMkdir(baseDir); - } - - @Override - public byte[] get(byte[] key) throws KvStorageException { - readLock.lock(); - try { - final String fileName = new String(key); - File file = Paths.get(baseDir, fileName).toFile(); - if (file.exists()) { - return DiskUtils.readFileBytes(file); - } - return null; - } finally { - readLock.unlock(); - } - } - - @Override - public Map batchGet(List keys) throws KvStorageException { - readLock.lock(); - try { - Map result = new HashMap<>(keys.size()); - for (byte[] key : keys) { - byte[] val = get(key); - if (val != null) { - result.put(key, val); - } - } - return result; - } finally { - readLock.unlock(); - } - } - - @Override - public void put(byte[] key, byte[] value) throws KvStorageException { - readLock.lock(); - try { - final String fileName = new String(key); - File file = Paths.get(baseDir, fileName).toFile(); - try { - DiskUtils.touch(file); - DiskUtils.writeFile(file, value, false); - } catch (IOException e) { - throw new KvStorageException(ErrorCode.KVStorageWriteError, e); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void batchPut(List keys, List values) throws KvStorageException { - readLock.lock(); - try { - if (keys.size() != values.size()) { - throw new KvStorageException(ErrorCode.KVStorageBatchWriteError, - "key's size must be equal to value's size"); - } - int size = keys.size(); - for (int i = 0; i < size; i++) { - put(keys.get(i), values.get(i)); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void delete(byte[] key) throws KvStorageException { - readLock.lock(); - try { - final String fileName = new String(key); - DiskUtils.deleteFile(baseDir, fileName); - } finally { - readLock.unlock(); - } - } - - @Override - public void batchDelete(List keys) throws KvStorageException { - readLock.lock(); - try { - for (byte[] key : keys) { - delete(key); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void doSnapshot(String backupPath) throws KvStorageException { - writeLock.lock(); - try { - File srcDir = Paths.get(baseDir).toFile(); - File descDir = Paths.get(backupPath).toFile(); - DiskUtils.copyDirectory(srcDir, descDir); - } catch (IOException e) { - throw new KvStorageException(ErrorCode.IOCopyDirError, e); - } finally { - writeLock.unlock(); - } - } - - @Override - public void snapshotLoad(String path) throws KvStorageException { - writeLock.lock(); - try { - File srcDir = Paths.get(path).toFile(); - // If snapshot path is non-exist, means snapshot is empty - if (srcDir.exists()) { - // First clean up the local file information, before the file copy - DiskUtils.deleteDirThenMkdir(baseDir); - File descDir = Paths.get(baseDir).toFile(); - DiskUtils.copyDirectory(srcDir, descDir); - } - } catch (IOException e) { - throw new KvStorageException(ErrorCode.IOCopyDirError, e); - } finally { - writeLock.unlock(); - } - } - - @Override - public List allKeys() throws KvStorageException { - List result = new LinkedList<>(); - File[] files = new File(baseDir).listFiles(); - if (null != files) { - for (File each : files) { - if (each.isFile()) { - result.add(ByteUtils.toBytes(each.getName())); - } - } - } - return result; - } - - @Override - public void shutdown() { - } -} \ No newline at end of file diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java b/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java deleted file mode 100644 index a976daed0..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage.kv; - -import com.alibaba.nacos.core.exception.KvStorageException; - -import java.util.List; -import java.util.Map; - -/** - * Universal KV storage interface. - * - * @author liaochuntao - */ -public interface KvStorage { - - enum KvType { - /** - * Local file storage. - */ - File, - - /** - * Local memory storage. - */ - Memory, - - /** - * RocksDB storage. - */ - RocksDB, - } - - - /** - * get data by key. - * - * @param key byte[] - * @return byte[] - * @throws KvStorageException KVStorageException - */ - byte[] get(byte[] key) throws KvStorageException; - - /** - * batch get by List byte[]. - * - * @param keys List byte[] - * @return Map byte[], byte[] - * @throws KvStorageException KvStorageException - */ - Map batchGet(List keys) throws KvStorageException; - - /** - * write data. - * - * @param key byte[] - * @param value byte[] - * @throws KvStorageException KvStorageException - */ - void put(byte[] key, byte[] value) throws KvStorageException; - - /** - * batch write. - * - * @param keys List byte[] - * @param values List byte[] - * @throws KvStorageException KvStorageException - */ - void batchPut(List keys, List values) throws KvStorageException; - - /** - * delete with key. - * - * @param key byte[] - * @throws KvStorageException KvStorageException - */ - void delete(byte[] key) throws KvStorageException; - - /** - * batch delete with keys. - * - * @param keys List byte[] - * @throws KvStorageException KvStorageException - */ - void batchDelete(List keys) throws KvStorageException; - - /** - * do snapshot. - * - * @param backupPath snapshot file save path - * @throws KvStorageException KVStorageException - */ - void doSnapshot(final String backupPath) throws KvStorageException; - - /** - * load snapshot. - * - * @param path The path to the snapshot file - * @throws KvStorageException KVStorageException - */ - void snapshotLoad(String path) throws KvStorageException; - - /** - * Get all keys. - * - * @return all keys - * @throws KvStorageException KVStorageException - */ - List allKeys() throws KvStorageException; - - /** - * shutdown. - */ - void shutdown(); - -} diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java b/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java deleted file mode 100644 index 34577f701..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage.kv; - -import com.alibaba.nacos.core.exception.ErrorCode; -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alipay.sofa.jraft.util.BytesUtil; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; - -/** - * Realization of KV storage based on memory. - * - * @author liaochuntao - */ -public class MemoryKvStorage implements KvStorage { - - private final Map storage = new ConcurrentSkipListMap<>(); - - @Override - public byte[] get(byte[] key) throws KvStorageException { - return storage.get(new Key(key)); - } - - @Override - public Map batchGet(List keys) throws KvStorageException { - Map result = new HashMap<>(keys.size()); - for (byte[] key : keys) { - byte[] val = storage.get(new Key(key)); - if (val != null) { - result.put(key, val); - } - } - return result; - } - - @Override - public void put(byte[] key, byte[] value) throws KvStorageException { - storage.put(new Key(key), value); - } - - @Override - public void batchPut(List keys, List values) throws KvStorageException { - if (keys.size() != values.size()) { - throw new KvStorageException(ErrorCode.KVStorageBatchWriteError.getCode(), - "key's size must be equal to value's size"); - } - int size = keys.size(); - for (int i = 0; i < size; i++) { - storage.put(new Key(keys.get(i)), values.get(i)); - } - } - - @Override - public void delete(byte[] key) throws KvStorageException { - storage.remove(new Key(key)); - } - - @Override - public void batchDelete(List keys) throws KvStorageException { - for (byte[] key : keys) { - storage.remove(new Key(key)); - } - } - - @Override - public void doSnapshot(String backupPath) throws KvStorageException { - throw new UnsupportedOperationException(); - } - - @Override - public void snapshotLoad(String path) throws KvStorageException { - throw new UnsupportedOperationException(); - } - - @Override - public List allKeys() throws KvStorageException { - List result = new LinkedList<>(); - for (Key each : storage.keySet()) { - result.add(each.origin); - } - return result; - } - - @Override - public void shutdown() { - storage.clear(); - } - - private static class Key implements Comparable { - - private final byte[] origin; - - private Key(byte[] origin) { - this.origin = origin; - } - - @Override - public int compareTo(Key o) { - return BytesUtil.compare(origin, o.origin); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Key key = (Key) o; - return Arrays.equals(origin, key.origin); - } - - @Override - public int hashCode() { - return Arrays.hashCode(origin); - } - } - -} diff --git a/core/src/test/java/com/alibaba/nacos/core/storage/FileKvStorageTest.java b/core/src/test/java/com/alibaba/nacos/core/storage/FileKvStorageTest.java deleted file mode 100644 index 5461eefc8..000000000 --- a/core/src/test/java/com/alibaba/nacos/core/storage/FileKvStorageTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 1999-2021 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage; - -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.kv.FileKvStorage; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import org.apache.commons.io.FileUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * {@link FileKvStorage} unit tests. - * - * @author chenglu - * @date 2021-06-10 18:27 - */ -class FileKvStorageTest { - - private KvStorage kvStorage; - - private String baseDir; - - @AfterAll - static void clean() { - String dir = System.getProperty("user.home") + File.separator + "nacos_file_kv_storage_test_brotherluxcq"; - String backupDir = System.getProperty("user.home") + File.separator + "nacos_file_kv_storage_test_backup_brotherluxcq"; - - try { - FileUtils.deleteDirectory(new File(dir)); - FileUtils.deleteDirectory(new File(backupDir)); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @BeforeEach - void init() { - try { - baseDir = System.getProperty("user.home"); - String dir = baseDir + File.separator + "nacos_file_kv_storage_test_brotherluxcq"; - kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, null, dir); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - void testPutAndGetAndDelete() { - try { - byte[] key = "key".getBytes(); - byte[] value = "value".getBytes(); - kvStorage.put(key, value); - byte[] value1 = kvStorage.get(key); - assertArrayEquals(value, value1); - - assertNotNull(kvStorage.allKeys()); - - kvStorage.delete(key); - assertNull(kvStorage.get(key)); - - kvStorage.put(key, value); - kvStorage.shutdown(); - assertEquals(kvStorage.allKeys().size(), kvStorage.allKeys().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - void testBatchPutAndGet() { - try { - List keys = Arrays.asList("key1".getBytes(), "key2".getBytes()); - List values = Arrays.asList("value1".getBytes(), "value2".getBytes()); - kvStorage.batchPut(keys, values); - - Map res = kvStorage.batchGet(keys); - assertNotNull(res); - - res.forEach((key, value) -> { - if (Arrays.equals(key, "key1".getBytes())) { - assertArrayEquals("value1".getBytes(), value); - } else if (Arrays.equals(key, "key2".getBytes())) { - assertArrayEquals("value2".getBytes(), value); - } else { - fail(); - } - }); - - kvStorage.batchDelete(keys); - assertEquals(0, kvStorage.batchGet(values).size()); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - void testSnapshot() { - String backupDir = baseDir + File.separator + "nacos_file_kv_storage_test_backup_brotherluxcq"; - try { - File file = new File(backupDir); - if (!file.exists()) { - boolean dirResult = file.mkdirs(); - if (!dirResult) { - return; - } - } - kvStorage.doSnapshot(backupDir); - } catch (KvStorageException e) { - e.printStackTrace(); - fail(); - } - - try { - kvStorage.snapshotLoad(backupDir); - byte[] key = "key".getBytes(); - byte[] value = kvStorage.get(key); - assertArrayEquals("value".getBytes(), value); - } catch (KvStorageException e) { - e.printStackTrace(); - fail(); - } - } -} diff --git a/core/src/test/java/com/alibaba/nacos/core/storage/MemoryKvStorageTest.java b/core/src/test/java/com/alibaba/nacos/core/storage/MemoryKvStorageTest.java deleted file mode 100644 index 930c40249..000000000 --- a/core/src/test/java/com/alibaba/nacos/core/storage/MemoryKvStorageTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 1999-2021 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage; - -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.core.storage.kv.MemoryKvStorage; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * {@link MemoryKvStorage} unit tests. - * - * @author chenglu - * @date 2021-06-10 18:02 - */ -class MemoryKvStorageTest { - - private KvStorage kvStorage; - - @BeforeEach - void init() { - try { - kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.Memory, null, null); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - void testPutAndGetAndDelete() { - try { - byte[] key = "key".getBytes(); - byte[] value = "value".getBytes(); - kvStorage.put(key, value); - byte[] value1 = kvStorage.get(key); - assertArrayEquals(value, value1); - - assertNotNull(kvStorage.allKeys()); - - kvStorage.delete(key); - assertNull(kvStorage.get(key)); - - kvStorage.put(key, value); - kvStorage.shutdown(); - assertEquals(0, kvStorage.allKeys().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - void testBatchPutAndGet() { - try { - List keys = Arrays.asList("key1".getBytes(), "key2".getBytes()); - List values = Arrays.asList("value1".getBytes(), "value2".getBytes()); - kvStorage.batchPut(keys, values); - - Map res = kvStorage.batchGet(keys); - assertNotNull(res); - - res.forEach((key, value) -> { - if (Arrays.equals(key, "key1".getBytes())) { - assertArrayEquals("value1".getBytes(), value); - } else if (Arrays.equals(key, "key2".getBytes())) { - assertArrayEquals("value2".getBytes(), value); - } else { - fail(); - } - }); - - kvStorage.batchDelete(keys); - assertEquals(0, kvStorage.batchGet(values).size()); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - void testSnapshot() { - try { - kvStorage.doSnapshot("/"); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - } - - try { - kvStorage.snapshotLoad("/"); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - } - } -} diff --git a/core/src/test/java/com/alibaba/nacos/core/storage/StorageFactoryTest.java b/core/src/test/java/com/alibaba/nacos/core/storage/StorageFactoryTest.java deleted file mode 100644 index 64984beb9..000000000 --- a/core/src/test/java/com/alibaba/nacos/core/storage/StorageFactoryTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 1999-2021 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.storage; - -import com.alibaba.nacos.core.storage.kv.FileKvStorage; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.core.storage.kv.MemoryKvStorage; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * {@link StorageFactory} unit tests. - * - * @author chenglu - * @date 2021-06-10 17:55 - */ -class StorageFactoryTest { - - @Test - void testCreateKvStorage() { - try { - KvStorage kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.Memory, "", "/"); - assertTrue(kvStorage instanceof MemoryKvStorage); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - - try { - KvStorage kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "", "/"); - assertTrue(kvStorage instanceof FileKvStorage); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - - try { - StorageFactory.createKvStorage(KvStorage.KvType.RocksDB, "", "/"); - } catch (Exception e) { - assertTrue(e instanceof IllegalArgumentException); - } - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerStatusManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerStatusManager.java index 1020b7bbd..c86992073 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerStatusManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerStatusManager.java @@ -16,14 +16,15 @@ package com.alibaba.nacos.naming.cluster; -import com.alibaba.nacos.naming.consistency.ConsistencyService; +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.core.distributed.ProtocolManager; +import com.alibaba.nacos.naming.constants.Constants; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.SwitchDomain; -import com.alibaba.nacos.common.utils.StringUtils; +import com.alipay.sofa.jraft.RouteTable; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import javax.annotation.Resource; import java.util.Optional; /** @@ -35,14 +36,14 @@ import java.util.Optional; @Service public class ServerStatusManager { - @Resource(name = "persistentConsistencyServiceDelegate") - private ConsistencyService consistencyService; + private final ProtocolManager protocolManager; private final SwitchDomain switchDomain; private ServerStatus serverStatus = ServerStatus.STARTING; - public ServerStatusManager(SwitchDomain switchDomain) { + public ServerStatusManager(ProtocolManager protocolManager, SwitchDomain switchDomain) { + this.protocolManager = protocolManager; this.switchDomain = switchDomain; } @@ -58,19 +59,30 @@ public class ServerStatusManager { return; } - if (consistencyService.isAvailable()) { + if (hasLeader()) { serverStatus = ServerStatus.UP; } else { serverStatus = ServerStatus.DOWN; } } + private boolean hasLeader() { + if (protocolManager.getCpProtocol() == null) { + return false; + } + return null != RouteTable.getInstance().selectLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP); + } + public ServerStatus getServerStatus() { return serverStatus; } public Optional getErrorMsg() { - return consistencyService.getErrorMsg(); + if (hasLeader()) { + return Optional.empty(); + } + return Optional.of("No leader for raft group " + Constants.NAMING_PERSISTENT_SERVICE_GROUP + + ", please see logs `alipay-jraft.log` or `naming-raft.log` to see details."); } public class ServerStatusUpdater implements Runnable { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java deleted file mode 100644 index b242d9d6a..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.naming.pojo.Record; - -import java.util.Optional; - -/** - * Consistence service for all implementations to derive. - * - *

We announce this consistency service to decouple the specific consistency implementation with business logic. - * User should not be aware of what consistency protocol is being used. - * - *

In this way, we also provide space for user to extend the underlying consistency protocols, as long as they obey - * our consistency baseline. - * - * @author nkorange - * @since 1.0.0 - */ -public interface ConsistencyService { - - /** - * Put a data related to a key to Nacos cluster. - * - * @param key key of data, this key should be globally unique - * @param value value of data - * @throws NacosException nacos exception - */ - void put(String key, Record value) throws NacosException; - - /** - * Remove a data from Nacos cluster. - * - * @param key key of data - * @throws NacosException nacos exception - */ - void remove(String key) throws NacosException; - - /** - * Get a data from Nacos cluster. - * - * @param key key of data - * @return data related to the key - * @throws NacosException nacos exception - */ - Datum get(String key) throws NacosException; - - /** - * Listen for changes of a data. - * - * @param key key of data - * @param listener callback of data change - * @throws NacosException nacos exception - */ - void listen(String key, RecordListener listener) throws NacosException; - - /** - * Cancel listening of a data. - * - * @param key key of data - * @param listener callback of data change - * @throws NacosException nacos exception - */ - void unListen(String key, RecordListener listener) throws NacosException; - - /** - * Get the error message of the consistency protocol. - * - * @return the consistency protocol error message. - */ - Optional getErrorMsg(); - - /** - * Tell the status of this consistency service. - * - * @return true if available - */ - boolean isAvailable(); -} \ No newline at end of file diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/RecordListener.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/RecordListener.java deleted file mode 100644 index 9a3b45ecd..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/RecordListener.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency; - -import com.alibaba.nacos.naming.pojo.Record; - -/** - * Data listener public interface. - * - * @author nacos - */ -public interface RecordListener { - - /** - * Determine if the listener was registered with this key. - * - * @param key candidate key - * @return true if the listener was registered with this key - */ - boolean interests(String key); - - /** - * Determine if the listener is to be removed by matching the 'key'. - * - * @param key key to match - * @return true if match success - */ - boolean matchUnlistenKey(String key); - - /** - * Action to do if data of target key has changed. - * - * @param key target key - * @param value data of the key - * @throws Exception exception - */ - void onChange(String key, T value) throws Exception; - - /** - * Action to do if data of target key has been removed. - * - * @param key target key - * @throws Exception exception - */ - void onDelete(String key) throws Exception; -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ValueChangeEvent.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ValueChangeEvent.java deleted file mode 100644 index f05729bff..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ValueChangeEvent.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency; - -import com.alibaba.nacos.common.notify.Event; -import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.naming.pojo.Record; - -/** - * The value changes events. //TODO Recipients need to implement the ability to receive batch events - * - * @author liaochuntao - */ -public class ValueChangeEvent extends Event { - - private final String key; - - private final Record value; - - private final DataOperation action; - - public ValueChangeEvent(String key, Record value, DataOperation action) { - this.key = key; - this.value = value; - this.action = action; - } - - public String getKey() { - return key; - } - - public Record getValue() { - return value; - } - - public DataOperation getAction() { - return action; - } - - public static ValueChangeEventBuilder builder() { - return new ValueChangeEventBuilder(); - } - - public static final class ValueChangeEventBuilder { - - private String key; - - private Record value; - - private DataOperation action; - - private ValueChangeEventBuilder() { - } - - public ValueChangeEventBuilder key(String key) { - this.key = key; - return this; - } - - public ValueChangeEventBuilder value(Record value) { - this.value = value; - return this; - } - - public ValueChangeEventBuilder action(DataOperation action) { - this.action = action; - return this; - } - - public ValueChangeEvent build() { - return new ValueChangeEvent(key, value, action); - } - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyService.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyService.java deleted file mode 100644 index 67a633702..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyService.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent; - -import com.alibaba.nacos.naming.consistency.ConsistencyService; - -/** - * A consistency service that guarantee CP consistency for the published data. - * - *

CP consistency is hereby defined as follows: - * - *

Once the writing operation returned client a success, the data within the operation is guaranteed to be - * successfully written to the cluster. And the data should be consistent between servers after some time without any - * outside interfere. - * - * @author nkorange - * @since 1.0.0 - */ -public interface PersistentConsistencyService extends ConsistencyService { - -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java deleted file mode 100644 index 55c26613b..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.core.distributed.ProtocolManager; -import com.alibaba.nacos.naming.consistency.Datum; -import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor; -import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor; -import com.alibaba.nacos.naming.consistency.persistent.impl.StandalonePersistentServiceProcessor; -import com.alibaba.nacos.naming.pojo.Record; -import com.alibaba.nacos.sys.env.EnvUtil; -import org.springframework.context.annotation.DependsOn; -import org.springframework.stereotype.Component; - -import java.util.Optional; - -/** - * Persistent consistency service delegate. - * - * @author xiweng.yy - */ -@DependsOn("ProtocolManager") -@Component("persistentConsistencyServiceDelegate") -public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService { - - private final BasePersistentServiceProcessor persistentServiceProcessor; - - public PersistentConsistencyServiceDelegateImpl(ProtocolManager protocolManager) throws Exception { - this.persistentServiceProcessor = createPersistentServiceProcessor(protocolManager); - } - - @Override - public void put(String key, Record value) throws NacosException { - persistentServiceProcessor.put(key, value); - } - - @Override - public void remove(String key) throws NacosException { - persistentServiceProcessor.remove(key); - } - - @Override - public Datum get(String key) throws NacosException { - return persistentServiceProcessor.get(key); - } - - @Override - public void listen(String key, RecordListener listener) throws NacosException { - persistentServiceProcessor.listen(key, listener); - } - - @Override - public void unListen(String key, RecordListener listener) throws NacosException { - persistentServiceProcessor.unListen(key, listener); - } - - @Override - public boolean isAvailable() { - return persistentServiceProcessor.isAvailable(); - } - - @Override - public Optional getErrorMsg() { - return persistentServiceProcessor.getErrorMsg(); - } - - private BasePersistentServiceProcessor createPersistentServiceProcessor(ProtocolManager protocolManager) - throws Exception { - final BasePersistentServiceProcessor processor = - EnvUtil.getStandaloneMode() ? new StandalonePersistentServiceProcessor() - : new PersistentServiceProcessor(protocolManager); - processor.afterConstruct(); - return processor; - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java deleted file mode 100644 index 1356c53af..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent; - -import com.alibaba.nacos.common.notify.Event; -import com.alibaba.nacos.common.notify.listener.Subscriber; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; -import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.consistency.ValueChangeEvent; -import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.pojo.Record; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; - -/** - * persistent notifier, It is responsible for notifying interested listeners of all write changes to the data. - * - * @author liaochuntao - */ -public final class PersistentNotifier extends Subscriber { - - private final Map> listenerMap = new ConcurrentHashMap<>(32); - - private final Function find; - - public PersistentNotifier(Function find) { - this.find = find; - } - - /** - * register listener with key. - * - * @param key key - * @param listener {@link RecordListener} - */ - public void registerListener(final String key, final RecordListener listener) { - listenerMap.computeIfAbsent(key, s -> new ConcurrentHashSet<>()).add(listener); - } - - /** - * deregister listener by key. - * - * @param key key - * @param listener {@link RecordListener} - */ - public void deregisterListener(final String key, final RecordListener listener) { - if (!listenerMap.containsKey(key)) { - return; - } - listenerMap.get(key).remove(listener); - } - - /** - * deregister all listener by key. - * - * @param key key - */ - public void deregisterAllListener(final String key) { - listenerMap.remove(key); - } - - public Map> getListeners() { - return listenerMap; - } - - /** - * notify value to listener with {@link DataOperation} and key. - * - * @param key key - * @param action {@link DataOperation} - * @param value value - * @param type - */ - public void notify(final String key, final DataOperation action, final T value) { - - if (!listenerMap.containsKey(key)) { - return; - } - - for (RecordListener listener : listenerMap.get(key)) { - try { - if (action == DataOperation.CHANGE) { - listener.onChange(key, value); - continue; - } - if (action == DataOperation.DELETE) { - listener.onDelete(key); - } - } catch (Throwable e) { - Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e); - } - } - } - - @Override - public void onEvent(ValueChangeEvent event) { - notify(event.getKey(), event.getAction(), find.apply(event.getKey())); - } - - @Override - public Class subscribeType() { - return ValueChangeEvent.class; - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java deleted file mode 100644 index ab4b72106..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent.impl; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; -import com.alibaba.nacos.common.notify.NotifyCenter; -import com.alibaba.nacos.common.utils.ByteUtils; -import com.alibaba.nacos.common.utils.TypeUtils; -import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.consistency.SerializeFactory; -import com.alibaba.nacos.consistency.Serializer; -import com.alibaba.nacos.consistency.cp.RequestProcessor4CP; -import com.alibaba.nacos.consistency.entity.ReadRequest; -import com.alibaba.nacos.consistency.entity.Response; -import com.alibaba.nacos.consistency.entity.WriteRequest; -import com.alibaba.nacos.consistency.snapshot.SnapshotOperation; -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.naming.consistency.Datum; -import com.alibaba.nacos.naming.consistency.KeyBuilder; -import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.consistency.ValueChangeEvent; -import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; -import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; -import com.alibaba.nacos.naming.constants.Constants; -import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.misc.UtilsAndCommons; -import com.alibaba.nacos.naming.pojo.Record; -import com.google.protobuf.ByteString; - -import java.lang.reflect.Type; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * New service data persistence handler. - * - * @author liaochuntao - */ -public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP - implements PersistentConsistencyService { - - enum Op { - /** - * write ops. - */ - Write("Write"), - - /** - * read ops. - */ - Read("Read"), - - /** - * delete ops. - */ - Delete("Delete"); - - protected final String desc; - - Op(String desc) { - this.desc = desc; - } - } - - protected final KvStorage kvStorage; - - protected final Serializer serializer; - - /** - * Whether an unrecoverable error occurred. - */ - protected volatile boolean hasError = false; - - protected volatile String jRaftErrorMsg; - - /** - * If use old raft, should not notify listener even new listener add. - */ - protected volatile boolean startNotify = false; - - /** - * During snapshot processing, the processing of other requests needs to be paused. - */ - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - - protected final PersistentNotifier notifier; - - protected final int queueMaxSize = 16384; - - protected final int priority = 10; - - public BasePersistentServiceProcessor() throws Exception { - this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString()); - this.serializer = SerializeFactory.getSerializer("JSON"); - this.notifier = new PersistentNotifier(key -> { - try { - byte[] data = kvStorage.get(ByteUtils.toBytes(key)); - Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key)); - return null != datum ? datum.value : null; - } catch (KvStorageException ex) { - throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg()); - } catch (Exception e) { - throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.getMessage()); - } - }); - } - - @SuppressWarnings("unchecked") - public void afterConstruct() { - NotifyCenter.registerToPublisher(ValueChangeEvent.class, queueMaxSize); - } - - @Override - public Response onRequest(ReadRequest request) { - final Lock lock = readLock; - lock.lock(); - try { - final List keys = serializer - .deserialize(request.getData().toByteArray(), TypeUtils.parameterize(List.class, byte[].class)); - final Map result = kvStorage.batchGet(keys); - final BatchReadResponse response = new BatchReadResponse(); - result.forEach(response::append); - return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response))) - .build(); - } catch (KvStorageException e) { - return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build(); - } catch (Exception e) { - Loggers.RAFT.warn("On read request failed, ", e); - return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build(); - } finally { - lock.unlock(); - } - } - - @Override - public Response onApply(WriteRequest request) { - final byte[] data = request.getData().toByteArray(); - final Lock lock = readLock; - lock.lock(); - try { - final BatchWriteRequest bwRequest = serializer.deserialize(data, BatchWriteRequest.class); - final Op op = Op.valueOf(request.getOperation()); - switch (op) { - case Write: - kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues()); - break; - case Delete: - kvStorage.batchDelete(bwRequest.getKeys()); - break; - default: - return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build(); - } - publishValueChangeEvent(op, bwRequest); - return Response.newBuilder().setSuccess(true).build(); - } catch (KvStorageException e) { - return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build(); - } catch (Exception e) { - Loggers.RAFT.warn("On apply write request failed, ", e); - return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build(); - } finally { - lock.unlock(); - } - } - - private void publishValueChangeEvent(final Op op, final BatchWriteRequest request) { - final List keys = request.getKeys(); - final List values = request.getValues(); - for (int i = 0; i < keys.size(); i++) { - final String key = new String(keys.get(i)); - // Ignore old 1.x version data - if (!KeyBuilder.matchSwitchKey(key)) { - continue; - } - final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key)); - final Record value = null != datum ? datum.value : null; - final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) - .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); - NotifyCenter.publishEvent(event); - } - } - - @Override - public String group() { - return Constants.NAMING_PERSISTENT_SERVICE_GROUP; - } - - @Override - public List loadSnapshotOperate() { - return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock, serializer)); - } - - @Override - public void onError(Throwable error) { - super.onError(error); - hasError = true; - jRaftErrorMsg = error.getMessage(); - } - - protected Type getDatumTypeFromKey(String key) { - return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key)); - } - - protected Class getClassOfRecordFromKey(String key) { - if (KeyBuilder.matchSwitchKey(key)) { - return com.alibaba.nacos.naming.misc.SwitchDomain.class; - } - return Record.class; - } - - protected void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException { - if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { - notifierAllServiceMeta(listener); - } else { - Datum datum = get(key); - if (null != datum) { - notifierDatum(key, datum, listener); - } - } - } - - /** - * This notify should only notify once during startup. - */ - private void notifierAllServiceMeta(RecordListener listener) throws NacosException { - for (byte[] each : kvStorage.allKeys()) { - String key = new String(each); - if (listener.interests(key)) { - Datum datum = get(key); - if (null != datum) { - notifierDatum(key, datum, listener); - } - } - } - } - - private void notifierDatum(String key, Datum datum, RecordListener listener) { - try { - listener.onChange(key, datum.value); - } catch (Exception e) { - Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e); - } - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java deleted file mode 100644 index a3b0c0f76..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent.impl; - -import com.alibaba.nacos.core.exception.ErrorCode; -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.StorageFactory; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.core.storage.kv.MemoryKvStorage; -import com.alibaba.nacos.sys.utils.TimerContext; -import com.alibaba.nacos.naming.misc.Loggers; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Kv storage implementation for naming. - * - * @author xiweng.yy - */ -public class NamingKvStorage extends MemoryKvStorage { - - private static final String LOAD_SNAPSHOT = NamingKvStorage.class.getSimpleName() + ".snapshotLoad"; - - private static final String LABEL = "naming-persistent"; - - private final String baseDir; - - private final KvStorage baseDirStorage; - - public NamingKvStorage(final String baseDir) throws Exception { - this.baseDir = baseDir; - this.baseDirStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, LABEL, baseDir); - } - - @Override - public byte[] get(byte[] key) throws KvStorageException { - // First get the data from the memory Cache - byte[] result = super.get(key); - if (null == result) { - try { - KvStorage storage = getStorage(); - result = null == storage ? null : storage.get(key); - if (null != result) { - super.put(key, result); - } - } catch (Exception e) { - throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(), - "Get data failed, key: " + new String(key) + ", detail: " + e.getMessage(), e); - } - } - return result; - } - - @Override - public Map batchGet(List keys) throws KvStorageException { - Map result = new HashMap<>(keys.size()); - for (byte[] key : keys) { - byte[] val = get(key); - if (val != null) { - result.put(key, val); - } - } - return result; - } - - @Override - public void put(byte[] key, byte[] value) throws KvStorageException { - try { - KvStorage storage = getStorage(); - storage.put(key, value); - } catch (Exception e) { - throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(), - "Put data failed, key: " + new String(key) + ", detail: " + e.getMessage(), e); - } - // after actual storage put success, put it in memory, memory put should success all the time - super.put(key, value); - } - - @Override - public void batchPut(List keys, List values) throws KvStorageException { - if (keys.size() != values.size()) { - throw new KvStorageException(ErrorCode.KVStorageBatchWriteError, - "key's size must be equal to value's size"); - } - int size = keys.size(); - for (int i = 0; i < size; i++) { - put(keys.get(i), values.get(i)); - } - } - - @Override - public void delete(byte[] key) throws KvStorageException { - try { - KvStorage storage = getStorage(); - if (null != storage) { - storage.delete(key); - } - } catch (Exception e) { - throw new KvStorageException(ErrorCode.KVStorageDeleteError.getCode(), - "Delete data failed, key: " + new String(key) + ", detail: " + e.getMessage(), e); - } - // after actual storage delete success, put it in memory, memory delete should success all the time - super.delete(key); - } - - @Override - public void batchDelete(List keys) throws KvStorageException { - for (byte[] each : keys) { - delete(each); - } - } - - @Override - public void doSnapshot(String backupPath) throws KvStorageException { - baseDirStorage.doSnapshot(backupPath); - } - - @Override - public void snapshotLoad(String path) throws KvStorageException { - TimerContext.start(LOAD_SNAPSHOT); - try { - baseDirStorage.snapshotLoad(path); - loadSnapshotFromActualStorage(baseDirStorage); - } finally { - TimerContext.end(LOAD_SNAPSHOT, Loggers.RAFT); - } - } - - private void loadSnapshotFromActualStorage(KvStorage actualStorage) throws KvStorageException { - for (byte[] each : actualStorage.allKeys()) { - byte[] datum = actualStorage.get(each); - super.put(each, datum); - } - } - - @Override - public List allKeys() throws KvStorageException { - return super.allKeys(); - } - - @Override - public void shutdown() { - baseDirStorage.shutdown(); - super.shutdown(); - } - - private KvStorage getStorage() throws Exception { - return baseDirStorage; - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/OldDataOperation.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/OldDataOperation.java new file mode 100644 index 000000000..8249ca42e --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/OldDataOperation.java @@ -0,0 +1,50 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.consistency.persistent.impl; + +/** + * Adapter old version data operation {@link com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor.Op}. + * + * @author xiweng.yy + */ +public enum OldDataOperation { + + /** + * write ops. + */ + Write("Write"), + + /** + * read ops. + */ + Read("Read"), + + /** + * delete ops. + */ + Delete("Delete"); + + private final String desc; + + OldDataOperation(String desc) { + this.desc = desc; + } + + public String getDesc() { + return desc; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java deleted file mode 100644 index f8768506a..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent.impl; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.common.notify.NotifyCenter; -import com.alibaba.nacos.common.utils.ByteUtils; -import com.alibaba.nacos.common.utils.StringUtils; -import com.alibaba.nacos.consistency.ProtocolMetaData; -import com.alibaba.nacos.consistency.cp.CPProtocol; -import com.alibaba.nacos.consistency.cp.MetadataKey; -import com.alibaba.nacos.consistency.entity.ReadRequest; -import com.alibaba.nacos.consistency.entity.Response; -import com.alibaba.nacos.consistency.entity.WriteRequest; -import com.alibaba.nacos.core.distributed.ProtocolManager; -import com.alibaba.nacos.core.exception.ErrorCode; -import com.alibaba.nacos.naming.consistency.Datum; -import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.constants.Constants; -import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.pojo.Record; -import com.alibaba.nacos.sys.env.EnvUtil; -import com.google.protobuf.ByteString; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -/** - * In cluster mode, start the Raft protocol. - * - * @author liaochuntao - */ -@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") -public class PersistentServiceProcessor extends BasePersistentServiceProcessor { - - private final CPProtocol protocol; - - /** - * Is there a leader node currently. - */ - private volatile boolean hasLeader = false; - - public PersistentServiceProcessor(ProtocolManager protocolManager) throws Exception { - this.protocol = protocolManager.getCpProtocol(); - } - - @Override - public void afterConstruct() { - super.afterConstruct(); - String raftGroup = Constants.NAMING_PERSISTENT_SERVICE_GROUP; - this.protocol.protocolMetaData().subscribe(raftGroup, MetadataKey.LEADER_META_DATA, o -> { - if (!(o instanceof ProtocolMetaData.ValueItem)) { - return; - } - Object leader = ((ProtocolMetaData.ValueItem) o).getData(); - hasLeader = StringUtils.isNotBlank(String.valueOf(leader)); - Loggers.RAFT.info("Raft group {} has leader {}", raftGroup, leader); - }); - this.protocol.addRequestProcessors(Collections.singletonList(this)); - // If you choose to use the new RAFT protocol directly, there will be no compatible logical execution - if (EnvUtil.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) { - NotifyCenter.registerSubscriber(notifier); - waitLeader(); - startNotify = true; - } - } - - private void waitLeader() { - while (!hasLeader && !hasError) { - Loggers.RAFT.info("Waiting Jraft leader vote ..."); - try { - TimeUnit.MILLISECONDS.sleep(500); - } catch (InterruptedException ignored) { - } - } - } - - @Override - public void put(String key, Record value) throws NacosException { - final BatchWriteRequest req = new BatchWriteRequest(); - Datum datum = Datum.createDatum(key, value); - req.append(ByteUtils.toBytes(key), serializer.serialize(datum)); - final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) - .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); - try { - protocol.write(request); - } catch (Exception e) { - throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); - } - } - - @Override - public void remove(String key) throws NacosException { - final BatchWriteRequest req = new BatchWriteRequest(); - req.append(ByteUtils.toBytes(key), ByteUtils.EMPTY); - final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) - .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build(); - try { - protocol.write(request); - } catch (Exception e) { - throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); - } - } - - @Override - public Datum get(String key) throws NacosException { - final List keys = new ArrayList<>(1); - keys.add(ByteUtils.toBytes(key)); - final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP) - .setData(ByteString.copyFrom(serializer.serialize(keys))).build(); - try { - Response resp = protocol.getData(req); - if (resp.getSuccess()) { - BatchReadResponse response = serializer - .deserialize(resp.getData().toByteArray(), BatchReadResponse.class); - final List rValues = response.getValues(); - return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key)); - } - throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg()); - } catch (Throwable e) { - throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage()); - } - } - - @Override - public void listen(String key, RecordListener listener) throws NacosException { - notifier.registerListener(key, listener); - if (startNotify) { - notifierDatumIfAbsent(key, listener); - } - } - - @Override - public void unListen(String key, RecordListener listener) throws NacosException { - notifier.deregisterListener(key, listener); - } - - @Override - public boolean isAvailable() { - return hasLeader && !hasError; - } - - @Override - public Optional getErrorMsg() { - String errorMsg; - if (hasLeader && hasError) { - errorMsg = "The raft peer is in error: " + jRaftErrorMsg; - } else if (hasLeader && !hasError) { - errorMsg = null; - } else if (!hasLeader && hasError) { - errorMsg = "Could not find leader! And the raft peer is in error: " + jRaftErrorMsg; - } else { - errorMsg = "Could not find leader!"; - } - return Optional.ofNullable(errorMsg); - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java deleted file mode 100644 index 47af6ea0b..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent.impl; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.common.utils.ByteUtils; -import com.alibaba.nacos.consistency.entity.ReadRequest; -import com.alibaba.nacos.consistency.entity.Response; -import com.alibaba.nacos.consistency.entity.WriteRequest; -import com.alibaba.nacos.core.exception.ErrorCode; -import com.alibaba.nacos.naming.consistency.Datum; -import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.constants.Constants; -import com.alibaba.nacos.naming.pojo.Record; -import com.google.protobuf.ByteString; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; - -/** - * Persistent service manipulation layer in stand-alone mode. - * - * @author liaochuntao - */ -@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") -public class StandalonePersistentServiceProcessor extends BasePersistentServiceProcessor { - - public StandalonePersistentServiceProcessor() throws Exception { - } - - @Override - public void put(String key, Record value) throws NacosException { - final BatchWriteRequest req = new BatchWriteRequest(); - Datum datum = Datum.createDatum(key, value); - req.append(ByteUtils.toBytes(key), serializer.serialize(datum)); - final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) - .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); - try { - onApply(request); - } catch (Exception e) { - throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); - } - } - - @Override - public void remove(String key) throws NacosException { - final BatchWriteRequest req = new BatchWriteRequest(); - req.append(ByteUtils.toBytes(key), ByteUtils.EMPTY); - final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) - .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build(); - try { - onApply(request); - } catch (Exception e) { - throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); - } - } - - @Override - public Datum get(String key) throws NacosException { - final List keys = Collections.singletonList(ByteUtils.toBytes(key)); - final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP) - .setData(ByteString.copyFrom(serializer.serialize(keys))).build(); - try { - final Response resp = onRequest(req); - if (resp.getSuccess()) { - BatchReadResponse response = serializer - .deserialize(resp.getData().toByteArray(), BatchReadResponse.class); - final List rValues = response.getValues(); - return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key)); - } - throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg()); - } catch (Throwable e) { - throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage()); - } - } - - @Override - public void listen(String key, RecordListener listener) throws NacosException { - notifier.registerListener(key, listener); - if (startNotify) { - notifierDatumIfAbsent(key, listener); - } - } - - @Override - public void unListen(String key, RecordListener listener) throws NacosException { - notifier.deregisterListener(key, listener); - } - - @Override - public boolean isAvailable() { - return !hasError; - } - - @Override - public Optional getErrorMsg() { - String errorMsg; - if (hasError) { - errorMsg = "The raft peer is in error: " + jRaftErrorMsg; - } else { - errorMsg = null; - } - return Optional.ofNullable(errorMsg); - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperation.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomainSnapshotOperation.java similarity index 51% rename from naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperation.java rename to naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomainSnapshotOperation.java index eb812b90c..10d704ec3 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperation.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomainSnapshotOperation.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,56 +14,40 @@ * limitations under the License. */ -package com.alibaba.nacos.naming.consistency.persistent.impl; +package com.alibaba.nacos.naming.misc; -import com.alibaba.nacos.common.notify.NotifyCenter; -import com.alibaba.nacos.common.utils.TypeUtils; -import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.snapshot.LocalFileMeta; import com.alibaba.nacos.consistency.snapshot.Reader; import com.alibaba.nacos.consistency.snapshot.Writer; -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.naming.consistency.Datum; -import com.alibaba.nacos.naming.consistency.KeyBuilder; -import com.alibaba.nacos.naming.consistency.ValueChangeEvent; -import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.misc.SwitchDomain; -import com.alibaba.nacos.naming.pojo.Record; +import com.alibaba.nacos.naming.consistency.persistent.impl.AbstractSnapshotOperation; import com.alibaba.nacos.sys.utils.DiskUtils; import com.alipay.sofa.jraft.util.CRC64; -import java.lang.reflect.Type; import java.nio.file.Paths; -import java.util.List; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.Checksum; /** - * Snapshot processing of persistent service data for accelerated Raft protocol recovery and data synchronization. + * Switch Domain snapshot operation. * - * @author liaochuntao * @author xiweng.yy */ -public class NamingSnapshotOperation extends AbstractSnapshotOperation { - - private static final String NAMING_SNAPSHOT_SAVE = NamingSnapshotOperation.class.getSimpleName() + ".SAVE"; - - private static final String NAMING_SNAPSHOT_LOAD = NamingSnapshotOperation.class.getSimpleName() + ".LOAD"; +public class SwitchDomainSnapshotOperation extends AbstractSnapshotOperation { private final String snapshotDir = "naming_persistent"; private final String snapshotArchive = "naming_persistent.zip"; - private final KvStorage storage; - + private final SwitchManager switchManager; + private final Serializer serializer; - public NamingSnapshotOperation(KvStorage storage, ReentrantReadWriteLock lock, Serializer serializer) { + public SwitchDomainSnapshotOperation(ReentrantReadWriteLock lock, SwitchManager switchManager, + Serializer serializer) { super(lock); - this.storage = storage; + this.switchManager = switchManager; this.serializer = serializer; } @@ -74,7 +58,7 @@ public class NamingSnapshotOperation extends AbstractSnapshotOperation { DiskUtils.deleteDirectory(parentPath); DiskUtils.forceMkdir(parentPath); - storage.doSnapshot(parentPath); + this.switchManager.dumpSnapshot(parentPath); final String outputFile = Paths.get(writePath, snapshotArchive).toString(); final Checksum checksum = new CRC64(); DiskUtils.compress(writePath, snapshotDir, outputFile, checksum); @@ -98,56 +82,19 @@ public class NamingSnapshotOperation extends AbstractSnapshotOperation { } } final String loadPath = Paths.get(readerPath, snapshotDir).toString(); - storage.snapshotLoad(loadPath); - // publish value change - publishValueChangeEvent(); Loggers.RAFT.info("snapshot load from : {}", loadPath); + this.switchManager.loadSnapshot(loadPath); DiskUtils.deleteDirectory(loadPath); return true; } - - /** - * publish value change event. - * - * @throws KvStorageException throw to invoker - */ - private void publishValueChangeEvent() throws KvStorageException { - List keys = storage.allKeys(); - for (int i = 0; i < keys.size(); i++) { - String key = new String(keys.get(i)); - // Ignore old 1.x version data - if (!KeyBuilder.matchSwitchKey(key)) { - continue; - } - Datum datum = serializer.deserialize(storage.get(keys.get(i)), getDatumTypeFromKey(key)); - Record value = (datum != null) ? datum.value : null; - // report for refreshing SwitchDomain message - if (value != null) { - ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) - .action(DataOperation.CHANGE).build(); - NotifyCenter.publishEvent(event); - } - } - } - - private Type getDatumTypeFromKey(String key) { - return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key)); - } - - private Class getClassOfRecordFromKey(String key) { - if (KeyBuilder.matchSwitchKey(key)) { - return SwitchDomain.class; - } - return Record.class; - } @Override protected String getSnapshotSaveTag() { - return NAMING_SNAPSHOT_SAVE; + return SwitchDomainSnapshotOperation.class.getSimpleName() + ".SAVE"; } @Override protected String getSnapshotLoadTag() { - return NAMING_SNAPSHOT_LOAD; + return SwitchDomainSnapshotOperation.class.getSimpleName() + ".LOAD"; } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java index ce50b7211..080bcaa2a 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java @@ -18,23 +18,43 @@ package com.alibaba.nacos.naming.misc; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; +import com.alibaba.nacos.common.utils.ByteUtils; import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.StringUtils; -import com.alibaba.nacos.naming.consistency.ConsistencyService; +import com.alibaba.nacos.common.utils.TypeUtils; +import com.alibaba.nacos.consistency.SerializeFactory; +import com.alibaba.nacos.consistency.Serializer; +import com.alibaba.nacos.consistency.cp.RequestProcessor4CP; +import com.alibaba.nacos.consistency.entity.ReadRequest; +import com.alibaba.nacos.consistency.entity.Response; +import com.alibaba.nacos.consistency.entity.WriteRequest; +import com.alibaba.nacos.consistency.snapshot.SnapshotOperation; +import com.alibaba.nacos.core.distributed.ProtocolManager; +import com.alibaba.nacos.core.exception.ErrorCode; import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.KeyBuilder; -import com.alibaba.nacos.naming.consistency.RecordListener; -import org.springframework.beans.factory.annotation.Autowired; +import com.alibaba.nacos.naming.consistency.persistent.impl.BatchReadResponse; +import com.alibaba.nacos.naming.consistency.persistent.impl.BatchWriteRequest; +import com.alibaba.nacos.naming.consistency.persistent.impl.OldDataOperation; +import com.alibaba.nacos.naming.pojo.Record; +import com.alibaba.nacos.sys.utils.DiskUtils; +import com.google.protobuf.ByteString; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import javax.annotation.Resource; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Switch manager. @@ -43,27 +63,36 @@ import java.util.concurrent.locks.ReentrantLock; * @since 1.0.0 */ @Component -public class SwitchManager implements RecordListener { +public class SwitchManager extends RequestProcessor4CP { - @Autowired - private SwitchDomain switchDomain; + private final SwitchDomain switchDomain; - @Resource(name = "persistentConsistencyServiceDelegate") - private ConsistencyService consistencyService; + private final ProtocolManager protocolManager; - ReentrantLock lock = new ReentrantLock(); + private final ReentrantReadWriteLock raftLock; - /** - * Init switch manager. - */ - @PostConstruct - public void init() { - + private final ReentrantLock requestLock; + + private final Serializer serializer; + + private final SwitchDomainSnapshotOperation snapshotOperation; + + private final File dataFile; + + public SwitchManager(SwitchDomain switchDomain, ProtocolManager protocolManager) { + this.switchDomain = switchDomain; + this.protocolManager = protocolManager; + this.raftLock = new ReentrantReadWriteLock(); + this.requestLock = new ReentrantLock(); + this.serializer = SerializeFactory.getSerializer("JSON"); + this.snapshotOperation = new SwitchDomainSnapshotOperation(this.raftLock, this, this.serializer); + this.dataFile = Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data", KeyBuilder.getSwitchDomainKey()).toFile(); try { - consistencyService.listen(KeyBuilder.getSwitchDomainKey(), this); - } catch (NacosException e) { - Loggers.SRV_LOG.error("listen switch service failed.", e); + DiskUtils.forceMkdir(this.dataFile.getParent()); + } catch (IOException e) { + Loggers.RAFT.error("Init Switch Domain directory failed: ", e); } + protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this)); } /** @@ -76,22 +105,15 @@ public class SwitchManager implements RecordListener { */ public void update(String entry, String value, boolean debug) throws Exception { - lock.lock(); + this.requestLock.lock(); try { - Datum datum = consistencyService.get(KeyBuilder.getSwitchDomainKey()); - SwitchDomain switchDomain; - - if (datum != null && datum.value != null) { - switchDomain = (SwitchDomain) datum.value; - } else { - switchDomain = this.switchDomain.clone(); - } + SwitchDomain tempSwitchDomain = this.switchDomain.clone(); if (SwitchEntry.BATCH.equals(entry)) { //batch update SwitchDomain dom = JacksonUtils.toObj(value, SwitchDomain.class); - dom.setEnableStandalone(switchDomain.isEnableStandalone()); + dom.setEnableStandalone(tempSwitchDomain.isEnableStandalone()); if (dom.getHttpHealthParams().getMin() < SwitchDomain.HttpHealthParams.MIN_MIN || dom.getTcpHealthParams().getMin() < SwitchDomain.HttpHealthParams.MIN_MIN) { @@ -110,7 +132,7 @@ public class SwitchManager implements RecordListener { throw new IllegalArgumentException("malformed factor"); } - switchDomain = dom; + tempSwitchDomain = dom; } if (entry.equals(SwitchEntry.DISTRO_THRESHOLD)) { @@ -118,12 +140,12 @@ public class SwitchManager implements RecordListener { if (threshold <= 0) { throw new IllegalArgumentException("distroThreshold can not be zero or negative: " + threshold); } - switchDomain.setDistroThreshold(threshold); + tempSwitchDomain.setDistroThreshold(threshold); } if (entry.equals(SwitchEntry.CLIENT_BEAT_INTERVAL)) { long clientBeatInterval = Long.parseLong(value); - switchDomain.setClientBeatInterval(clientBeatInterval); + tempSwitchDomain.setClientBeatInterval(clientBeatInterval); } if (entry.equals(SwitchEntry.PUSH_VERSION)) { @@ -137,13 +159,13 @@ public class SwitchManager implements RecordListener { } if (StringUtils.equals(SwitchEntry.CLIENT_JAVA, type)) { - switchDomain.setPushJavaVersion(version); + tempSwitchDomain.setPushJavaVersion(version); } else if (StringUtils.equals(SwitchEntry.CLIENT_PYTHON, type)) { - switchDomain.setPushPythonVersion(version); + tempSwitchDomain.setPushPythonVersion(version); } else if (StringUtils.equals(SwitchEntry.CLIENT_C, type)) { - switchDomain.setPushCVersion(version); + tempSwitchDomain.setPushCVersion(version); } else if (StringUtils.equals(SwitchEntry.CLIENT_GO, type)) { - switchDomain.setPushGoVersion(version); + tempSwitchDomain.setPushGoVersion(version); } else { throw new IllegalArgumentException("unsupported client type: " + type); } @@ -156,7 +178,7 @@ public class SwitchManager implements RecordListener { throw new IllegalArgumentException("min cache time for http or tcp is too small(<10000)"); } - switchDomain.setDefaultPushCacheMillis(cacheMillis); + tempSwitchDomain.setDefaultPushCacheMillis(cacheMillis); } // extremely careful while modifying this, cause it will affect all clients without pushing enabled @@ -167,27 +189,27 @@ public class SwitchManager implements RecordListener { throw new IllegalArgumentException("min default cache time is too small(<1000)"); } - switchDomain.setDefaultCacheMillis(cacheMillis); + tempSwitchDomain.setDefaultCacheMillis(cacheMillis); } if (entry.equals(SwitchEntry.MASTERS)) { List masters = Arrays.asList(value.split(",")); - switchDomain.setMasters(masters); + tempSwitchDomain.setMasters(masters); } if (entry.equals(SwitchEntry.DISTRO)) { boolean enabled = Boolean.parseBoolean(value); - switchDomain.setDistroEnabled(enabled); + tempSwitchDomain.setDistroEnabled(enabled); } if (entry.equals(SwitchEntry.CHECK)) { boolean enabled = Boolean.parseBoolean(value); - switchDomain.setHealthCheckEnabled(enabled); + tempSwitchDomain.setHealthCheckEnabled(enabled); } if (entry.equals(SwitchEntry.PUSH_ENABLED)) { boolean enabled = Boolean.parseBoolean(value); - switchDomain.setPushEnabled(enabled); + tempSwitchDomain.setPushEnabled(enabled); } if (entry.equals(SwitchEntry.SERVICE_STATUS_SYNC_PERIOD)) { @@ -197,7 +219,7 @@ public class SwitchManager implements RecordListener { throw new IllegalArgumentException("serviceStatusSynchronizationPeriodMillis is too small(<5000)"); } - switchDomain.setServiceStatusSynchronizationPeriodMillis(millis); + tempSwitchDomain.setServiceStatusSynchronizationPeriodMillis(millis); } if (entry.equals(SwitchEntry.SERVER_STATUS_SYNC_PERIOD)) { @@ -207,25 +229,25 @@ public class SwitchManager implements RecordListener { throw new IllegalArgumentException("serverStatusSynchronizationPeriodMillis is too small(<15000)"); } - switchDomain.setServerStatusSynchronizationPeriodMillis(millis); + tempSwitchDomain.setServerStatusSynchronizationPeriodMillis(millis); } if (entry.equals(SwitchEntry.HEALTH_CHECK_TIMES)) { int times = Integer.parseInt(value); - switchDomain.setCheckTimes(times); + tempSwitchDomain.setCheckTimes(times); } if (entry.equals(SwitchEntry.DISABLE_ADD_IP)) { boolean disableAddIp = Boolean.parseBoolean(value); - switchDomain.setDisableAddIP(disableAddIp); + tempSwitchDomain.setDisableAddIP(disableAddIp); } if (entry.equals(SwitchEntry.SEND_BEAT_ONLY)) { boolean sendBeatOnly = Boolean.parseBoolean(value); - switchDomain.setSendBeatOnly(sendBeatOnly); + tempSwitchDomain.setSendBeatOnly(sendBeatOnly); } if (entry.equals(SwitchEntry.LIMITED_URL_MAP)) { @@ -253,14 +275,14 @@ public class SwitchManager implements RecordListener { } - switchDomain.setLimitedUrlMap(limitedUrlMap); + tempSwitchDomain.setLimitedUrlMap(limitedUrlMap); } } if (entry.equals(SwitchEntry.ENABLE_STANDALONE)) { if (!StringUtils.isNotEmpty(value)) { - switchDomain.setEnableStandalone(Boolean.parseBoolean(value)); + tempSwitchDomain.setEnableStandalone(Boolean.parseBoolean(value)); } } @@ -269,33 +291,33 @@ public class SwitchManager implements RecordListener { if (Constants.NULL_STRING.equals(status)) { status = StringUtils.EMPTY; } - switchDomain.setOverriddenServerStatus(status); + tempSwitchDomain.setOverriddenServerStatus(status); } if (entry.equals(SwitchEntry.DEFAULT_INSTANCE_EPHEMERAL)) { - switchDomain.setDefaultInstanceEphemeral(Boolean.parseBoolean(value)); + tempSwitchDomain.setDefaultInstanceEphemeral(Boolean.parseBoolean(value)); } if (entry.equals(SwitchEntry.DISTRO_SERVER_EXPIRED_MILLIS)) { - switchDomain.setDistroServerExpiredMillis(Long.parseLong(value)); + tempSwitchDomain.setDistroServerExpiredMillis(Long.parseLong(value)); } if (entry.equals(SwitchEntry.LIGHT_BEAT_ENABLED)) { - switchDomain.setLightBeatEnabled(ConvertUtils.toBoolean(value)); + tempSwitchDomain.setLightBeatEnabled(ConvertUtils.toBoolean(value)); } if (entry.equals(SwitchEntry.AUTO_CHANGE_HEALTH_CHECK_ENABLED)) { - switchDomain.setAutoChangeHealthCheckEnabled(ConvertUtils.toBoolean(value)); + tempSwitchDomain.setAutoChangeHealthCheckEnabled(ConvertUtils.toBoolean(value)); } if (debug) { - update(switchDomain); + update(tempSwitchDomain); } else { - consistencyService.put(KeyBuilder.getSwitchDomainKey(), switchDomain); + updateWithConsistency(tempSwitchDomain); } } finally { - lock.unlock(); + this.requestLock.unlock(); } } @@ -340,27 +362,145 @@ public class SwitchManager implements RecordListener { switchDomain.setLightBeatEnabled(newSwitchDomain.isLightBeatEnabled()); } + private void updateWithConsistency(SwitchDomain tempSwitchDomain) throws NacosException { + try { + final BatchWriteRequest req = new BatchWriteRequest(); + String switchDomainKey = KeyBuilder.getSwitchDomainKey(); + Datum datum = Datum.createDatum(switchDomainKey, tempSwitchDomain); + req.append(ByteUtils.toBytes(switchDomainKey), serializer.serialize(datum)); + WriteRequest operationLog = WriteRequest.newBuilder().setGroup(group()) + .setOperation(OldDataOperation.Write.getDesc()).setData(ByteString.copyFrom(serializer.serialize(req))) + .build(); + protocolManager.getCpProtocol().write(operationLog); + } catch (Exception e) { + Loggers.RAFT.error("Submit switch domain failed: ", e); + throw new NacosException(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage()); + } + } + public SwitchDomain getSwitchDomain() { return switchDomain; } @Override - public boolean interests(String key) { - return KeyBuilder.matchSwitchKey(key); + public List loadSnapshotOperate() { + return Collections.singletonList(snapshotOperation); + } + + /** + * Load Snapshot from snapshot dir. + * + * @param snapshotPath snapshot dir + */ + public void loadSnapshot(String snapshotPath) { + this.raftLock.writeLock().lock(); + try { + File srcDir = Paths.get(snapshotPath).toFile(); + // If snapshot path is non-exist, means snapshot is empty + if (srcDir.exists()) { + // First clean up the local file information, before the file copy + String baseDir = this.dataFile.getParent(); + DiskUtils.deleteDirThenMkdir(baseDir); + File descDir = Paths.get(baseDir).toFile(); + DiskUtils.copyDirectory(srcDir, descDir); + if (!this.dataFile.exists()) { + return; + } + byte[] snapshotData = DiskUtils.readFileBytes(this.dataFile); + final Datum datum = serializer.deserialize(snapshotData, getDatumType()); + final Record value = null != datum ? datum.value : null; + if (!(value instanceof SwitchDomain)) { + return; + } + update((SwitchDomain) value); + } + } catch (IOException e) { + throw new NacosRuntimeException(ErrorCode.IOCopyDirError.getCode(), e); + } finally { + this.raftLock.writeLock().unlock(); + } + } + + /** + * Dump data from data dir to snapshot dir. + * + * @param backupPath snapshot dir + */ + public void dumpSnapshot(String backupPath) { + this.raftLock.writeLock().lock(); + try { + File srcDir = Paths.get(this.dataFile.getParent()).toFile(); + File descDir = Paths.get(backupPath).toFile(); + DiskUtils.copyDirectory(srcDir, descDir); + } catch (IOException e) { + throw new NacosRuntimeException(ErrorCode.IOCopyDirError.getCode(), e); + } finally { + this.raftLock.writeLock().unlock(); + } } @Override - public boolean matchUnlistenKey(String key) { - return KeyBuilder.matchSwitchKey(key); + public Response onRequest(ReadRequest request) { + this.raftLock.readLock().lock(); + try { + final List keys = serializer.deserialize(request.getData().toByteArray(), + TypeUtils.parameterize(List.class, byte[].class)); + if (isNotSwitchDomainKey(keys)) { + return Response.newBuilder().setSuccess(false).setErrMsg("not switch domain key").build(); + } + Datum datum = Datum.createDatum(KeyBuilder.getSwitchDomainKey(), switchDomain); + final BatchReadResponse response = new BatchReadResponse(); + response.append(ByteUtils.toBytes(KeyBuilder.getSwitchDomainKey()), serializer.serialize(datum)); + return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response))) + .build(); + } catch (Exception e) { + Loggers.RAFT.warn("On read switch domain failed, ", e); + return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build(); + } finally { + this.raftLock.readLock().unlock(); + } } @Override - public void onChange(String key, SwitchDomain domain) throws Exception { - update(domain); + public Response onApply(WriteRequest log) { + this.raftLock.writeLock().lock(); + try { + BatchWriteRequest bwRequest = serializer.deserialize(log.getData().toByteArray(), BatchWriteRequest.class); + if (isNotSwitchDomainKey(bwRequest.getKeys())) { + return Response.newBuilder().setSuccess(false).setErrMsg("not switch domain key").build(); + } + final Datum datum = serializer.deserialize(bwRequest.getValues().get(0), getDatumType()); + final Record value = null != datum ? datum.value : null; + if (!(value instanceof SwitchDomain)) { + return Response.newBuilder().setSuccess(false).setErrMsg("datum is not switch domain").build(); + } + DiskUtils.touch(dataFile); + DiskUtils.writeFile(dataFile, bwRequest.getValues().get(0), false); + SwitchDomain switchDomain = (SwitchDomain) value; + update(switchDomain); + return Response.newBuilder().setSuccess(true).build(); + } catch (Exception e) { + Loggers.RAFT.warn("On apply switch domain failed, ", e); + return Response.newBuilder().setSuccess(false).setErrMsg(e.getMessage()).build(); + } finally { + this.raftLock.writeLock().unlock(); + } } @Override - public void onDelete(String key) throws Exception { + public String group() { + return com.alibaba.nacos.naming.constants.Constants.NAMING_PERSISTENT_SERVICE_GROUP; + } + private boolean isNotSwitchDomainKey(List keys) { + if (1 != keys.size()) { + return false; + } + String keyString = new String(keys.get(0)); + return !KeyBuilder.getSwitchDomainKey().equals(keyString); + } + + private Type getDatumType() { + return TypeUtils.parameterize(Datum.class, SwitchDomain.class); } } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/cluster/ServerStatusManagerTest.java b/naming/src/test/java/com/alibaba/nacos/naming/cluster/ServerStatusManagerTest.java index 8bfd4f07d..9ebc86603 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/cluster/ServerStatusManagerTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/cluster/ServerStatusManagerTest.java @@ -18,27 +18,44 @@ package com.alibaba.nacos.naming.cluster; -import com.alibaba.nacos.naming.consistency.ConsistencyService; +import com.alibaba.nacos.consistency.cp.CPProtocol; +import com.alibaba.nacos.core.distributed.ProtocolManager; +import com.alibaba.nacos.naming.constants.Constants; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.sys.env.EnvUtil; +import com.alipay.sofa.jraft.RouteTable; +import com.alipay.sofa.jraft.entity.PeerId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.mock.env.MockEnvironment; -import java.lang.reflect.Field; import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class ServerStatusManagerTest { + @Mock + SwitchDomain switchDomain; + + @Mock + ProtocolManager protocolManager; + + @Mock + CPProtocol cpProtocol; + @BeforeEach void setUp() { EnvUtil.setEnvironment(new MockEnvironment()); @@ -47,7 +64,7 @@ class ServerStatusManagerTest { @Test void testInit() { try (MockedStatic mocked = mockStatic(GlobalExecutor.class)) { - ServerStatusManager serverStatusManager = new ServerStatusManager(mock(SwitchDomain.class)); + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); serverStatusManager.init(); mocked.verify(() -> GlobalExecutor.registerServerStatusUpdater(any())); } @@ -55,30 +72,24 @@ class ServerStatusManagerTest { @Test void testGetServerStatus() { - ServerStatusManager serverStatusManager = new ServerStatusManager(mock(SwitchDomain.class)); + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); ServerStatus serverStatus = serverStatusManager.getServerStatus(); assertEquals(ServerStatus.STARTING, serverStatus); } @Test - void testGetErrorMsg() throws NoSuchFieldException, IllegalAccessException { - ServerStatusManager serverStatusManager = new ServerStatusManager(mock(SwitchDomain.class)); - Field field = ServerStatusManager.class.getDeclaredField("consistencyService"); - field.setAccessible(true); - ConsistencyService consistencyService = mock(ConsistencyService.class); - when(consistencyService.getErrorMsg()).thenReturn(Optional.empty()); - field.set(serverStatusManager, consistencyService); + void testGetErrorMsg() { + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); Optional errorMsg = serverStatusManager.getErrorMsg(); - assertFalse(errorMsg.isPresent()); + assertTrue(errorMsg.isPresent()); } @Test void testUpdaterFromSwitch() { - SwitchDomain switchDomain = mock(SwitchDomain.class); String expect = ServerStatus.DOWN.toString(); when(switchDomain.getOverriddenServerStatus()).thenReturn(expect); - ServerStatusManager serverStatusManager = new ServerStatusManager(switchDomain); + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater(); //then updater.run(); @@ -88,30 +99,36 @@ class ServerStatusManagerTest { } @Test - void testUpdaterFromConsistency1() throws NoSuchFieldException, IllegalAccessException { - SwitchDomain switchDomain = mock(SwitchDomain.class); - ServerStatusManager serverStatusManager = new ServerStatusManager(switchDomain); - Field field = ServerStatusManager.class.getDeclaredField("consistencyService"); - field.setAccessible(true); - ConsistencyService consistencyService = mock(ConsistencyService.class); - when(consistencyService.isAvailable()).thenReturn(true); - field.set(serverStatusManager, consistencyService); + void testUpdaterFromConsistency1() { + try (MockedStatic mocked = mockStatic(RouteTable.class)) { + RouteTable mockTable = mock(RouteTable.class); + when(mockTable.selectLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP)).thenReturn(new PeerId()); + mocked.when(RouteTable::getInstance).thenReturn(mockTable); + when(protocolManager.getCpProtocol()).thenReturn(cpProtocol); + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); + ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater(); + //then + updater.run(); + //then + assertEquals(ServerStatus.UP, serverStatusManager.getServerStatus()); + assertFalse(serverStatusManager.getErrorMsg().isPresent()); + } + } + + @Test + void testUpdaterFromConsistency2() { + when(protocolManager.getCpProtocol()).thenReturn(cpProtocol); + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater(); //then updater.run(); //then - assertEquals(ServerStatus.UP, serverStatusManager.getServerStatus()); + assertEquals(ServerStatus.DOWN, serverStatusManager.getServerStatus()); } @Test - void testUpdaterFromConsistency2() throws NoSuchFieldException, IllegalAccessException { - SwitchDomain switchDomain = mock(SwitchDomain.class); - ServerStatusManager serverStatusManager = new ServerStatusManager(switchDomain); - Field field = ServerStatusManager.class.getDeclaredField("consistencyService"); - field.setAccessible(true); - ConsistencyService consistencyService = mock(ConsistencyService.class); - when(consistencyService.isAvailable()).thenReturn(false); - field.set(serverStatusManager, consistencyService); + void testUpdaterFromConsistency3() { + ServerStatusManager serverStatusManager = new ServerStatusManager(protocolManager, switchDomain); ServerStatusManager.ServerStatusUpdater updater = serverStatusManager.new ServerStatusUpdater(); //then updater.run(); diff --git a/naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorageTest.java b/naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorageTest.java deleted file mode 100644 index 6e4dc4b4d..000000000 --- a/naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorageTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent.impl; - -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.kv.FileKvStorage; -import com.alibaba.nacos.sys.utils.DiskUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; - -import java.io.IOException; -import java.lang.reflect.Field; - -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -// todo remove this -@MockitoSettings(strictness = Strictness.LENIENT) -class NamingKvStorageTest { - - private final byte[] key = "fileName_test".getBytes(); - - private final String str = "str_test"; - - private NamingKvStorage namingKvStorage; - - @Mock - private FileKvStorage baseDirStorageMock; - - @BeforeEach - void setUp() throws Exception { - namingKvStorage = new NamingKvStorage("baseDir_test"); - - Field baseDirStorageField = NamingKvStorage.class.getDeclaredField("baseDirStorage"); - baseDirStorageField.setAccessible(true); - baseDirStorageField.set(namingKvStorage, baseDirStorageMock); - - when(baseDirStorageMock.get(key)).thenReturn(null); - } - - @AfterEach - void tearDown() throws IOException { - DiskUtils.deleteDirectory("baseDir_test"); - } - - @Test - void testGet() throws KvStorageException { - namingKvStorage.get(key); - verify(baseDirStorageMock).get(key); - } - - @Test - void testPut() throws KvStorageException { - byte[] value = "value_test".getBytes(); - namingKvStorage.put(key, value); - verify(baseDirStorageMock).put(key, value); - } - - @Test - void testDelete() throws KvStorageException { - namingKvStorage.delete(key); - verify(baseDirStorageMock).delete(key); - } - - @Test - void testDoSnapshot() throws KvStorageException { - namingKvStorage.doSnapshot(str); - verify(baseDirStorageMock).doSnapshot(str); - } - - @Test - void testSnapshotLoad() throws KvStorageException { - namingKvStorage.snapshotLoad(str); - verify(baseDirStorageMock).snapshotLoad(str); - } -} diff --git a/naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperationTest.java b/naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperationTest.java deleted file mode 100644 index 17b7e5e82..000000000 --- a/naming/src/test/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingSnapshotOperationTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.consistency.persistent.impl; - -import com.alibaba.nacos.consistency.Serializer; -import com.alibaba.nacos.consistency.snapshot.Reader; -import com.alibaba.nacos.consistency.snapshot.Writer; -import com.alibaba.nacos.core.distributed.raft.RaftConfig; -import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor; -import com.alibaba.nacos.core.storage.kv.KvStorage; -import com.alibaba.nacos.sys.env.EnvUtil; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.mock.env.MockEnvironment; - -import java.nio.file.Paths; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; - -@ExtendWith(MockitoExtension.class) -class NamingSnapshotOperationTest { - - static { - RaftExecutor.init(new RaftConfig()); - EnvUtil.setEnvironment(new MockEnvironment()); - } - - private final String snapshotDir = Paths.get(EnvUtil.getNacosTmpDir(), "rocks_snapshot_test").toString(); - - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - @Mock - private KvStorage storage; - - private boolean isSnapshoted = false; - - @BeforeEach - void init() throws Exception { - doAnswer(invocationOnMock -> { - isSnapshoted = true; - return null; - }).when(storage).doSnapshot(any(String.class)); - } - - @AfterEach - void after() { - storage.shutdown(); - } - - @Test - void testNamingSnapshot() throws InterruptedException { - AtomicBoolean result = new AtomicBoolean(false); - NamingSnapshotOperation operation = new NamingSnapshotOperation(storage, lock, Mockito.mock(Serializer.class)); - final Writer writer = new Writer(snapshotDir); - final CountDownLatch latch = new CountDownLatch(1); - - operation.onSnapshotSave(writer, (isOk, throwable) -> { - result.set(isOk && throwable == null); - latch.countDown(); - }); - latch.await(10, TimeUnit.SECONDS); - assertTrue(isSnapshoted); - assertTrue(result.get()); - - final Reader reader = new Reader(snapshotDir, writer.listFiles()); - boolean res = operation.onSnapshotLoad(reader); - assertTrue(res); - } - -} diff --git a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/CPInstancesAPI_ITCase.java b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/CPInstancesAPI_ITCase.java index e0970cab5..351454fab 100644 --- a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/CPInstancesAPI_ITCase.java +++ b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/CPInstancesAPI_ITCase.java @@ -29,27 +29,16 @@ import com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.web.server.LocalServerPort; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; -import org.springframework.util.MultiValueMap; -import org.springframework.web.util.UriComponentsBuilder; import java.net.URL; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; -import static com.alibaba.nacos.test.naming.NamingBase.NAMING_CONTROLLER_PATH; -import static com.alibaba.nacos.test.naming.NamingBase.TEST_GROUP_1; -import static com.alibaba.nacos.test.naming.NamingBase.TEST_NAMESPACE_1; -import static com.alibaba.nacos.test.naming.NamingBase.TEST_NAMESPACE_2; -import static com.alibaba.nacos.test.naming.NamingBase.TEST_PORT2_4_DOM_1; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -58,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ @SpringBootTest(classes = Nacos.class, properties = { "server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) -class CPInstancesAPI_ITCase { +class CPInstancesAPI_ITCase extends NamingBase { private NamingService naming; @@ -69,11 +58,6 @@ class CPInstancesAPI_ITCase { @LocalServerPort private int port; - private URL base; - - @Autowired - private TestRestTemplate restTemplate; - @BeforeEach void setUp() throws Exception { String url = String.format("http://localhost:%d/", port); @@ -90,6 +74,7 @@ class CPInstancesAPI_ITCase { properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE_2); properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1" + ":" + port); naming2 = NamingFactory.createNamingService(properties); + isNamingServerReady(); } @AfterEach @@ -182,13 +167,14 @@ class CPInstancesAPI_ITCase { String serviceName = NamingBase.randomDomainName(); ResponseEntity registerResponse = request(NamingBase.NAMING_CONTROLLER_PATH + "/instance", - Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "11.11.11.11").appendParam("port", "80") - .appendParam("namespaceId", TEST_NAMESPACE_1).done(), String.class, HttpMethod.POST); + Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", "11.11.11.11") + .appendParam("port", "80").appendParam("namespaceId", TEST_NAMESPACE_1).done(), String.class, + HttpMethod.POST); assertTrue(registerResponse.getStatusCode().is2xxSuccessful()); ResponseEntity deleteServiceResponse = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1).done(), - String.class, HttpMethod.DELETE); + Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1) + .done(), String.class, HttpMethod.DELETE); assertTrue(deleteServiceResponse.getStatusCode().is4xxClientError()); } @@ -210,8 +196,8 @@ class CPInstancesAPI_ITCase { //get service response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1).done(), - String.class); + Params.newParams().appendParam("serviceName", serviceName).appendParam("namespaceId", TEST_NAMESPACE_1) + .done(), String.class); assertTrue(response.getStatusCode().is2xxSuccessful()); @@ -254,8 +240,8 @@ class CPInstancesAPI_ITCase { //get service ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/list", - Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1").appendParam("pageSize", "150").done(), - String.class); + Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1") + .appendParam("pageSize", "150").done(), String.class); System.out.println("json = " + response.getBody()); assertTrue(response.getStatusCode().is2xxSuccessful()); @@ -337,7 +323,8 @@ class CPInstancesAPI_ITCase { assertEquals(1, json.get("hosts").size()); instanceDeregister(serviceName, Constants.DEFAULT_NAMESPACE_ID, "33.33.33.33", TEST_PORT2_4_DOM_1); - instanceDeregister(serviceName, Constants.DEFAULT_NAMESPACE_ID, TEST_GROUP_1, "22.22.22.22", TEST_PORT2_4_DOM_1); + instanceDeregister(serviceName, Constants.DEFAULT_NAMESPACE_ID, TEST_GROUP_1, "22.22.22.22", + TEST_PORT2_4_DOM_1); namingServiceDelete(serviceName, Constants.DEFAULT_NAMESPACE_ID); namingServiceDelete(serviceName, Constants.DEFAULT_NAMESPACE_ID, TEST_GROUP_1); @@ -349,17 +336,19 @@ class CPInstancesAPI_ITCase { private void instanceDeregister(String serviceName, String namespace, String groupName, String ip, String port) { ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/instance", - Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip).appendParam("port", port) - .appendParam("namespaceId", namespace).appendParam("groupName", groupName).appendParam("ephemeral", "false").done(), - String.class, HttpMethod.DELETE); + Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip) + .appendParam("port", port).appendParam("namespaceId", namespace) + .appendParam("groupName", groupName).appendParam("ephemeral", "false").done(), String.class, + HttpMethod.DELETE); assertTrue(response.getStatusCode().is2xxSuccessful()); } private void instanceRegister(String serviceName, String namespace, String groupName, String ip, String port) { ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/instance", - Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip).appendParam("port", port) - .appendParam("namespaceId", namespace).appendParam("groupName", groupName).appendParam("ephemeral", "false").done(), - String.class, HttpMethod.POST); + Params.newParams().appendParam("serviceName", serviceName).appendParam("ip", ip) + .appendParam("port", port).appendParam("namespaceId", namespace) + .appendParam("groupName", groupName).appendParam("ephemeral", "false").done(), String.class, + HttpMethod.POST); assertTrue(response.getStatusCode().is2xxSuccessful()); } @@ -374,7 +363,8 @@ class CPInstancesAPI_ITCase { private void namingServiceCreate(String serviceName, String namespace, String groupName) { ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3") - .appendParam("namespaceId", namespace).appendParam("groupName", groupName).done(), String.class, HttpMethod.POST); + .appendParam("namespaceId", namespace).appendParam("groupName", groupName).done(), String.class, + HttpMethod.POST); System.out.println(response); assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody()); @@ -393,26 +383,4 @@ class CPInstancesAPI_ITCase { assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody()); } - - private ResponseEntity request(String path, MultiValueMap params, Class clazz) { - - HttpHeaders headers = new HttpHeaders(); - - HttpEntity entity = new HttpEntity(headers); - - UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(this.base.toString() + path).queryParams(params); - - return this.restTemplate.exchange(builder.toUriString(), HttpMethod.GET, entity, clazz); - } - - private ResponseEntity request(String path, MultiValueMap params, Class clazz, HttpMethod httpMethod) { - - HttpHeaders headers = new HttpHeaders(); - - HttpEntity entity = new HttpEntity(headers); - - UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(this.base.toString() + path).queryParams(params); - - return this.restTemplate.exchange(builder.toUriString(), httpMethod, entity, clazz); - } } diff --git a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/NamingBase.java b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/NamingBase.java index 0a4889b7e..d9d8592c4 100644 --- a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/NamingBase.java +++ b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/NamingBase.java @@ -24,11 +24,14 @@ import com.alibaba.nacos.common.http.client.NacosRestTemplate; import com.alibaba.nacos.common.http.param.Header; import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.test.base.HttpClient4Test; +import com.alibaba.nacos.test.base.Params; import org.apache.http.HttpStatus; +import org.springframework.http.ResponseEntity; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -232,4 +235,17 @@ public class NamingBase extends HttpClient4Test { } return contextPath.startsWith("/") ? contextPath : "/" + contextPath; } + + protected void isNamingServerReady() throws InterruptedException { + int retry = 0; + while (retry < 3) { + ResponseEntity response = request("/nacos/v1/ns/operator/metrics", Params.newParams().done(), + String.class); + if (response.getStatusCode().is2xxSuccessful() && response.getBody().contains("UP")) { + break; + } + retry++; + TimeUnit.SECONDS.sleep(5); + } + } } diff --git a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/RestAPI_ITCase.java b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/RestAPI_ITCase.java index 12b35dd05..0ba1a0e2f 100644 --- a/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/RestAPI_ITCase.java +++ b/test/naming-test/src/test/java/com/alibaba/nacos/test/naming/RestAPI_ITCase.java @@ -49,6 +49,7 @@ class RestAPI_ITCase extends NamingBase { void setUp() throws Exception { String url = String.format("http://localhost:%d/", port); this.base = new URL(url); + isNamingServerReady(); //prepareData(); } @@ -85,8 +86,8 @@ class RestAPI_ITCase extends NamingBase { void createService() throws Exception { String serviceName = NamingBase.randomDomainName(); ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class, - HttpMethod.POST); + Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3") + .done(), String.class, HttpMethod.POST); assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody()); @@ -102,14 +103,15 @@ class RestAPI_ITCase extends NamingBase { void getService() throws Exception { String serviceName = NamingBase.randomDomainName(); ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class, - HttpMethod.POST); + Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3") + .done(), String.class, HttpMethod.POST); assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody()); //get service response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class); + Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3") + .done(), String.class); assertTrue(response.getStatusCode().is2xxSuccessful()); @@ -129,8 +131,8 @@ class RestAPI_ITCase extends NamingBase { String serviceName = NamingBase.randomDomainName(); //get service ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/list", - Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1").appendParam("pageSize", "150").done(), - String.class); + Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1") + .appendParam("pageSize", "150").done(), String.class); assertTrue(response.getStatusCode().is2xxSuccessful()); JsonNode json = JacksonUtils.toObj(response.getBody()); @@ -138,14 +140,14 @@ class RestAPI_ITCase extends NamingBase { assertTrue(count >= 0); response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class, - HttpMethod.POST); + Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3") + .done(), String.class, HttpMethod.POST); assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody()); response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service/list", - Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1").appendParam("pageSize", "150").done(), - String.class); + Params.newParams().appendParam("serviceName", serviceName).appendParam("pageNo", "1") + .appendParam("pageSize", "150").done(), String.class); assertTrue(response.getStatusCode().is2xxSuccessful()); json = JacksonUtils.toObj(response.getBody()); @@ -163,8 +165,8 @@ class RestAPI_ITCase extends NamingBase { void updateService() throws Exception { String serviceName = NamingBase.randomDomainName(); ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.6").done(), String.class, - HttpMethod.POST); + Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.6") + .done(), String.class, HttpMethod.POST); assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody()); @@ -208,8 +210,8 @@ class RestAPI_ITCase extends NamingBase { private void namingServiceDelete(String serviceName) { //delete service ResponseEntity response = request(NamingBase.NAMING_CONTROLLER_PATH + "/service", - Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3").done(), String.class, - HttpMethod.DELETE); + Params.newParams().appendParam("serviceName", serviceName).appendParam("protectThreshold", "0.3") + .done(), String.class, HttpMethod.DELETE); assertTrue(response.getStatusCode().is2xxSuccessful()); assertEquals("ok", response.getBody());