From ea515428ce20463f440562b15ef541316b30e2c6 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 25 Sep 2020 18:55:57 +0800 Subject: [PATCH] Fix 1.3.2 upgrade 1.4.0 can't notify service change problem --- .../persistent/PersistentNotifier.java | 31 ++++++++++------ .../persistent/impl/NamingKvStorage.java | 21 ++++++----- .../impl/PersistentServiceProcessor.java | 4 +++ .../consistency/persistent/raft/RaftCore.java | 35 ++++--------------- .../naming/controllers/RaftController.java | 8 ++--- 5 files changed, 45 insertions(+), 54 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java index e54eae2de..e1be3ef5f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentNotifier.java @@ -48,7 +48,7 @@ public final class PersistentNotifier extends Subscriber { /** * register listener with key. * - * @param key key + * @param key key * @param listener {@link RecordListener} */ public void registerListener(final String key, final RecordListener listener) { @@ -59,7 +59,7 @@ public final class PersistentNotifier extends Subscriber { /** * deregister listener by key. * - * @param key key + * @param key key * @param listener {@link RecordListener} */ public void deregisterListener(final String key, final RecordListener listener) { @@ -70,12 +70,25 @@ public final class PersistentNotifier extends Subscriber { } /** - * notify value to listener with {@link DataOperation} and key. + * deregister all listener by key. * * @param key key + */ + public void deregisterAllListener(final String key) { + listenerMap.remove(key); + } + + public Map> getListeners() { + return listenerMap; + } + + /** + * notify value to listener with {@link DataOperation} and key. + * + * @param key key * @param action {@link DataOperation} - * @param value value - * @param type + * @param value value + * @param type */ public void notify(final String key, final DataOperation action, final T value) { if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) { @@ -89,18 +102,16 @@ public final class PersistentNotifier extends Subscriber { listener.onDelete(key); } } catch (Throwable e) { - Loggers.RAFT - .error("[NACOS-RAFT] error while notifying listener of key: {}", key, - e); + Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e); } } } } - + if (!listenerMap.containsKey(key)) { return; } - + for (RecordListener listener : listenerMap.get(key)) { try { if (action == DataOperation.CHANGE) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java index cb331fa38..5e54e4074 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/NamingKvStorage.java @@ -56,10 +56,15 @@ public class NamingKvStorage extends MemoryKvStorage { public byte[] get(byte[] key) throws KvStorageException { byte[] result = super.get(key); if (null == result) { - KvStorage storage = getActualStorage(key); - result = null == storage ? null : storage.get(key); - if (null != result) { - super.put(key, result); + try { + KvStorage storage = createActualStorageIfAbsent(key); + result = null == storage ? null : storage.get(key); + if (null != result) { + super.put(key, result); + } + } catch (Exception e) { + throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(), + "Get data failed, key: " + new String(key), e); } } return result; @@ -105,7 +110,7 @@ public class NamingKvStorage extends MemoryKvStorage { @Override public void delete(byte[] key) throws KvStorageException { try { - KvStorage storage = getActualStorage(key); + KvStorage storage = createActualStorageIfAbsent(key); if (null != storage) { storage.delete(key); } @@ -185,12 +190,6 @@ public class NamingKvStorage extends MemoryKvStorage { super.shutdown(); } - private KvStorage getActualStorage(byte[] key) { - String keyString = new String(key); - String namespace = KeyBuilder.getNamespace(keyString); - return StringUtils.isBlank(namespace) ? baseDirStorage : namespaceKvStorage.get(namespace); - } - private KvStorage createActualStorageIfAbsent(byte[] key) throws Exception { String keyString = new String(key); String namespace = KeyBuilder.getNamespace(keyString); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 0e8f90d2b..028103a66 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -231,6 +231,10 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); NotifyCenter.publishEvent(event); + // remove listeners of key to avoid mem leak + if (Op.Delete.equals(op)) { + notifier.deregisterAllListener(key); + } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java index d0b6dab08..3800e7b1d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.notify.EventPublisher; import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -68,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -187,8 +187,8 @@ public class RaftCore implements Closeable { GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); } - public Map> getListeners() { - return listeners; + public Map> getListeners() { + return notifier.getListeners(); } /** @@ -916,21 +916,9 @@ public class RaftCore implements Closeable { * @param listener new listener */ public void listen(String key, RecordListener listener) { - - List listenerList = listeners.get(key); - if (listenerList != null && listenerList.contains(listener)) { - return; - } - - if (listenerList == null) { - listenerList = new CopyOnWriteArrayList<>(); - listeners.put(key, listenerList); - } + notifier.registerListener(key, listener); Loggers.RAFT.info("add listener: {}", key); - - listenerList.add(listener); - // if data present, notify immediately for (Datum datum : datums.values()) { if (!listener.interests(datum.key)) { @@ -952,22 +940,11 @@ public class RaftCore implements Closeable { * @param listener listener */ public void unListen(String key, RecordListener listener) { - - if (!listeners.containsKey(key)) { - return; - } - - for (RecordListener dl : listeners.get(key)) { - // TODO maybe use equal: - if (dl == listener) { - listeners.get(key).remove(listener); - break; - } - } + notifier.deregisterListener(key, listener); } public void unlistenAll(String key) { - listeners.remove(key); + notifier.deregisterAllListener(key); } public void setTerm(long term) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java index ebef44dd8..ea677e638 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/RaftController.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.controllers; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.IoUtils; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.utils.WebUtils; @@ -37,7 +38,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; @@ -57,9 +57,9 @@ import java.util.Map; /** * Methods for Raft consistency protocol. These methods should only be invoked by Nacos server itself. * - * @deprecated will remove in 1.4.x * @author nkorange * @since 1.0.0 + * @deprecated will remove in 1.4.x */ @Deprecated @RestController @@ -72,7 +72,7 @@ public class RaftController { private final ServiceManager serviceManager; private final RaftCore raftCore; - + private final ClusterVersionJudgement versionJudgement; public RaftController(RaftConsistencyServiceImpl raftConsistencyService, ServiceManager serviceManager, @@ -387,7 +387,7 @@ public class RaftController { throw new IllegalStateException("old raft protocol already stop"); } ObjectNode result = JacksonUtils.createEmptyJsonNode(); - Map> listeners = raftCore.getListeners(); + Map> listeners = raftCore.getListeners(); ArrayNode listenerArray = JacksonUtils.createEmptyArrayNode(); for (String key : listeners.keySet()) {