Merge branch 'jraft_naming' of https://github.com/chuntaojun/nacos into jraft_naming

This commit is contained in:
chuntaojun 2020-08-20 23:55:52 +08:00
commit b4953488d3
8 changed files with 387 additions and 119 deletions

View File

@ -28,42 +28,47 @@ public enum ErrorCode {
*/
UnKnowError(40001),
// rocksdb error
// kv error
/**
* rocksdb write error.
* KVStorage write error.
*/
RocksDBWriteError(40100),
KVStorageWriteError(40100),
/**
* rocksdb read error.
* KVStorage read error.
*/
RocksDBReadError(40101),
KVStorageReadError(40101),
/**
* rocksdb delete error.
* KVStorage delete error.
*/
RocksDBDeleteError(40102),
KVStorageDeleteError(40102),
/**
* rocksdb snapshot save error.
* KVStorage snapshot save error.
*/
RocksDBSnapshotSaveError(40103),
KVStorageSnapshotSaveError(40103),
/**
* rocksdb snapshot load error.
* KVStorage snapshot load error.
*/
RocksDBSnapshotLoadError(40104),
KVStorageSnapshotLoadError(40104),
/**
* rocksdb reset error.
* KVStorage reset error.
*/
RocksDBResetError(40105),
KVStorageResetError(40105),
/**
* rocksdb create error.
* KVStorage create error.
*/
RocksDBCreateError(40106),
KVStorageCreateError(40106),
/**
* KVStorage write error.
*/
KVStorageBatchWriteError(40107),
// disk error

View File

@ -24,21 +24,21 @@ import com.alibaba.nacos.api.exception.NacosException;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class RocksStorageException extends NacosException {
public class KVStorageException extends NacosException {
public RocksStorageException() {
public KVStorageException() {
super();
}
public RocksStorageException(int errCode, String errMsg) {
public KVStorageException(int errCode, String errMsg) {
super(errCode, errMsg);
}
public RocksStorageException(int errCode, Throwable throwable) {
public KVStorageException(int errCode, Throwable throwable) {
super(errCode, throwable);
}
public RocksStorageException(int errCode, String errMsg, Throwable throwable) {
public KVStorageException(int errCode, String errMsg, Throwable throwable) {
super(errCode, errMsg, throwable);
}

View File

@ -0,0 +1,93 @@
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KVStorageException;
import com.alibaba.nacos.core.utils.DiskUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* File based KV storage
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class FileKVStorage implements KvStorage {
private String baseDir;
public void init(String path) {
this.baseDir = path;
}
@Override
public byte[] get(byte[] key) throws KVStorageException {
final String fileName = new String(key);
File file = Paths.get(baseDir, fileName).toFile();
if (file.exists()) {
return DiskUtils.readFileBytes(file);
}
return null;
}
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KVStorageException {
Map<byte[], byte[]> result = new HashMap<>(keys.size());
for (byte[] key : keys) {
byte[] val = get(key);
if (val != null) {
result.put(key, val);
}
}
return result;
}
@Override
public void put(byte[] key, byte[] value) throws KVStorageException {
final String fileName = new String(key);
File file = Paths.get(baseDir, fileName).toFile();
try {
DiskUtils.touch(file);
} catch (IOException e) {
throw new KVStorageException(ErrorCode.KVStorageWriteError.getCode(), "create file failed");
}
DiskUtils.writeFile(file, value, false);
}
@Override
public void batchPut(List<byte[]> keys, List<byte[]> values) throws KVStorageException {
if (keys.size() != values.size()) {
throw new KVStorageException(ErrorCode.KVStorageBatchWriteError.getCode(),
"key's size must be equal to value's size");
}
int size = keys.size();
for (int i = 0; i < size; i ++) {
put(keys.get(i), values.get(i));
}
}
@Override
public void delete(byte[] key) throws KVStorageException {
final String fileName = new String(key);
File file = Paths.get(baseDir, fileName).toFile();
if (file.exists()) {
file.delete();
}
}
@Override
public void batchDelete(List<byte[]> keys) throws KVStorageException {
for (byte[] key : keys) {
delete(key);
}
}
@Override
public void shutdown() {
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.exception.KVStorageException;
import java.util.List;
import java.util.Map;
/**
* Universal KV storage interface
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public interface KvStorage {
/**
* get data by key
*
* @param key byte[]
* @return byte[]
* @throws KVStorageException KVStorageException
*/
byte[] get(byte[] key) throws KVStorageException;
/**
* batch get by List byte[].
*
* @param keys List byte[]
* @return Map byte[], byte[]
* @throws KVStorageException RocksStorageException
*/
Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KVStorageException;
/**
* write data.
*
* @param key byte[]
* @param value byte[]
* @throws KVStorageException RocksStorageException
*/
void put(byte[] key, byte[] value) throws KVStorageException;
/**
* batch write.
*
* @param key List byte[]
* @param values List byte[]
* @throws KVStorageException RocksStorageException
*/
void batchPut(List<byte[]> key, List<byte[]> values) throws KVStorageException;
/**
* delete with key.
*
* @param key byte[]
* @throws KVStorageException RocksStorageException
*/
void delete(byte[] key) throws KVStorageException;
/**
* batch delete with keys.
*
* @param key List byte[]
* @throws KVStorageException RocksStorageException
*/
void batchDelete(List<byte[]> key) throws KVStorageException;
/**
* shutdown.
*/
void shutdown();
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.storage;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KVStorageException;
import com.alipay.sofa.jraft.util.BytesUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class MemoryKVStorage implements KvStorage {
private final Map<Key, byte[]> storage = new ConcurrentSkipListMap<Key, byte[]>();
@Override
public byte[] get(byte[] key) throws KVStorageException {
return storage.get(new Key(key));
}
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KVStorageException {
Map<byte[], byte[]> result = new HashMap<>(keys.size());
for (byte[] key : keys) {
byte[] val = storage.get(new Key(key));
if (val != null) {
result.put(key, val);
}
}
return result;
}
@Override
public void put(byte[] key, byte[] value) throws KVStorageException {
storage.put(new Key(key), value);
}
@Override
public void batchPut(List<byte[]> keys, List<byte[]> values) throws KVStorageException {
if (keys.size() != values.size()) {
throw new KVStorageException(ErrorCode.KVStorageBatchWriteError.getCode(),
"key's size must be equal to value's size");
}
int size = keys.size();
for (int i = 0; i < size; i ++) {
storage.put(new Key(keys.get(i)), values.get(i));
}
}
@Override
public void delete(byte[] key) throws KVStorageException {
storage.remove(new Key(key));
}
@Override
public void batchDelete(List<byte[]> keys) throws KVStorageException {
for (byte[] key : keys) {
storage.remove(new Key(key));
}
}
@Override
public void shutdown() {
storage.clear();
}
private static class Key implements Comparable<byte[]> {
private final byte[] origin;
private Key(byte[] origin) {
this.origin = origin;
}
@Override
public int compareTo(byte[] o) {
return BytesUtil.compare(origin, o);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Key key = (Key) o;
return Arrays.equals(origin, key.origin);
}
@Override
public int hashCode() {
return Arrays.hashCode(origin);
}
}
}

View File

@ -21,7 +21,7 @@ import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.RocksStorageException;
import com.alibaba.nacos.core.exception.KVStorageException;
import com.alibaba.nacos.core.utils.DiskUtils;
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
@ -53,7 +53,7 @@ import java.util.stream.Collectors;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public final class RocksStorage {
public class RocksStorage implements KvStorage {
private String group;
@ -81,7 +81,7 @@ public final class RocksStorage {
/**
* create rocksdb storage with default operation.
*
* @param group group
* @param group group
* @param baseDir base dir
* @return {@link RocksStorage}
*/
@ -93,10 +93,10 @@ public final class RocksStorage {
/**
* create rocksdb storage and set customer operation.
*
* @param group group
* @param baseDir base dir
* @param group group
* @param baseDir base dir
* @param writeOptions {@link WriteOptions}
* @param readOptions {@link ReadOptions}
* @param readOptions {@link ReadOptions}
* @return {@link RocksStorage}
*/
public static RocksStorage createCustomer(final String group, String baseDir, WriteOptions writeOptions,
@ -116,26 +116,26 @@ public final class RocksStorage {
/**
* destroy old rocksdb and open new one.
*
* @throws RocksStorageException RocksStorageException
* @throws KVStorageException RocksStorageException
*/
public void destroyAndOpenNew() throws RocksStorageException {
public void destroyAndOpenNew() throws KVStorageException {
try (final Options options = new Options()) {
RocksDB.destroyDB(dbPath, options);
createRocksDB(dbPath, group, writeOptions, readOptions, this);
} catch (RocksDBException ex) {
Status status = ex.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBResetError, status);
throw createRocksStorageException(ErrorCode.KVStorageResetError, status);
}
}
/**
* create rocksdb.
*
* @param baseDir base dir
* @param group group
* @param baseDir base dir
* @param group group
* @param writeOptions {@link WriteOptions}
* @param readOptions {@link ReadOptions}
* @param storage {@link RocksStorage}
* @param readOptions {@link ReadOptions}
* @param storage {@link RocksStorage}
*/
private static void createRocksDB(final String baseDir, final String group, WriteOptions writeOptions,
ReadOptions readOptions, final RocksStorage storage) {
@ -156,34 +156,22 @@ public final class RocksStorage {
storage.db = RocksDB.open(options, baseDir, columnFamilyDescriptors, columnFamilyHandles);
storage.defaultHandle = columnFamilyHandles.get(0);
} catch (RocksDBException e) {
throw new NacosRuntimeException(ErrorCode.RocksDBCreateError.getCode(), e);
throw new NacosRuntimeException(ErrorCode.KVStorageCreateError.getCode(), e);
}
}
/**
* write data.
*
* @param key byte[]
* @param value byte[]
* @throws RocksStorageException RocksStorageException
*/
public void write(byte[] key, byte[] value) throws RocksStorageException {
@Override
public void put(byte[] key, byte[] value) throws KVStorageException {
try {
this.db.put(defaultHandle, writeOptions, key, value);
} catch (RocksDBException e) {
Status status = e.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBWriteError, status);
throw createRocksStorageException(ErrorCode.KVStorageWriteError, status);
}
}
/**
* batch write.
*
* @param key List byte[]
* @param values List byte[]
* @throws RocksStorageException RocksStorageException
*/
public void batchWrite(List<byte[]> key, List<byte[]> values) throws RocksStorageException {
@Override
public void batchPut(List<byte[]> key, List<byte[]> values) throws KVStorageException {
if (key.size() != values.size()) {
throw new IllegalArgumentException("key size and values size must be equals!");
}
@ -194,81 +182,71 @@ public final class RocksStorage {
db.write(writeOptions, batch);
} catch (RocksDBException e) {
Status status = e.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBWriteError, status);
throw createRocksStorageException(ErrorCode.KVStorageWriteError, status);
}
}
/**
* get data by byte[].
*
* @param key byte[]
* @return result byte[]
* @throws RocksStorageException RocksStorageException
*/
public byte[] get(byte[] key) throws RocksStorageException {
@Override
public byte[] get(byte[] key) throws KVStorageException {
try {
return db.get(defaultHandle, readOptions, key);
} catch (RocksDBException e) {
Status status = e.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBReadError, status);
throw createRocksStorageException(ErrorCode.KVStorageReadError, status);
}
}
/**
* batch get by List byte[].
*
* @param key List byte[]
* @return Map byte[], byte[]
* @throws RocksStorageException RocksStorageException
*/
public Map<byte[], byte[]> batchGet(List<byte[]> key) throws RocksStorageException {
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KVStorageException {
try {
return db.multiGet(readOptions, key);
return db.multiGet(readOptions, keys);
} catch (RocksDBException e) {
Status status = e.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBReadError, status);
throw createRocksStorageException(ErrorCode.KVStorageReadError, status);
}
}
/**
* delete with key.
*
* @param key byte[]
* @throws RocksStorageException RocksStorageException
*/
public void delete(byte[] key) throws RocksStorageException {
@Override
public void delete(byte[] key) throws KVStorageException {
try {
db.delete(defaultHandle, writeOptions, key);
} catch (RocksDBException e) {
Status status = e.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBDeleteError, status);
throw createRocksStorageException(ErrorCode.KVStorageDeleteError, status);
}
}
/**
* batch delete with keys.
*
* @param key List byte[]
* @throws RocksStorageException RocksStorageException
*/
public void batchDelete(List<byte[]> key) throws RocksStorageException {
@Override
public void batchDelete(List<byte[]> key) throws KVStorageException {
try {
for (byte[] k : key) {
db.delete(defaultHandle, writeOptions, k);
}
} catch (RocksDBException e) {
Status status = e.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBDeleteError, status);
throw createRocksStorageException(ErrorCode.KVStorageDeleteError, status);
}
}
@Override
public void shutdown() {
this.defaultHandle.close();
this.db.close();
for (final ColumnFamilyOptions opt : this.cfOptions) {
opt.close();
}
this.options.close();
this.writeOptions.close();
this.readOptions.close();
}
/**
* do snapshot save operation.
*
* @param backupPath backup path
* @throws RocksStorageException RocksStorageException
* @throws KVStorageException RocksStorageException
*/
public void snapshotSave(final String backupPath) throws RocksStorageException {
public void snapshotSave(final String backupPath) throws KVStorageException {
final String path = Paths.get(backupPath, group).toString();
Throwable ex = DiskUtils.forceMkdir(path, (aVoid, ioe) -> {
BackupableDBOptions backupOpt = new BackupableDBOptions(path).setSync(true).setShareTableFiles(false);
@ -284,13 +262,13 @@ public final class RocksStorage {
return null;
} catch (RocksDBException e) {
Status status = e.getStatus();
return createRocksStorageException(ErrorCode.RocksDBSnapshotSaveError, status);
return createRocksStorageException(ErrorCode.KVStorageSnapshotSaveError, status);
} catch (Throwable throwable) {
return throwable;
}
});
if (ex != null) {
throw new RocksStorageException(ErrorCode.UnKnowError.getCode(), ex);
throw new KVStorageException(ErrorCode.UnKnowError.getCode(), ex);
}
}
@ -298,9 +276,9 @@ public final class RocksStorage {
* do snapshot load operation.
*
* @param backupPath backup path
* @throws RocksStorageException RocksStorageException
* @throws KVStorageException RocksStorageException
*/
public void snapshotLoad(final String backupPath) throws RocksStorageException {
public void snapshotLoad(final String backupPath) throws KVStorageException {
try {
final String path = Paths.get(backupPath, group).toString();
final File file = Paths.get(path, "meta_snapshot").toFile();
@ -316,28 +294,14 @@ public final class RocksStorage {
backupEngine.restoreDbFromBackup(info.getBackupId(), dbPath, dbOptions.walDir(), options);
} catch (RocksDBException ex) {
Status status = ex.getStatus();
throw createRocksStorageException(ErrorCode.RocksDBSnapshotLoadError, status);
throw createRocksStorageException(ErrorCode.KVStorageSnapshotLoadError, status);
} catch (Throwable ex) {
throw new RocksStorageException(ErrorCode.UnKnowError.getCode(), ex);
throw new KVStorageException(ErrorCode.UnKnowError.getCode(), ex);
}
}
/**
* shutdown.
*/
public void shutdown() {
this.defaultHandle.close();
this.db.close();
for (final ColumnFamilyOptions opt : this.cfOptions) {
opt.close();
}
this.options.close();
this.writeOptions.close();
this.readOptions.close();
}
private static RocksStorageException createRocksStorageException(ErrorCode code, Status status) {
RocksStorageException exception = new RocksStorageException();
private static KVStorageException createRocksStorageException(ErrorCode code, Status status) {
KVStorageException exception = new KVStorageException();
exception.setErrCode(code.getCode());
exception.setErrMsg(String.format("RocksDB error msg : code=%s, subCode=%s, state=%s", status.getCode(),
status.getSubCode(), status.getState()));

View File

@ -45,14 +45,14 @@ public class RocksStorageTest {
@Test
public void testCreateRocksStorage() throws Throwable {
RocksStorage storage = RocksStorage.createDefault("test", DIR);
storage.write("liaochuntao".getBytes(), "liaochuntao".getBytes());
storage.put("liaochuntao".getBytes(), "liaochuntao".getBytes());
}
@Test
public void testRocksStorageSnapshotSave() throws Throwable {
try {
RocksStorage storage = RocksStorage.createDefault("test", DIR);
storage.write("liaochuntao".getBytes(), "liaochuntao".getBytes());
storage.put("liaochuntao".getBytes(), "liaochuntao".getBytes());
storage.snapshotSave(Paths.get(DIR, "snapshot").toString());
} catch (Throwable ex) {
Assert.fail(ex.getMessage());
@ -62,7 +62,7 @@ public class RocksStorageTest {
@Test
public void testRocksStorageSnapshotLoad() throws Throwable {
RocksStorage storage = RocksStorage.createDefault("test", DIR);
storage.write("liaochuntao".getBytes(), "liaochuntao".getBytes());
storage.put("liaochuntao".getBytes(), "liaochuntao".getBytes());
storage.snapshotSave(Paths.get(DIR, "snapshot").toString());
storage.shutdown();
ThreadUtils.sleep(5_000L);

View File

@ -35,7 +35,7 @@ import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.distributed.raft.RaftConfig;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.RocksStorageException;
import com.alibaba.nacos.core.exception.KVStorageException;
import com.alibaba.nacos.core.storage.RocksStorage;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
@ -134,7 +134,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
try {
byte[] data = rocksStorage.get(ByteUtils.toBytes(key));
return serializer.deserialize(data);
} catch (RocksStorageException ex) {
} catch (KVStorageException ex) {
throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
}
});
@ -169,7 +169,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
final Map<byte[], byte[]> result = rocksStorage.batchGet(keys);
return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(result)))
.build();
} catch (RocksStorageException e) {
} catch (KVStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} finally {
lock.unlock();
@ -186,7 +186,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
try {
switch (op) {
case Write:
rocksStorage.batchWrite(request.getKeys(), request.getValues());
rocksStorage.batchPut(request.getKeys(), request.getValues());
break;
case Delete:
rocksStorage.batchDelete(request.getKeys());
@ -195,7 +195,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build();
}
return Response.newBuilder().setSuccess(true).build();
} catch (RocksStorageException e) {
} catch (KVStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} finally {
lock.unlock();