From eceab55cef74f3c76a379e32063ed0d2cc02c897 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Thu, 24 Sep 2020 17:03:24 +0800 Subject: [PATCH 1/5] Use datum in new raft processor to compatible old data --- .../impl/PersistentServiceProcessor.java | 84 ++++--------------- .../persistent/raft/RaftPeerSet.java | 4 +- 2 files changed, 17 insertions(+), 71 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 6e752a973..c342465b2 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -44,8 +44,6 @@ import com.alibaba.nacos.naming.consistency.ValueChangeEvent; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; -import com.alibaba.nacos.naming.consistency.persistent.raft.RaftStore; -import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.utils.Constants; @@ -53,13 +51,12 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang3.reflect.TypeUtils; import org.springframework.stereotype.Service; +import java.lang.reflect.Type; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -99,8 +96,6 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi private final KvStorage kvStorage; - private final RaftStore oldStore; - private final ClusterVersionJudgement versionJudgement; private final Serializer serializer; @@ -125,17 +120,17 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi private volatile boolean hasError = false; public PersistentServiceProcessor(final ProtocolManager protocolManager, - final ClusterVersionJudgement versionJudgement, final RaftStore oldStore) throws Exception { + final ClusterVersionJudgement versionJudgement) throws Exception { this.protocol = protocolManager.getCpProtocol(); - this.oldStore = oldStore; this.versionJudgement = versionJudgement; this.kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", - Paths.get(UtilsAndCommons.DATA_BASE_DIR, "persistent").toString()); + Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString()); this.serializer = SerializeFactory.getSerializer("JSON"); this.notifier = new PersistentNotifier(key -> { try { byte[] data = kvStorage.get(ByteUtils.toBytes(key)); - return serializer.deserialize(data, getClassForDeserialize(key)); + Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key)); + return null != datum ? datum.value : null; } catch (KvStorageException ex) { throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg()); } @@ -156,7 +151,6 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi } else { this.versionJudgement.registerObserver(isNewVersion -> { if (isNewVersion) { - loadFromOldData(); NotifyCenter.registerSubscriber(notifier); } }, 10); @@ -214,7 +208,8 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi final List values = request.getValues(); for (int i = 0; i < keys.size(); i++) { final String key = new String(keys.get(i)); - final Record value = serializer.deserialize(values.get(i), getClassForDeserialize(key)); + final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key)); + final Record value = null != datum ? datum.value : null; final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); NotifyCenter.publishEvent(event); @@ -231,62 +226,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock)); } - /** - * Pull old data into the new data store. When loading old data information, write locks must be added, and new - * requests can be processed only after the old data has been loaded - */ - @SuppressWarnings("unchecked") - public void loadFromOldData() { - final Lock lock = this.lock.writeLock(); - lock.lock(); - Loggers.RAFT.warn("start to load data to new raft protocol!!!"); - try { - if (protocol.isLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP)) { - Map datumMap = new HashMap<>(64); - oldStore.loadDatums(null, datumMap); - int totalSize = datumMap.size(); - List keys = new ArrayList<>(totalSize); - List values = new ArrayList<>(totalSize); - int batchSize = 100; - List futures = new ArrayList<>(16); - for (Map.Entry entry : datumMap.entrySet()) { - totalSize--; - keys.add(ByteUtils.toBytes(entry.getKey())); - values.add(serializer.serialize(entry.getValue().value)); - if (keys.size() == batchSize || totalSize == 0) { - BatchWriteRequest request = new BatchWriteRequest(); - request.setKeys(keys); - request.setValues(values); - CompletableFuture future = protocol.submitAsync( - Log.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP) - .setOperation(Op.Write.name()) - .setData(ByteString.copyFrom(serializer.serialize(request))).build()) - .whenComplete(((response, throwable) -> { - if (throwable == null) { - Loggers.RAFT.info("submit old raft data result : {}", response); - } else { - Loggers.RAFT.error("submit old raft data occur exception : {}", throwable); - } - })); - futures.add(future); - keys.clear(); - values.clear(); - } - } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - } - } catch (Throwable ex) { - hasError = true; - Loggers.RAFT.error("load old raft data occur exception : {}", ex); - } finally { - lock.unlock(); - } - } - @Override public void put(String key, Record value) throws NacosException { final BatchWriteRequest req = new BatchWriteRequest(); - req.append(ByteUtils.toBytes(key), serializer.serialize(value)); + Datum datum = Datum.createDatum(key, value); + req.append(ByteUtils.toBytes(key), serializer.serialize(datum)); final Log log = Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); try { @@ -321,9 +265,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi BatchReadResponse response = serializer .deserialize(resp.getData().toByteArray(), BatchReadResponse.class); final List rValues = response.getValues(); - Record record = - rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getClassForDeserialize(key)); - return Datum.createDatum(key, record); + return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key)); } throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg()); } catch (Throwable e) { @@ -352,7 +294,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi return hasLeader && !hasError; } - private Class getClassForDeserialize(String key) { + private Type getDatumTypeFromKey(String key) { + return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key)); + } + + private Class getClassOfRecordFromKey(String key) { if (KeyBuilder.matchSwitchKey(key)) { return com.alibaba.nacos.naming.misc.SwitchDomain.class; } else if (KeyBuilder.matchServiceMetaKey(key)) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java index 10604779e..8ec0f9ec6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java @@ -71,7 +71,7 @@ public class RaftPeerSet extends MemberChangeListener implements Closeable { private volatile boolean ready = false; - private Set oldMembers; + private Set oldMembers = new HashSet<>(); public RaftPeerSet(ServerMemberManager memberManager) { this.memberManager = memberManager; @@ -317,7 +317,7 @@ public class RaftPeerSet extends MemberChangeListener implements Closeable { @Override public void onEvent(MembersChangeEvent event) { Collection members = event.getMembers(); - if (oldMembers == null) { + if (oldMembers.isEmpty()) { oldMembers = new HashSet<>(members); } else { oldMembers.removeAll(members); From 170f979d44f25a2df579a6d50daacd078dad398f Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 25 Sep 2020 13:58:19 +0800 Subject: [PATCH 2/5] Use multiple kv storage in new raft processor to compatible old data --- .../nacos/core/storage/kv/FileKvStorage.java | 17 +- .../nacos/core/storage/kv/KvStorage.java | 8 + .../core/storage/kv/MemoryKvStorage.java | 16 +- .../persistent/impl/NamingKvStorage.java | 211 ++++++++++++++++++ .../impl/PersistentServiceProcessor.java | 61 ++++- .../raft/RaftConsistencyServiceImpl.java | 1 - .../consistency/persistent/raft/RaftCore.java | 38 ++-- 7 files changed, 324 insertions(+), 28 deletions(-) create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java b/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java index 3addf6f2d..735e5fd3c 100644 --- a/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java +++ b/core/src/main/java/com/alibaba/nacos/core/storage/kv/FileKvStorage.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.core.storage.kv; +import com.alibaba.nacos.common.utils.ByteUtils; import com.alibaba.nacos.core.exception.ErrorCode; import com.alibaba.nacos.core.exception.KvStorageException; import com.alibaba.nacos.core.utils.DiskUtils; @@ -24,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Paths; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -174,7 +176,20 @@ public class FileKvStorage implements KvStorage { } @Override - public void shutdown() { + public List allKeys() throws KvStorageException { + List result = new LinkedList<>(); + File[] files = new File(baseDir).listFiles(); + if (null != files) { + for (File each : files) { + if (each.isFile()) { + result.add(ByteUtils.toBytes(each.getName())); + } + } + } + return result; + } + @Override + public void shutdown() { } } diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java b/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java index 7e1d4df82..38ddd5cfb 100644 --- a/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java +++ b/core/src/main/java/com/alibaba/nacos/core/storage/kv/KvStorage.java @@ -114,6 +114,14 @@ public interface KvStorage { */ void snapshotLoad(String path) throws KvStorageException; + /** + * Get all keys. + * + * @return all keys + * @throws KvStorageException KVStorageException + */ + List allKeys() throws KvStorageException; + /** * shutdown. */ diff --git a/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java b/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java index 2b77868da..34577f701 100644 --- a/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java +++ b/core/src/main/java/com/alibaba/nacos/core/storage/kv/MemoryKvStorage.java @@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.util.BytesUtil; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; @@ -91,12 +92,21 @@ public class MemoryKvStorage implements KvStorage { throw new UnsupportedOperationException(); } + @Override + public List allKeys() throws KvStorageException { + List result = new LinkedList<>(); + for (Key each : storage.keySet()) { + result.add(each.origin); + } + return result; + } + @Override public void shutdown() { storage.clear(); } - private static class Key implements Comparable { + private static class Key implements Comparable { private final byte[] origin; @@ -105,8 +115,8 @@ public class MemoryKvStorage implements KvStorage { } @Override - public int compareTo(byte[] o) { - return BytesUtil.compare(origin, o); + public int compareTo(Key o) { + return BytesUtil.compare(origin, o.origin); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java new file mode 100644 index 000000000..cb331fa38 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java @@ -0,0 +1,211 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.consistency.persistent.impl; + +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.core.exception.ErrorCode; +import com.alibaba.nacos.core.exception.KvStorageException; +import com.alibaba.nacos.core.storage.StorageFactory; +import com.alibaba.nacos.core.storage.kv.KvStorage; +import com.alibaba.nacos.core.storage.kv.MemoryKvStorage; +import com.alibaba.nacos.naming.consistency.KeyBuilder; +import com.alibaba.nacos.naming.misc.Loggers; + +import java.io.File; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Kv storage implementation for naming. + * + * @author xiweng.yy + */ +public class NamingKvStorage extends MemoryKvStorage { + + private final String baseDir; + + private final KvStorage baseDirStorage; + + private final Map namespaceKvStorage; + + public NamingKvStorage(final String baseDir) throws Exception { + this.baseDir = baseDir; + baseDirStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", baseDir); + namespaceKvStorage = new ConcurrentHashMap<>(); + } + + @Override + public byte[] get(byte[] key) throws KvStorageException { + byte[] result = super.get(key); + if (null == result) { + KvStorage storage = getActualStorage(key); + result = null == storage ? null : storage.get(key); + if (null != result) { + super.put(key, result); + } + } + return result; + } + + @Override + public Map batchGet(List keys) throws KvStorageException { + Map result = new HashMap<>(keys.size()); + for (byte[] key : keys) { + byte[] val = get(key); + if (val != null) { + result.put(key, val); + } + } + return result; + } + + @Override + public void put(byte[] key, byte[] value) throws KvStorageException { + try { + KvStorage storage = createActualStorageIfAbsent(key); + storage.put(key, value); + } catch (Exception e) { + throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(), + "Put data failed, key: " + new String(key), e); + } + // after actual storage put success, put it in memory, memory put should success all the time + super.put(key, value); + } + + @Override + public void batchPut(List keys, List values) throws KvStorageException { + if (keys.size() != values.size()) { + throw new KvStorageException(ErrorCode.KVStorageBatchWriteError, + "key's size must be equal to value's size"); + } + int size = keys.size(); + for (int i = 0; i < size; i++) { + put(keys.get(i), values.get(i)); + } + } + + @Override + public void delete(byte[] key) throws KvStorageException { + try { + KvStorage storage = getActualStorage(key); + if (null != storage) { + storage.delete(key); + } + } catch (Exception e) { + throw new KvStorageException(ErrorCode.KVStorageDeleteError.getCode(), + "Delete data failed, key: " + new String(key), e); + } + // after actual storage delete success, put it in memory, memory delete should success all the time + super.delete(key); + } + + @Override + public void batchDelete(List key) throws KvStorageException { + for (byte[] each : key) { + delete(each); + } + } + + @Override + public void doSnapshot(String backupPath) throws KvStorageException { + baseDirStorage.doSnapshot(backupPath); + } + + @Override + public void snapshotLoad(String path) throws KvStorageException { + final long startTime = System.currentTimeMillis(); + baseDirStorage.snapshotLoad(path); + loadSnapshotFromActualStorage(baseDirStorage); + loadNamespaceSnapshot(); + long costTime = System.currentTimeMillis() - startTime; + Loggers.RAFT.info("load snapshot cost time {}ms", costTime); + } + + private void loadSnapshotFromActualStorage(KvStorage actualStorage) throws KvStorageException { + for (byte[] each : actualStorage.allKeys()) { + byte[] datum = actualStorage.get(each); + super.put(each, datum); + } + } + + private void loadNamespaceSnapshot() { + for (String each : getAllNamespaceDirs()) { + try { + KvStorage kvStorage = createActualStorageIfAbsent(each); + loadSnapshotFromActualStorage(kvStorage); + } catch (Exception e) { + Loggers.RAFT.error("load snapshot for namespace {} failed", each, e); + } + } + } + + private List getAllNamespaceDirs() { + List result = new LinkedList<>(); + File[] files = new File(baseDir).listFiles(); + if (null != files) { + for (File each : files) { + if (each.isDirectory()) { + result.add(each.getName()); + } + } + } + return result; + } + + @Override + public List allKeys() throws KvStorageException { + return super.allKeys(); + } + + @Override + public void shutdown() { + baseDirStorage.shutdown(); + for (KvStorage each : namespaceKvStorage.values()) { + each.shutdown(); + } + namespaceKvStorage.clear(); + super.shutdown(); + } + + private KvStorage getActualStorage(byte[] key) { + String keyString = new String(key); + String namespace = KeyBuilder.getNamespace(keyString); + return StringUtils.isBlank(namespace) ? baseDirStorage : namespaceKvStorage.get(namespace); + } + + private KvStorage createActualStorageIfAbsent(byte[] key) throws Exception { + String keyString = new String(key); + String namespace = KeyBuilder.getNamespace(keyString); + return createActualStorageIfAbsent(namespace); + } + + private KvStorage createActualStorageIfAbsent(String namespace) throws Exception { + if (StringUtils.isBlank(namespace)) { + return baseDirStorage; + } + if (!namespaceKvStorage.containsKey(namespace)) { + String namespacePath = Paths.get(baseDir, namespace).toString(); + namespaceKvStorage.putIfAbsent(namespace, + StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", namespacePath)); + } + return namespaceKvStorage.get(namespace); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index c342465b2..659c1cda3 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -34,7 +34,6 @@ import com.alibaba.nacos.consistency.snapshot.SnapshotOperation; import com.alibaba.nacos.core.distributed.ProtocolManager; import com.alibaba.nacos.core.exception.ErrorCode; import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.StorageFactory; import com.alibaba.nacos.core.storage.kv.KvStorage; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.naming.consistency.Datum; @@ -44,6 +43,7 @@ import com.alibaba.nacos.naming.consistency.ValueChangeEvent; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; +import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.utils.Constants; @@ -57,6 +57,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -119,12 +120,16 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi */ private volatile boolean hasError = false; + /** + * If use old raft, should not notify listener even new listener add. + */ + private volatile boolean startNotify = false; + public PersistentServiceProcessor(final ProtocolManager protocolManager, final ClusterVersionJudgement versionJudgement) throws Exception { this.protocol = protocolManager.getCpProtocol(); this.versionJudgement = versionJudgement; - this.kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent", - Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString()); + this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString()); this.serializer = SerializeFactory.getSerializer("JSON"); this.notifier = new PersistentNotifier(key -> { try { @@ -148,15 +153,28 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi // If you choose to use the new RAFT protocol directly, there will be no compatible logical execution if (ApplicationUtils.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) { NotifyCenter.registerSubscriber(notifier); + waitLeader(); + startNotify = true; } else { this.versionJudgement.registerObserver(isNewVersion -> { if (isNewVersion) { NotifyCenter.registerSubscriber(notifier); + startNotify = true; } }, 10); } } + private void waitLeader() { + while (!hasLeader && !hasError) { + Loggers.RAFT.info("Waiting Jraft leader vote ..."); + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException ignored) { + } + } + } + @Override public Response onRequest(GetRequest request) { final List keys = serializer @@ -276,6 +294,9 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi @Override public void listen(String key, RecordListener listener) throws NacosException { notifier.registerListener(key, listener); + if (startNotify) { + notifierDatumIfAbsent(key, listener); + } } @Override @@ -308,4 +329,38 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi } return Record.class; } + + private void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException { + if (key.equals(KeyBuilder.SERVICE_META_KEY_PREFIX)) { + notifierAllServiceMeta(listener); + } else { + Datum datum = get(key); + if (null != datum) { + notifierDatum(key, datum, listener); + } + } + } + + /** + * This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()} + */ + private void notifierAllServiceMeta(RecordListener listener) throws NacosException { + for (byte[] each : kvStorage.allKeys()) { + String key = new String(each); + if (listener.interests(key)) { + Datum datum = get(key); + if (null != datum) { + notifierDatum(key, datum, listener); + } + } + } + } + + private void notifierDatum(String key, Datum datum, RecordListener listener) { + try { + listener.onChange(key, datum.value); + } catch (Exception e) { + Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e); + } + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java index d7e118846..a64e0c161 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java @@ -117,7 +117,6 @@ public class RaftConsistencyServiceImpl implements PersistentConsistencyService @Override public void listen(String key, RecordListener listener) throws NacosException { - checkIsStopWork(); raftCore.listen(key, listener); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java index c9d995cb9..d0b6dab08 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java @@ -22,7 +22,6 @@ import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.notify.EventPublisher; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.utils.JacksonUtils; -import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.naming.consistency.Datum; @@ -80,8 +79,8 @@ import java.util.zip.GZIPOutputStream; /** * Raft core code. * - * @deprecated will remove in 1.4.x * @author nacos + * @deprecated will remove in 1.4.x */ @Deprecated @DependsOn("ProtocolManager") @@ -164,20 +163,13 @@ public class RaftCore implements Closeable { Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); - while (true) { - if (publisher.currentEventSize() <= 0) { - break; - } - ThreadUtils.sleep(1000L); - } - initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); - + versionJudgement.registerObserver(isAllNewVersion -> { stopWork = isAllNewVersion; if (stopWork) { @@ -188,9 +180,9 @@ public class RaftCore implements Closeable { } } }, 100); - + NotifyCenter.registerSubscriber(notifier); - + Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); } @@ -222,7 +214,7 @@ public class RaftCore implements Closeable { raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); return; } - + OPERATE_LOCK.lock(); try { final long start = System.currentTimeMillis(); @@ -457,6 +449,8 @@ public class RaftCore implements Closeable { masterTask.cancel(true); Loggers.RAFT.warn("stop old raft protocol task for heartbeat task"); heartbeatTask.cancel(true); + Loggers.RAFT.warn("clean old cache datum for old raft"); + datums.clear(); } public class MasterElection implements Runnable { @@ -470,18 +464,18 @@ public class RaftCore implements Closeable { if (!peers.isReady()) { return; } - + RaftPeer local = peers.local(); local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; - + if (local.leaderDueMs > 0) { return; } - + // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); - + sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); @@ -855,8 +849,10 @@ public class RaftCore implements Closeable { raftStore.write(newDatum); datums.put(newDatum.key, newDatum); - - NotifyCenter.publishEvent(ValueChangeEvent.builder().key(newDatum.key).action(DataOperation.CHANGE).build()); + + NotifyCenter.publishEvent( + ValueChangeEvent.builder().key(newDatum.key).action(DataOperation.CHANGE) + .build()); local.resetLeaderDue(); @@ -1055,7 +1051,9 @@ public class RaftCore implements Closeable { raftStore.delete(deleted); Loggers.RAFT.info("datum deleted, key: {}", key); } - NotifyCenter.publishEvent(ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE).build()); + NotifyCenter.publishEvent( + ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE) + .build()); } catch (UnsupportedEncodingException e) { Loggers.RAFT.warn("datum key decode failed: {}", key); } From 481a1f972f65287bcab57fcfda7287123a7855d4 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 25 Sep 2020 15:06:21 +0800 Subject: [PATCH 3/5] Revert PR#2849. --- .../persistent/raft/RaftStore.java | 29 ++++++------------- .../nacos/naming/misc/UtilsAndCommons.java | 2 -- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java index d2b366d13..6df31d3f6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java @@ -24,8 +24,8 @@ import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.KeyBuilder; -import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; import com.alibaba.nacos.naming.consistency.ValueChangeEvent; +import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.Service; @@ -50,13 +50,12 @@ import java.util.Properties; import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DATA_BASE_DIR; import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_PREFIX; -import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_SUFFIX; /** * Raft store. * - * @deprecated will remove in 1.4.x * @author nacos + * @deprecated will remove in 1.4.x */ @Deprecated @Component @@ -75,8 +74,7 @@ public class RaftStore implements Closeable { * @param datums cached datum map * @throws Exception any exception during load */ - public synchronized void loadDatums(PersistentNotifier notifier, Map datums) - throws Exception { + public synchronized void loadDatums(PersistentNotifier notifier, Map datums) throws Exception { Datum datum; long start = System.currentTimeMillis(); @@ -87,8 +85,8 @@ public class RaftStore implements Closeable { if (datum != null) { datums.put(datum.key, datum); if (notifier != null) { - NotifyCenter - .publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build()); + NotifyCenter.publishEvent( + ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build()); } } } @@ -137,8 +135,7 @@ public class RaftStore implements Closeable { Loggers.RAFT.warn("warning: encountered directory in cache dir: {}", cache.getAbsolutePath()); } - if (!StringUtils.equals(cache.getName(), encodeDatumKey(key)) && !StringUtils - .equals(cache.getName(), encodeDatumKey(key) + RAFT_CACHE_FILE_SUFFIX)) { + if (!StringUtils.equals(cache.getName(), encodeDatumKey(key))) { continue; } @@ -150,7 +147,7 @@ public class RaftStore implements Closeable { } private boolean isDatumCacheFile(String fileName) { - return fileName.endsWith(RAFT_CACHE_FILE_SUFFIX) || fileName.startsWith(RAFT_CACHE_FILE_PREFIX); + return fileName.startsWith(RAFT_CACHE_FILE_PREFIX); } private synchronized Datum readDatum(File file, String namespaceId) throws IOException { @@ -251,14 +248,6 @@ public class RaftStore implements Closeable { return fileName; } - private File cacheFile(String cacheFileName) { - File cacheFile = new File(cacheFileName); - if (cacheFile.exists()) { - return cacheFile; - } - return new File(cacheFileName + RAFT_CACHE_FILE_SUFFIX); - } - /** * Write datum to cache file. * @@ -269,7 +258,7 @@ public class RaftStore implements Closeable { String namespaceId = KeyBuilder.getNamespace(datum.key); - File cacheFile = cacheFile(cacheFileName(namespaceId, datum.key)); + File cacheFile = new File(cacheFileName(namespaceId, datum.key)); if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) { MetricsMonitor.getDiskException().increment(); @@ -301,7 +290,7 @@ public class RaftStore implements Closeable { String oldDatumKey = datum.key .replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY); - cacheFile = cacheFile(cacheFileName(namespaceId, oldDatumKey)); + cacheFile = new File(cacheFileName(namespaceId, oldDatumKey)); if (cacheFile.exists() && !cacheFile.delete()) { Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key, datum.value); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java index fb398bd83..8cd518f6c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/UtilsAndCommons.java @@ -118,8 +118,6 @@ public class UtilsAndCommons { public static final String DATA_BASE_DIR = ApplicationUtils.getNacosHome() + File.separator + "data" + File.separator + "naming"; - public static final String RAFT_CACHE_FILE_SUFFIX = ".datum"; - public static final String RAFT_CACHE_FILE_PREFIX = "com.alibaba.nacos.naming"; public static final String NUMBER_PATTERN = "^\\d+$"; From 402ad12c8505427d48126af03cc082b5b2ced67b Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 25 Sep 2020 15:17:12 +0800 Subject: [PATCH 4/5] Move datum key check to KeyBuilder --- .../com/alibaba/nacos/naming/consistency/KeyBuilder.java | 9 +++++++-- .../persistent/impl/PersistentServiceProcessor.java | 2 +- .../naming/consistency/persistent/raft/RaftStore.java | 7 +------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/KeyBuilder.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/KeyBuilder.java index 092896612..41ce7cb12 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/KeyBuilder.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/KeyBuilder.java @@ -19,6 +19,8 @@ package com.alibaba.nacos.naming.consistency; import com.alibaba.nacos.naming.misc.UtilsAndCommons; import org.apache.commons.lang3.StringUtils; +import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_PREFIX; + /** * Key operations for data. * @@ -81,8 +83,7 @@ public class KeyBuilder { } public static boolean matchSwitchKey(String key) { - return key.endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME) || key - .endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME + UtilsAndCommons.RAFT_CACHE_FILE_SUFFIX); + return key.endsWith(UtilsAndCommons.SWITCH_DOMAIN_NAME); } public static boolean matchServiceName(String key, String namespaceId, String serviceName) { @@ -141,4 +142,8 @@ public class KeyBuilder { public static String getServiceName(String key) { return key.split(NAMESPACE_KEY_CONNECTOR)[1]; } + + public static boolean isDatumCacheFile(String key) { + return key.startsWith(RAFT_CACHE_FILE_PREFIX); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 659c1cda3..0e8f90d2b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -331,7 +331,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi } private void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException { - if (key.equals(KeyBuilder.SERVICE_META_KEY_PREFIX)) { + if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { notifierAllServiceMeta(listener); } else { Datum datum = get(key); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java index 6df31d3f6..5a56c60ee 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftStore.java @@ -49,7 +49,6 @@ import java.util.Map; import java.util.Properties; import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DATA_BASE_DIR; -import static com.alibaba.nacos.naming.misc.UtilsAndCommons.RAFT_CACHE_FILE_PREFIX; /** * Raft store. @@ -146,12 +145,8 @@ public class RaftStore implements Closeable { return null; } - private boolean isDatumCacheFile(String fileName) { - return fileName.startsWith(RAFT_CACHE_FILE_PREFIX); - } - private synchronized Datum readDatum(File file, String namespaceId) throws IOException { - if (!isDatumCacheFile(file.getName())) { + if (!KeyBuilder.isDatumCacheFile(file.getName())) { return null; } ByteBuffer buffer; From ea515428ce20463f440562b15ef541316b30e2c6 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 25 Sep 2020 18:55:57 +0800 Subject: [PATCH 5/5] Fix 1.3.2 upgrade 1.4.0 can't notify service change problem --- .../persistent/PersistentNotifier.java | 31 ++++++++++------ .../persistent/impl/NamingKvStorage.java | 21 ++++++----- .../impl/PersistentServiceProcessor.java | 4 +++ .../consistency/persistent/raft/RaftCore.java | 35 ++++--------------- .../naming/controllers/RaftController.java | 8 ++--- 5 files changed, 45 insertions(+), 54 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java index e54eae2de..e1be3ef5f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java @@ -48,7 +48,7 @@ public final class PersistentNotifier extends Subscriber { /** * register listener with key. * - * @param key key + * @param key key * @param listener {@link RecordListener} */ public void registerListener(final String key, final RecordListener listener) { @@ -59,7 +59,7 @@ public final class PersistentNotifier extends Subscriber { /** * deregister listener by key. * - * @param key key + * @param key key * @param listener {@link RecordListener} */ public void deregisterListener(final String key, final RecordListener listener) { @@ -70,12 +70,25 @@ public final class PersistentNotifier extends Subscriber { } /** - * notify value to listener with {@link DataOperation} and key. + * deregister all listener by key. * * @param key key + */ + public void deregisterAllListener(final String key) { + listenerMap.remove(key); + } + + public Map> getListeners() { + return listenerMap; + } + + /** + * notify value to listener with {@link DataOperation} and key. + * + * @param key key * @param action {@link DataOperation} - * @param value value - * @param type + * @param value value + * @param type */ public void notify(final String key, final DataOperation action, final T value) { if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) { @@ -89,18 +102,16 @@ public final class PersistentNotifier extends Subscriber { listener.onDelete(key); } } catch (Throwable e) { - Loggers.RAFT - .error("[NACOS-RAFT] error while notifying listener of key: {}", key, - e); + Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e); } } } } - + if (!listenerMap.containsKey(key)) { return; } - + for (RecordListener listener : listenerMap.get(key)) { try { if (action == DataOperation.CHANGE) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java index cb331fa38..5e54e4074 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java @@ -56,10 +56,15 @@ public class NamingKvStorage extends MemoryKvStorage { public byte[] get(byte[] key) throws KvStorageException { byte[] result = super.get(key); if (null == result) { - KvStorage storage = getActualStorage(key); - result = null == storage ? null : storage.get(key); - if (null != result) { - super.put(key, result); + try { + KvStorage storage = createActualStorageIfAbsent(key); + result = null == storage ? null : storage.get(key); + if (null != result) { + super.put(key, result); + } + } catch (Exception e) { + throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(), + "Get data failed, key: " + new String(key), e); } } return result; @@ -105,7 +110,7 @@ public class NamingKvStorage extends MemoryKvStorage { @Override public void delete(byte[] key) throws KvStorageException { try { - KvStorage storage = getActualStorage(key); + KvStorage storage = createActualStorageIfAbsent(key); if (null != storage) { storage.delete(key); } @@ -185,12 +190,6 @@ public class NamingKvStorage extends MemoryKvStorage { super.shutdown(); } - private KvStorage getActualStorage(byte[] key) { - String keyString = new String(key); - String namespace = KeyBuilder.getNamespace(keyString); - return StringUtils.isBlank(namespace) ? baseDirStorage : namespaceKvStorage.get(namespace); - } - private KvStorage createActualStorageIfAbsent(byte[] key) throws Exception { String keyString = new String(key); String namespace = KeyBuilder.getNamespace(keyString); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 0e8f90d2b..028103a66 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -231,6 +231,10 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); NotifyCenter.publishEvent(event); + // remove listeners of key to avoid mem leak + if (Op.Delete.equals(op)) { + notifier.deregisterAllListener(key); + } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java index d0b6dab08..3800e7b1d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.notify.EventPublisher; import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -68,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -187,8 +187,8 @@ public class RaftCore implements Closeable { GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); } - public Map> getListeners() { - return listeners; + public Map> getListeners() { + return notifier.getListeners(); } /** @@ -916,21 +916,9 @@ public class RaftCore implements Closeable { * @param listener new listener */ public void listen(String key, RecordListener listener) { - - List listenerList = listeners.get(key); - if (listenerList != null && listenerList.contains(listener)) { - return; - } - - if (listenerList == null) { - listenerList = new CopyOnWriteArrayList<>(); - listeners.put(key, listenerList); - } + notifier.registerListener(key, listener); Loggers.RAFT.info("add listener: {}", key); - - listenerList.add(listener); - // if data present, notify immediately for (Datum datum : datums.values()) { if (!listener.interests(datum.key)) { @@ -952,22 +940,11 @@ public class RaftCore implements Closeable { * @param listener listener */ public void unListen(String key, RecordListener listener) { - - if (!listeners.containsKey(key)) { - return; - } - - for (RecordListener dl : listeners.get(key)) { - // TODO maybe use equal: - if (dl == listener) { - listeners.get(key).remove(listener); - break; - } - } + notifier.deregisterListener(key, listener); } public void unlistenAll(String key) { - listeners.remove(key); + notifier.deregisterAllListener(key); } public void setTerm(long term) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java index ebef44dd8..ea677e638 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.controllers; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.IoUtils; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.utils.WebUtils; @@ -37,7 +38,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; @@ -57,9 +57,9 @@ import java.util.Map; /** * Methods for Raft consistency protocol. These methods should only be invoked by Nacos server itself. * - * @deprecated will remove in 1.4.x * @author nkorange * @since 1.0.0 + * @deprecated will remove in 1.4.x */ @Deprecated @RestController @@ -72,7 +72,7 @@ public class RaftController { private final ServiceManager serviceManager; private final RaftCore raftCore; - + private final ClusterVersionJudgement versionJudgement; public RaftController(RaftConsistencyServiceImpl raftConsistencyService, ServiceManager serviceManager, @@ -387,7 +387,7 @@ public class RaftController { throw new IllegalStateException("old raft protocol already stop"); } ObjectNode result = JacksonUtils.createEmptyJsonNode(); - Map> listeners = raftCore.getListeners(); + Map> listeners = raftCore.getListeners(); ArrayNode listenerArray = JacksonUtils.createEmptyArrayNode(); for (String key : listeners.keySet()) {