Merge branch 'jraft-naming' of https://github.com/KomachiSion/nacos into jraft_naming

This commit is contained in:
chuntaojun 2020-09-30 14:24:27 +08:00
commit c442a71a5f
13 changed files with 389 additions and 169 deletions

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.core.storage.kv; package com.alibaba.nacos.core.storage.kv;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.core.exception.ErrorCode; import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException; import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.utils.DiskUtils; import com.alibaba.nacos.core.utils.DiskUtils;
@ -24,6 +25,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -174,7 +176,20 @@ public class FileKvStorage implements KvStorage {
} }
@Override @Override
public void shutdown() { public List<byte[]> allKeys() throws KvStorageException {
List<byte[]> result = new LinkedList<>();
File[] files = new File(baseDir).listFiles();
if (null != files) {
for (File each : files) {
if (each.isFile()) {
result.add(ByteUtils.toBytes(each.getName()));
}
}
}
return result;
}
@Override
public void shutdown() {
} }
} }

View File

@ -114,6 +114,14 @@ public interface KvStorage {
*/ */
void snapshotLoad(String path) throws KvStorageException; void snapshotLoad(String path) throws KvStorageException;
/**
* Get all keys.
*
* @return all keys
* @throws KvStorageException KVStorageException
*/
List<byte[]> allKeys() throws KvStorageException;
/** /**
* shutdown. * shutdown.
*/ */

View File

@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.util.BytesUtil;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -91,12 +92,21 @@ public class MemoryKvStorage implements KvStorage {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public List<byte[]> allKeys() throws KvStorageException {
List<byte[]> result = new LinkedList<>();
for (Key each : storage.keySet()) {
result.add(each.origin);
}
return result;
}
@Override @Override
public void shutdown() { public void shutdown() {
storage.clear(); storage.clear();
} }
private static class Key implements Comparable<byte[]> { private static class Key implements Comparable<Key> {
private final byte[] origin; private final byte[] origin;
@ -105,8 +115,8 @@ public class MemoryKvStorage implements KvStorage {
} }
@Override @Override
public int compareTo(byte[] o) { public int compareTo(Key o) {
return BytesUtil.compare(origin, o); return BytesUtil.compare(origin, o.origin);
} }
@Override @Override

View File

@ -19,6 +19,8 @@ package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_PREFIX;
/** /**
* Key operations for data. * Key operations for data.
* *
@ -81,8 +83,7 @@ public class KeyBuilder {
} }
public static boolean matchSwitchKey(String key) { public static boolean matchSwitchKey(String key) {
return key.endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME) || key return key.endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME);
.endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME + UtilsAndCommons.RAFT_CACHE_FILE_SUFFIX);
} }
public static boolean matchServiceName(String key, String namespaceId, String serviceName) { public static boolean matchServiceName(String key, String namespaceId, String serviceName) {
@ -141,4 +142,8 @@ public class KeyBuilder {
public static String getServiceName(String key) { public static String getServiceName(String key) {
return key.split(NAMESPACE_KEY_CONNECTOR)[1]; return key.split(NAMESPACE_KEY_CONNECTOR)[1];
} }
public static boolean isDatumCacheFile(String key) {
return key.startsWith(RAFT_CACHE_FILE_PREFIX);
}
} }

View File

@ -48,7 +48,7 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
/** /**
* register listener with key. * register listener with key.
* *
* @param key key * @param key key
* @param listener {@link RecordListener} * @param listener {@link RecordListener}
*/ */
public void registerListener(final String key, final RecordListener listener) { public void registerListener(final String key, final RecordListener listener) {
@ -59,7 +59,7 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
/** /**
* deregister listener by key. * deregister listener by key.
* *
* @param key key * @param key key
* @param listener {@link RecordListener} * @param listener {@link RecordListener}
*/ */
public void deregisterListener(final String key, final RecordListener listener) { public void deregisterListener(final String key, final RecordListener listener) {
@ -70,12 +70,25 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
} }
/** /**
* notify value to listener with {@link DataOperation} and key. * deregister all listener by key.
* *
* @param key key * @param key key
*/
public void deregisterAllListener(final String key) {
listenerMap.remove(key);
}
public Map<String, ConcurrentHashSet<RecordListener>> getListeners() {
return listenerMap;
}
/**
* notify value to listener with {@link DataOperation} and key.
*
* @param key key
* @param action {@link DataOperation} * @param action {@link DataOperation}
* @param value value * @param value value
* @param <T> type * @param <T> type
*/ */
public <T extends Record> void notify(final String key, final DataOperation action, final T value) { public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) { if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
@ -89,18 +102,16 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
listener.onDelete(key); listener.onDelete(key);
} }
} catch (Throwable e) { } catch (Throwable e) {
Loggers.RAFT Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
.error("[NACOS-RAFT] error while notifying listener of key: {}", key,
e);
} }
} }
} }
} }
if (!listenerMap.containsKey(key)) { if (!listenerMap.containsKey(key)) {
return; return;
} }
for (RecordListener listener : listenerMap.get(key)) { for (RecordListener listener : listenerMap.get(key)) {
try { try {
if (action == DataOperation.CHANGE) { if (action == DataOperation.CHANGE) {

View File

@ -0,0 +1,210 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.StorageFactory;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.storage.kv.MemoryKvStorage;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.misc.Loggers;
import java.io.File;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Kv storage implementation for naming.
*
* @author xiweng.yy
*/
public class NamingKvStorage extends MemoryKvStorage {
private final String baseDir;
private final KvStorage baseDirStorage;
private final Map<String, KvStorage> namespaceKvStorage;
public NamingKvStorage(final String baseDir) throws Exception {
this.baseDir = baseDir;
baseDirStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", baseDir);
namespaceKvStorage = new ConcurrentHashMap<>();
}
@Override
public byte[] get(byte[] key) throws KvStorageException {
byte[] result = super.get(key);
if (null == result) {
try {
KvStorage storage = createActualStorageIfAbsent(key);
result = null == storage ? null : storage.get(key);
if (null != result) {
super.put(key, result);
}
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(),
"Get data failed, key: " + new String(key), e);
}
}
return result;
}
@Override
public Map<byte[], byte[]> batchGet(List<byte[]> keys) throws KvStorageException {
Map<byte[], byte[]> result = new HashMap<>(keys.size());
for (byte[] key : keys) {
byte[] val = get(key);
if (val != null) {
result.put(key, val);
}
}
return result;
}
@Override
public void put(byte[] key, byte[] value) throws KvStorageException {
try {
KvStorage storage = createActualStorageIfAbsent(key);
storage.put(key, value);
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(),
"Put data failed, key: " + new String(key), e);
}
// after actual storage put success, put it in memory, memory put should success all the time
super.put(key, value);
}
@Override
public void batchPut(List<byte[]> keys, List<byte[]> values) throws KvStorageException {
if (keys.size() != values.size()) {
throw new KvStorageException(ErrorCode.KVStorageBatchWriteError,
"key's size must be equal to value's size");
}
int size = keys.size();
for (int i = 0; i < size; i++) {
put(keys.get(i), values.get(i));
}
}
@Override
public void delete(byte[] key) throws KvStorageException {
try {
KvStorage storage = createActualStorageIfAbsent(key);
if (null != storage) {
storage.delete(key);
}
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageDeleteError.getCode(),
"Delete data failed, key: " + new String(key), e);
}
// after actual storage delete success, put it in memory, memory delete should success all the time
super.delete(key);
}
@Override
public void batchDelete(List<byte[]> key) throws KvStorageException {
for (byte[] each : key) {
delete(each);
}
}
@Override
public void doSnapshot(String backupPath) throws KvStorageException {
baseDirStorage.doSnapshot(backupPath);
}
@Override
public void snapshotLoad(String path) throws KvStorageException {
final long startTime = System.currentTimeMillis();
baseDirStorage.snapshotLoad(path);
loadSnapshotFromActualStorage(baseDirStorage);
loadNamespaceSnapshot();
long costTime = System.currentTimeMillis() - startTime;
Loggers.RAFT.info("load snapshot cost time {}ms", costTime);
}
private void loadSnapshotFromActualStorage(KvStorage actualStorage) throws KvStorageException {
for (byte[] each : actualStorage.allKeys()) {
byte[] datum = actualStorage.get(each);
super.put(each, datum);
}
}
private void loadNamespaceSnapshot() {
for (String each : getAllNamespaceDirs()) {
try {
KvStorage kvStorage = createActualStorageIfAbsent(each);
loadSnapshotFromActualStorage(kvStorage);
} catch (Exception e) {
Loggers.RAFT.error("load snapshot for namespace {} failed", each, e);
}
}
}
private List<String> getAllNamespaceDirs() {
List<String> result = new LinkedList<>();
File[] files = new File(baseDir).listFiles();
if (null != files) {
for (File each : files) {
if (each.isDirectory()) {
result.add(each.getName());
}
}
}
return result;
}
@Override
public List<byte[]> allKeys() throws KvStorageException {
return super.allKeys();
}
@Override
public void shutdown() {
baseDirStorage.shutdown();
for (KvStorage each : namespaceKvStorage.values()) {
each.shutdown();
}
namespaceKvStorage.clear();
super.shutdown();
}
private KvStorage createActualStorageIfAbsent(byte[] key) throws Exception {
String keyString = new String(key);
String namespace = KeyBuilder.getNamespace(keyString);
return createActualStorageIfAbsent(namespace);
}
private KvStorage createActualStorageIfAbsent(String namespace) throws Exception {
if (StringUtils.isBlank(namespace)) {
return baseDirStorage;
}
if (!namespaceKvStorage.containsKey(namespace)) {
String namespacePath = Paths.get(baseDir, namespace).toString();
namespaceKvStorage.putIfAbsent(namespace,
StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", namespacePath));
}
return namespaceKvStorage.get(namespace);
}
}

View File

@ -34,7 +34,6 @@ import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.distributed.ProtocolManager; import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode; import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException; import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.StorageFactory;
import com.alibaba.nacos.core.storage.kv.KvStorage; import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.Datum;
@ -44,7 +43,6 @@ import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftStore;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.pojo.Record;
@ -53,13 +51,13 @@ import com.google.protobuf.ByteString;
import org.apache.commons.lang3.reflect.TypeUtils; import org.apache.commons.lang3.reflect.TypeUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -99,8 +97,6 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
private final KvStorage kvStorage; private final KvStorage kvStorage;
private final RaftStore oldStore;
private final ClusterVersionJudgement versionJudgement; private final ClusterVersionJudgement versionJudgement;
private final Serializer serializer; private final Serializer serializer;
@ -124,18 +120,22 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
*/ */
private volatile boolean hasError = false; private volatile boolean hasError = false;
/**
* If use old raft, should not notify listener even new listener add.
*/
private volatile boolean startNotify = false;
public PersistentServiceProcessor(final ProtocolManager protocolManager, public PersistentServiceProcessor(final ProtocolManager protocolManager,
final ClusterVersionJudgement versionJudgement, final RaftStore oldStore) throws Exception { final ClusterVersionJudgement versionJudgement) throws Exception {
this.protocol = protocolManager.getCpProtocol(); this.protocol = protocolManager.getCpProtocol();
this.oldStore = oldStore;
this.versionJudgement = versionJudgement; this.versionJudgement = versionJudgement;
this.kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
Paths.get(UtilsAndCommons.DATA_BASE_DIR, "persistent").toString());
this.serializer = SerializeFactory.getSerializer("JSON"); this.serializer = SerializeFactory.getSerializer("JSON");
this.notifier = new PersistentNotifier(key -> { this.notifier = new PersistentNotifier(key -> {
try { try {
byte[] data = kvStorage.get(ByteUtils.toBytes(key)); byte[] data = kvStorage.get(ByteUtils.toBytes(key));
return serializer.deserialize(data, getClassForDeserialize(key)); Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key));
return null != datum ? datum.value : null;
} catch (KvStorageException ex) { } catch (KvStorageException ex) {
throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg()); throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
} }
@ -153,16 +153,28 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
// If you choose to use the new RAFT protocol directly, there will be no compatible logical execution // If you choose to use the new RAFT protocol directly, there will be no compatible logical execution
if (ApplicationUtils.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) { if (ApplicationUtils.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) {
NotifyCenter.registerSubscriber(notifier); NotifyCenter.registerSubscriber(notifier);
waitLeader();
startNotify = true;
} else { } else {
this.versionJudgement.registerObserver(isNewVersion -> { this.versionJudgement.registerObserver(isNewVersion -> {
if (isNewVersion) { if (isNewVersion) {
loadFromOldData();
NotifyCenter.registerSubscriber(notifier); NotifyCenter.registerSubscriber(notifier);
startNotify = true;
} }
}, 10); }, 10);
} }
} }
private void waitLeader() {
while (!hasLeader && !hasError) {
Loggers.RAFT.info("Waiting Jraft leader vote ...");
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ignored) {
}
}
}
@Override @Override
public Response onRequest(GetRequest request) { public Response onRequest(GetRequest request) {
final List<byte[]> keys = serializer final List<byte[]> keys = serializer
@ -214,10 +226,15 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
final List<byte[]> values = request.getValues(); final List<byte[]> values = request.getValues();
for (int i = 0; i < keys.size(); i++) { for (int i = 0; i < keys.size(); i++) {
final String key = new String(keys.get(i)); final String key = new String(keys.get(i));
final Record value = serializer.deserialize(values.get(i), getClassForDeserialize(key)); final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key));
final Record value = null != datum ? datum.value : null;
final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event); NotifyCenter.publishEvent(event);
// remove listeners of key to avoid mem leak
if (Op.Delete.equals(op)) {
notifier.deregisterAllListener(key);
}
} }
} }
@ -231,62 +248,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock)); return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock));
} }
/**
* Pull old data into the new data store. When loading old data information, write locks must be added, and new
* requests can be processed only after the old data has been loaded
*/
@SuppressWarnings("unchecked")
public void loadFromOldData() {
final Lock lock = this.lock.writeLock();
lock.lock();
Loggers.RAFT.warn("start to load data to new raft protocol!!!");
try {
if (protocol.isLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP)) {
Map<String, Datum> datumMap = new HashMap<>(64);
oldStore.loadDatums(null, datumMap);
int totalSize = datumMap.size();
List<byte[]> keys = new ArrayList<>(totalSize);
List<byte[]> values = new ArrayList<>(totalSize);
int batchSize = 100;
List<CompletableFuture> futures = new ArrayList<>(16);
for (Map.Entry<String, Datum> entry : datumMap.entrySet()) {
totalSize--;
keys.add(ByteUtils.toBytes(entry.getKey()));
values.add(serializer.serialize(entry.getValue().value));
if (keys.size() == batchSize || totalSize == 0) {
BatchWriteRequest request = new BatchWriteRequest();
request.setKeys(keys);
request.setValues(values);
CompletableFuture future = protocol.submitAsync(
Log.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
.setOperation(Op.Write.name())
.setData(ByteString.copyFrom(serializer.serialize(request))).build())
.whenComplete(((response, throwable) -> {
if (throwable == null) {
Loggers.RAFT.info("submit old raft data result : {}", response);
} else {
Loggers.RAFT.error("submit old raft data occur exception : {}", throwable);
}
}));
futures.add(future);
keys.clear();
values.clear();
}
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
} catch (Throwable ex) {
hasError = true;
Loggers.RAFT.error("load old raft data occur exception : {}", ex);
} finally {
lock.unlock();
}
}
@Override @Override
public void put(String key, Record value) throws NacosException { public void put(String key, Record value) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest(); final BatchWriteRequest req = new BatchWriteRequest();
req.append(ByteUtils.toBytes(key), serializer.serialize(value)); Datum datum = Datum.createDatum(key, value);
req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
final Log log = Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) final Log log = Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try { try {
@ -321,9 +287,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
BatchReadResponse response = serializer BatchReadResponse response = serializer
.deserialize(resp.getData().toByteArray(), BatchReadResponse.class); .deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
final List<byte[]> rValues = response.getValues(); final List<byte[]> rValues = response.getValues();
Record record = return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getClassForDeserialize(key));
return Datum.createDatum(key, record);
} }
throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg()); throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
} catch (Throwable e) { } catch (Throwable e) {
@ -334,6 +298,9 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
@Override @Override
public void listen(String key, RecordListener listener) throws NacosException { public void listen(String key, RecordListener listener) throws NacosException {
notifier.registerListener(key, listener); notifier.registerListener(key, listener);
if (startNotify) {
notifierDatumIfAbsent(key, listener);
}
} }
@Override @Override
@ -352,7 +319,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
return hasLeader && !hasError; return hasLeader && !hasError;
} }
private Class<? extends Record> getClassForDeserialize(String key) { private Type getDatumTypeFromKey(String key) {
return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key));
}
private Class<? extends Record> getClassOfRecordFromKey(String key) {
if (KeyBuilder.matchSwitchKey(key)) { if (KeyBuilder.matchSwitchKey(key)) {
return com.alibaba.nacos.naming.misc.SwitchDomain.class; return com.alibaba.nacos.naming.misc.SwitchDomain.class;
} else if (KeyBuilder.matchServiceMetaKey(key)) { } else if (KeyBuilder.matchServiceMetaKey(key)) {
@ -362,4 +333,38 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
} }
return Record.class; return Record.class;
} }
private void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException {
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
notifierAllServiceMeta(listener);
} else {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
/**
* This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()}
*/
private void notifierAllServiceMeta(RecordListener listener) throws NacosException {
for (byte[] each : kvStorage.allKeys()) {
String key = new String(each);
if (listener.interests(key)) {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
}
private void notifierDatum(String key, Datum datum, RecordListener listener) {
try {
listener.onChange(key, datum.value);
} catch (Exception e) {
Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e);
}
}
} }

