From c4d7c33b98597124e0ffab9738ca335090c01a88 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Wed, 12 Oct 2022 14:08:19 +0800 Subject: [PATCH] fix cache removed when listeneradd delay (#9311) * fix listener miss * fixed cache remove on add listener delay --- .../nacos/client/config/impl/CacheData.java | 13 +++++++++ .../client/config/impl/ClientWorker.java | 27 +++++++++++-------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java index 738688688..715956664 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java @@ -111,6 +111,11 @@ public class CacheData { */ private volatile boolean isSyncWithServer = false; + /** + * if is cache data is discard,need to remove. + */ + private volatile boolean isDiscard = false; + private String type; public boolean isInitializing() { @@ -402,6 +407,14 @@ public class CacheData { isSyncWithServer = syncWithServer; } + public boolean isDiscard() { + return isDiscard; + } + + public void setDiscard(boolean discard) { + isDiscard = discard; + } + public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) { this(configFilterChainManager, name, dataId, group, TenantUtil.getUserTenantForAcm()); } diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index c63542351..6367c94a5 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -126,11 +126,11 @@ public class ClientWorker implements Closeable { private int taskPenaltyTime; private boolean enableRemoteSyncConfig = false; - + private static final int MIN_THREAD_NUM = 2; - + private static final int THREAD_MULTIPLE = 1; - + /** * Add listeners for data. * @@ -146,6 +146,7 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } + cache.setDiscard(false); cache.setSyncWithServer(false); agent.notifyListenConfig(); @@ -169,6 +170,7 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } + cache.setDiscard(false); cache.setSyncWithServer(false); agent.notifyListenConfig(); } @@ -196,6 +198,7 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } + cache.setDiscard(false); cache.setSyncWithServer(false); agent.notifyListenConfig(); } @@ -217,6 +220,7 @@ public class ClientWorker implements Closeable { cache.removeListener(listener); if (cache.getListeners().isEmpty()) { cache.setSyncWithServer(false); + cache.setDiscard(true); agent.removeCache(dataId, group); } } @@ -240,6 +244,7 @@ public class ClientWorker implements Closeable { cache.removeListener(listener); if (cache.getListeners().isEmpty()) { cache.setSyncWithServer(false); + cache.setDiscard(true); agent.removeCache(dataId, group); } } @@ -696,7 +701,7 @@ public class ClientWorker implements Closeable { continue; } executeConfigListen(); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); } } @@ -733,7 +738,7 @@ public class ClientWorker implements Closeable { } } - if (!CollectionUtils.isEmpty(cache.getListeners())) { + if (!cache.isDiscard()) { //get listen config if (!cache.isUseLocalConfigInfo()) { List cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); @@ -744,7 +749,7 @@ public class ClientWorker implements Closeable { cacheDatas.add(cache); } - } else if (CollectionUtils.isEmpty(cache.getListeners())) { + } else if (cache.isDiscard()) { if (!cache.isUseLocalConfigInfo()) { List cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); @@ -807,8 +812,8 @@ public class ClientWorker implements Closeable { if (!cacheData.getListeners().isEmpty()) { Long previousTimesStamp = timestampMap.get(groupKey); - if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, - System.currentTimeMillis())) { + if (previousTimesStamp != null && !cacheData.getLastModifiedTs() + .compareAndSet(previousTimesStamp, System.currentTimeMillis())) { continue; } cacheData.setSyncWithServer(true); @@ -844,7 +849,7 @@ public class ClientWorker implements Closeable { if (removeSuccess) { for (CacheData cacheData : removeListenCaches) { synchronized (cacheData) { - if (cacheData.getListeners().isEmpty()) { + if (cacheData.isDiscard()) { ClientWorker.this .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); } @@ -979,8 +984,8 @@ public class ClientWorker implements Closeable { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", this.getName(), dataId, group, tenant, response); throw new NacosException(response.getErrorCode(), - "http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId=" + dataId + ",group=" + group - + ",tenant=" + tenant); + "http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId=" + + dataId + ",group=" + group + ",tenant=" + tenant); } }