fix cache removed when listeneradd delay (#9311)

* fix listener  miss

* fixed cache remove on add listener delay
This commit is contained in:
nov.lzf 2022-10-12 14:08:19 +08:00 committed by GitHub
parent 8aa2e0a746
commit c4d7c33b98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 11 deletions

View File

@ -111,6 +111,11 @@ public class CacheData {
*/ */
private volatile boolean isSyncWithServer = false; private volatile boolean isSyncWithServer = false;
/**
* if is cache data is discard,need to remove.
*/
private volatile boolean isDiscard = false;
private String type; private String type;
public boolean isInitializing() { public boolean isInitializing() {
@ -402,6 +407,14 @@ public class CacheData {
isSyncWithServer = syncWithServer; isSyncWithServer = syncWithServer;
} }
public boolean isDiscard() {
return isDiscard;
}
public void setDiscard(boolean discard) {
isDiscard = discard;
}
public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) { public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) {
this(configFilterChainManager, name, dataId, group, TenantUtil.getUserTenantForAcm()); this(configFilterChainManager, name, dataId, group, TenantUtil.getUserTenantForAcm());
} }

View File

@ -146,6 +146,7 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) { for (Listener listener : listeners) {
cache.addListener(listener); cache.addListener(listener);
} }
cache.setDiscard(false);
cache.setSyncWithServer(false); cache.setSyncWithServer(false);
agent.notifyListenConfig(); agent.notifyListenConfig();
@ -169,6 +170,7 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) { for (Listener listener : listeners) {
cache.addListener(listener); cache.addListener(listener);
} }
cache.setDiscard(false);
cache.setSyncWithServer(false); cache.setSyncWithServer(false);
agent.notifyListenConfig(); agent.notifyListenConfig();
} }
@ -196,6 +198,7 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) { for (Listener listener : listeners) {
cache.addListener(listener); cache.addListener(listener);
} }
cache.setDiscard(false);
cache.setSyncWithServer(false); cache.setSyncWithServer(false);
agent.notifyListenConfig(); agent.notifyListenConfig();
} }
@ -217,6 +220,7 @@ public class ClientWorker implements Closeable {
cache.removeListener(listener); cache.removeListener(listener);
if (cache.getListeners().isEmpty()) { if (cache.getListeners().isEmpty()) {
cache.setSyncWithServer(false); cache.setSyncWithServer(false);
cache.setDiscard(true);
agent.removeCache(dataId, group); agent.removeCache(dataId, group);
} }
} }
@ -240,6 +244,7 @@ public class ClientWorker implements Closeable {
cache.removeListener(listener); cache.removeListener(listener);
if (cache.getListeners().isEmpty()) { if (cache.getListeners().isEmpty()) {
cache.setSyncWithServer(false); cache.setSyncWithServer(false);
cache.setDiscard(true);
agent.removeCache(dataId, group); agent.removeCache(dataId, group);
} }
} }
@ -696,7 +701,7 @@ public class ClientWorker implements Closeable {
continue; continue;
} }
executeConfigListen(); executeConfigListen();
} catch (Exception e) { } catch (Throwable e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
} }
} }
@ -733,7 +738,7 @@ public class ClientWorker implements Closeable {
} }
} }
if (!CollectionUtils.isEmpty(cache.getListeners())) { if (!cache.isDiscard()) {
//get listen config //get listen config
if (!cache.isUseLocalConfigInfo()) { if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
@ -744,7 +749,7 @@ public class ClientWorker implements Closeable {
cacheDatas.add(cache); cacheDatas.add(cache);
} }
} else if (CollectionUtils.isEmpty(cache.getListeners())) { } else if (cache.isDiscard()) {
if (!cache.isUseLocalConfigInfo()) { if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
@ -807,8 +812,8 @@ public class ClientWorker implements Closeable {
if (!cacheData.getListeners().isEmpty()) { if (!cacheData.getListeners().isEmpty()) {
Long previousTimesStamp = timestampMap.get(groupKey); Long previousTimesStamp = timestampMap.get(groupKey);
if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, if (previousTimesStamp != null && !cacheData.getLastModifiedTs()
System.currentTimeMillis())) { .compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
continue; continue;
} }
cacheData.setSyncWithServer(true); cacheData.setSyncWithServer(true);
@ -844,7 +849,7 @@ public class ClientWorker implements Closeable {
if (removeSuccess) { if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) { for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) { synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) { if (cacheData.isDiscard()) {
ClientWorker.this ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
} }
@ -979,8 +984,8 @@ public class ClientWorker implements Closeable {
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", this.getName(), dataId, LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", this.getName(), dataId,
group, tenant, response); group, tenant, response);
throw new NacosException(response.getErrorCode(), throw new NacosException(response.getErrorCode(),
"http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId=" + dataId + ",group=" + group "http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId="
+ ",tenant=" + tenant); + dataId + ",group=" + group + ",tenant=" + tenant);
} }
} }