View File

@ -117,7 +117,6 @@ public class RaftConsistencyServiceImpl implements PersistentConsistencyService
@Override @Override
public void listen(String key, RecordListener listener) throws NacosException { public void listen(String key, RecordListener listener) throws NacosException {
checkIsStopWork();
raftCore.listen(key, listener); raftCore.listen(key, listener);
} }

View File

@ -21,8 +21,8 @@ import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.EventPublisher; import com.alibaba.nacos.common.notify.EventPublisher;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.Datum;
@ -69,7 +69,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -80,8 +79,8 @@ import java.util.zip.GZIPOutputStream;
/** /**
* Raft core code. * Raft core code.
* *
* @deprecated will remove in 1.4.x
* @author nacos * @author nacos
* @deprecated will remove in 1.4.x
*/ */
@Deprecated @Deprecated
@DependsOn("ProtocolManager") @DependsOn("ProtocolManager")
@ -164,20 +163,13 @@ public class RaftCore implements Closeable {
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
while (true) {
if (publisher.currentEventSize() <= 0) {
break;
}
ThreadUtils.sleep(1000L);
}
initialized = true; initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
versionJudgement.registerObserver(isAllNewVersion -> { versionJudgement.registerObserver(isAllNewVersion -> {
stopWork = isAllNewVersion; stopWork = isAllNewVersion;
if (stopWork) { if (stopWork) {
@ -188,15 +180,15 @@ public class RaftCore implements Closeable {
} }
} }
}, 100); }, 100);
NotifyCenter.registerSubscriber(notifier); NotifyCenter.registerSubscriber(notifier);
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
} }
public Map<String, List<RecordListener>> getListeners() { public Map<String, ConcurrentHashSet<RecordListener>> getListeners() {
return listeners; return notifier.getListeners();
} }
/** /**
@ -222,7 +214,7 @@ public class RaftCore implements Closeable {
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return; return;
} }
OPERATE_LOCK.lock(); OPERATE_LOCK.lock();
try { try {
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
@ -457,6 +449,8 @@ public class RaftCore implements Closeable {
masterTask.cancel(true); masterTask.cancel(true);
Loggers.RAFT.warn("stop old raft protocol task for heartbeat task"); Loggers.RAFT.warn("stop old raft protocol task for heartbeat task");
heartbeatTask.cancel(true); heartbeatTask.cancel(true);
Loggers.RAFT.warn("clean old cache datum for old raft");
datums.clear();
} }
public class MasterElection implements Runnable { public class MasterElection implements Runnable {
@ -470,18 +464,18 @@ public class RaftCore implements Closeable {
if (!peers.isReady()) { if (!peers.isReady()) {
return; return;
} }
RaftPeer local = peers.local(); RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) { if (local.leaderDueMs > 0) {
return; return;
} }
// reset timeout // reset timeout
local.resetLeaderDue(); local.resetLeaderDue();
local.resetHeartbeatDue(); local.resetHeartbeatDue();
sendVote(); sendVote();
} catch (Exception e) { } catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e); Loggers.RAFT.warn("[RAFT] error while master election {}", e);
@ -855,8 +849,10 @@ public class RaftCore implements Closeable {
raftStore.write(newDatum); raftStore.write(newDatum);
datums.put(newDatum.key, newDatum); datums.put(newDatum.key, newDatum);
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(newDatum.key).action(DataOperation.CHANGE).build()); NotifyCenter.publishEvent(
ValueChangeEvent.builder().key(newDatum.key).action(DataOperation.CHANGE)
.build());
local.resetLeaderDue(); local.resetLeaderDue();
@ -920,21 +916,9 @@ public class RaftCore implements Closeable {
* @param listener new listener * @param listener new listener
*/ */
public void listen(String key, RecordListener listener) { public void listen(String key, RecordListener listener) {
notifier.registerListener(key, listener);
List<RecordListener> listenerList = listeners.get(key);
if (listenerList != null && listenerList.contains(listener)) {
return;
}
if (listenerList == null) {
listenerList = new CopyOnWriteArrayList<>();
listeners.put(key, listenerList);
}
Loggers.RAFT.info("add listener: {}", key); Loggers.RAFT.info("add listener: {}", key);
listenerList.add(listener);
// if data present, notify immediately // if data present, notify immediately
for (Datum datum : datums.values()) { for (Datum datum : datums.values()) {
if (!listener.interests(datum.key)) { if (!listener.interests(datum.key)) {
@ -956,22 +940,11 @@ public class RaftCore implements Closeable {
* @param listener listener * @param listener listener
*/ */
public void unListen(String key, RecordListener listener) { public void unListen(String key, RecordListener listener) {
notifier.deregisterListener(key, listener);
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener dl : listeners.get(key)) {
// TODO maybe use equal:
if (dl == listener) {
listeners.get(key).remove(listener);
break;
}
}
} }
public void unlistenAll(String key) { public void unlistenAll(String key) {
listeners.remove(key); notifier.deregisterAllListener(key);
} }
public void setTerm(long term) { public void setTerm(long term) {
@ -1055,7 +1028,9 @@ public class RaftCore implements Closeable {
raftStore.delete(deleted); raftStore.delete(deleted);
Loggers.RAFT.info("datum deleted, key: {}", key); Loggers.RAFT.info("datum deleted, key: {}", key);
} }
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE).build()); NotifyCenter.publishEvent(
ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE)
.build());
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
Loggers.RAFT.warn("datum key decode failed: {}", key); Loggers.RAFT.warn("datum key decode failed: {}", key);
} }

View File

@ -71,7 +71,7 @@ public class RaftPeerSet extends MemberChangeListener implements Closeable {
private volatile boolean ready = false; private volatile boolean ready = false;
private Set<Member> oldMembers; private Set<Member> oldMembers = new HashSet<>();
public RaftPeerSet(ServerMemberManager memberManager) { public RaftPeerSet(ServerMemberManager memberManager) {
this.memberManager = memberManager; this.memberManager = memberManager;
@ -317,7 +317,7 @@ public class RaftPeerSet extends MemberChangeListener implements Closeable {
@Override @Override
public void onEvent(MembersChangeEvent event) { public void onEvent(MembersChangeEvent event) {
Collection<Member> members = event.getMembers(); Collection<Member> members = event.getMembers();
if (oldMembers == null) { if (oldMembers.isEmpty()) {
oldMembers = new HashSet<>(members); oldMembers = new HashSet<>(members);
} else { } else {
oldMembers.removeAll(members); oldMembers.removeAll(members);

View File

@ -24,8 +24,8 @@ import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent; import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.Service;
@ -49,14 +49,12 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DATA_BASE_DIR; import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DATA_BASE_DIR;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_PREFIX;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_SUFFIX;
/** /**
* Raft store. * Raft store.
* *
* @deprecated will remove in 1.4.x
* @author nacos * @author nacos
* @deprecated will remove in 1.4.x
*/ */
@Deprecated @Deprecated
@Component @Component
@ -75,8 +73,7 @@ public class RaftStore implements Closeable {
* @param datums cached datum map * @param datums cached datum map
* @throws Exception any exception during load * @throws Exception any exception during load
*/ */
public synchronized void loadDatums(PersistentNotifier notifier, Map<String, Datum> datums) public synchronized void loadDatums(PersistentNotifier notifier, Map<String, Datum> datums) throws Exception {
throws Exception {
Datum datum; Datum datum;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
@ -87,8 +84,8 @@ public class RaftStore implements Closeable {
if (datum != null) { if (datum != null) {
datums.put(datum.key, datum); datums.put(datum.key, datum);
if (notifier != null) { if (notifier != null) {
NotifyCenter NotifyCenter.publishEvent(
.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build()); ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
} }
} }
} }
@ -137,8 +134,7 @@ public class RaftStore implements Closeable {
Loggers.RAFT.warn("warning: encountered directory in cache dir: {}", cache.getAbsolutePath()); Loggers.RAFT.warn("warning: encountered directory in cache dir: {}", cache.getAbsolutePath());
} }
if (!StringUtils.equals(cache.getName(), encodeDatumKey(key)) && !StringUtils if (!StringUtils.equals(cache.getName(), encodeDatumKey(key))) {
.equals(cache.getName(), encodeDatumKey(key) + RAFT_CACHE_FILE_SUFFIX)) {
continue; continue;
} }
@ -149,12 +145,8 @@ public class RaftStore implements Closeable {
return null; return null;
} }
private boolean isDatumCacheFile(String fileName) {
return fileName.endsWith(RAFT_CACHE_FILE_SUFFIX) || fileName.startsWith(RAFT_CACHE_FILE_PREFIX);
}
private synchronized Datum readDatum(File file, String namespaceId) throws IOException { private synchronized Datum readDatum(File file, String namespaceId) throws IOException {
if (!isDatumCacheFile(file.getName())) { if (!KeyBuilder.isDatumCacheFile(file.getName())) {
return null; return null;
} }
ByteBuffer buffer; ByteBuffer buffer;
@ -251,14 +243,6 @@ public class RaftStore implements Closeable {
return fileName; return fileName;
} }
private File cacheFile(String cacheFileName) {
File cacheFile = new File(cacheFileName);
if (cacheFile.exists()) {
return cacheFile;
}
return new File(cacheFileName + RAFT_CACHE_FILE_SUFFIX);
}
/** /**
* Write datum to cache file. * Write datum to cache file.
* *
@ -269,7 +253,7 @@ public class RaftStore implements Closeable {
String namespaceId = KeyBuilder.getNamespace(datum.key); String namespaceId = KeyBuilder.getNamespace(datum.key);
File cacheFile = cacheFile(cacheFileName(namespaceId, datum.key)); File cacheFile = new File(cacheFileName(namespaceId, datum.key));
if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) { if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
MetricsMonitor.getDiskException().increment(); MetricsMonitor.getDiskException().increment();
@ -301,7 +285,7 @@ public class RaftStore implements Closeable {
String oldDatumKey = datum.key String oldDatumKey = datum.key
.replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY); .replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);
cacheFile = cacheFile(cacheFileName(namespaceId, oldDatumKey)); cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));
if (cacheFile.exists() && !cacheFile.delete()) { if (cacheFile.exists() && !cacheFile.delete()) {
Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key, Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key,
datum.value); datum.value);

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.controllers; package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.IoUtils; import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.utils.WebUtils; import com.alibaba.nacos.core.utils.WebUtils;
@ -37,7 +38,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
@ -57,9 +57,9 @@ import java.util.Map;
/** /**
* Methods for Raft consistency protocol. These methods should only be invoked by Nacos server itself. * Methods for Raft consistency protocol. These methods should only be invoked by Nacos server itself.
* *
* @deprecated will remove in 1.4.x
* @author nkorange * @author nkorange
* @since 1.0.0 * @since 1.0.0
* @deprecated will remove in 1.4.x
*/ */
@Deprecated @Deprecated
@RestController @RestController
@ -72,7 +72,7 @@ public class RaftController {
private final ServiceManager serviceManager; private final ServiceManager serviceManager;
private final RaftCore raftCore; private final RaftCore raftCore;
private final ClusterVersionJudgement versionJudgement; private final ClusterVersionJudgement versionJudgement;
public RaftController(RaftConsistencyServiceImpl raftConsistencyService, ServiceManager serviceManager, public RaftController(RaftConsistencyServiceImpl raftConsistencyService, ServiceManager serviceManager,
@ -387,7 +387,7 @@ public class RaftController {
throw new IllegalStateException("old raft protocol already stop"); throw new IllegalStateException("old raft protocol already stop");
} }
ObjectNode result = JacksonUtils.createEmptyJsonNode(); ObjectNode result = JacksonUtils.createEmptyJsonNode();
Map<String, List<RecordListener>> listeners = raftCore.getListeners(); Map<String, ConcurrentHashSet<RecordListener>> listeners = raftCore.getListeners();
ArrayNode listenerArray = JacksonUtils.createEmptyArrayNode(); ArrayNode listenerArray = JacksonUtils.createEmptyArrayNode();
for (String key : listeners.keySet()) { for (String key : listeners.keySet()) {

View File

@ -118,8 +118,6 @@ public class UtilsAndCommons {
public static final String DATA_BASE_DIR = public static final String DATA_BASE_DIR =
ApplicationUtils.getNacosHome() + File.separator + "data" + File.separator + "naming"; ApplicationUtils.getNacosHome() + File.separator + "data" + File.separator + "naming";
public static final String RAFT_CACHE_FILE_SUFFIX = ".datum";
public static final String RAFT_CACHE_FILE_PREFIX = "com.alibaba.nacos.naming"; public static final String RAFT_CACHE_FILE_PREFIX = "com.alibaba.nacos.naming";
public static final String NUMBER_PATTERN = "^\\d+$"; public static final String NUMBER_PATTERN = "^\\d+$";