Fix 1.3.2 upgrade 1.4.0 can't notify service change problem

This commit is contained in:
KomachiSion 2020-09-25 18:55:57 +08:00
parent 402ad12c85
commit ea515428ce
5 changed files with 45 additions and 54 deletions

View File

@ -48,7 +48,7 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
/**
* register listener with key.
*
* @param key key
* @param key key
* @param listener {@link RecordListener}
*/
public void registerListener(final String key, final RecordListener listener) {
@ -59,7 +59,7 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
/**
* deregister listener by key.
*
* @param key key
* @param key key
* @param listener {@link RecordListener}
*/
public void deregisterListener(final String key, final RecordListener listener) {
@ -70,12 +70,25 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
}
/**
* notify value to listener with {@link DataOperation} and key.
* deregister all listener by key.
*
* @param key key
*/
public void deregisterAllListener(final String key) {
listenerMap.remove(key);
}
public Map<String, ConcurrentHashSet<RecordListener>> getListeners() {
return listenerMap;
}
/**
* notify value to listener with {@link DataOperation} and key.
*
* @param key key
* @param action {@link DataOperation}
* @param value value
* @param <T> type
* @param value value
* @param <T> type
*/
public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
@ -89,18 +102,16 @@ public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
listener.onDelete(key);
}
} catch (Throwable e) {
Loggers.RAFT
.error("[NACOS-RAFT] error while notifying listener of key: {}", key,
e);
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
}
}
}
}
if (!listenerMap.containsKey(key)) {
return;
}
for (RecordListener listener : listenerMap.get(key)) {
try {
if (action == DataOperation.CHANGE) {

View File

@ -56,10 +56,15 @@ public class NamingKvStorage extends MemoryKvStorage {
public byte[] get(byte[] key) throws KvStorageException {
byte[] result = super.get(key);
if (null == result) {
KvStorage storage = getActualStorage(key);
result = null == storage ? null : storage.get(key);
if (null != result) {
super.put(key, result);
try {
KvStorage storage = createActualStorageIfAbsent(key);
result = null == storage ? null : storage.get(key);
if (null != result) {
super.put(key, result);
}
} catch (Exception e) {
throw new KvStorageException(ErrorCode.KVStorageWriteError.getCode(),
"Get data failed, key: " + new String(key), e);
}
}
return result;
@ -105,7 +110,7 @@ public class NamingKvStorage extends MemoryKvStorage {
@Override
public void delete(byte[] key) throws KvStorageException {
try {
KvStorage storage = getActualStorage(key);
KvStorage storage = createActualStorageIfAbsent(key);
if (null != storage) {
storage.delete(key);
}
@ -185,12 +190,6 @@ public class NamingKvStorage extends MemoryKvStorage {
super.shutdown();
}
private KvStorage getActualStorage(byte[] key) {
String keyString = new String(key);
String namespace = KeyBuilder.getNamespace(keyString);
return StringUtils.isBlank(namespace) ? baseDirStorage : namespaceKvStorage.get(namespace);
}
private KvStorage createActualStorageIfAbsent(byte[] key) throws Exception {
String keyString = new String(key);
String namespace = KeyBuilder.getNamespace(keyString);

View File

@ -231,6 +231,10 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event);
// remove listeners of key to avoid mem leak
if (Op.Delete.equals(op)) {
notifier.deregisterAllListener(key);
}
}
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.EventPublisher;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -68,7 +69,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -187,8 +187,8 @@ public class RaftCore implements Closeable {
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
public Map<String, List<RecordListener>> getListeners() {
return listeners;
public Map<String, ConcurrentHashSet<RecordListener>> getListeners() {
return notifier.getListeners();
}
/**
@ -916,21 +916,9 @@ public class RaftCore implements Closeable {
* @param listener new listener
*/
public void listen(String key, RecordListener listener) {
List<RecordListener> listenerList = listeners.get(key);
if (listenerList != null && listenerList.contains(listener)) {
return;
}
if (listenerList == null) {
listenerList = new CopyOnWriteArrayList<>();
listeners.put(key, listenerList);
}
notifier.registerListener(key, listener);
Loggers.RAFT.info("add listener: {}", key);
listenerList.add(listener);
// if data present, notify immediately
for (Datum datum : datums.values()) {
if (!listener.interests(datum.key)) {
@ -952,22 +940,11 @@ public class RaftCore implements Closeable {
* @param listener listener
*/
public void unListen(String key, RecordListener listener) {
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener dl : listeners.get(key)) {
// TODO maybe use equal:
if (dl == listener) {
listeners.get(key).remove(listener);
break;
}
}
notifier.deregisterListener(key, listener);
}
public void unlistenAll(String key) {
listeners.remove(key);
notifier.deregisterAllListener(key);
}
public void setTerm(long term) {

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.utils.WebUtils;
@ -37,7 +38,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
@ -57,9 +57,9 @@ import java.util.Map;
/**
* Methods for Raft consistency protocol. These methods should only be invoked by Nacos server itself.
*
* @deprecated will remove in 1.4.x
* @author nkorange
* @since 1.0.0
* @deprecated will remove in 1.4.x
*/
@Deprecated
@RestController
@ -72,7 +72,7 @@ public class RaftController {
private final ServiceManager serviceManager;
private final RaftCore raftCore;
private final ClusterVersionJudgement versionJudgement;
public RaftController(RaftConsistencyServiceImpl raftConsistencyService, ServiceManager serviceManager,
@ -387,7 +387,7 @@ public class RaftController {
throw new IllegalStateException("old raft protocol already stop");
}
ObjectNode result = JacksonUtils.createEmptyJsonNode();
Map<String, List<RecordListener>> listeners = raftCore.getListeners();
Map<String, ConcurrentHashSet<RecordListener>> listeners = raftCore.getListeners();
ArrayNode listenerArray = JacksonUtils.createEmptyArrayNode();
for (String key : listeners.keySet()) {