fix(11127): Fix ClientWork removeListener and addListener concurrency issue. (#11681)

This commit is contained in:
blake.qiu 2024-02-02 15:55:52 +08:00 committed by GitHub
parent 1b9a22c1a8
commit 9fcc4c0dbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 186 additions and 2 deletions

View File

@ -155,14 +155,16 @@ public class ClientWorker implements Closeable {
group = blank2defaultGroup(group);
CacheData cache = addCacheDataIfAbsent(dataId, group);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// make sure cache exists in cacheMap
if (getCache(dataId, group) != cache) {
putCache(GroupKey.getKey(dataId, group), cache);
}
agent.notifyListenConfig();
}
}
@ -185,6 +187,10 @@ public class ClientWorker implements Closeable {
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// ensure cache present in cacheMap
if (getCache(dataId, group, tenant) != cache) {
putCache(GroupKey.getKeyTenant(dataId, group, tenant), cache);
}
agent.notifyListenConfig();
}
@ -213,6 +219,10 @@ public class ClientWorker implements Closeable {
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// make sure cache exists in cacheMap
if (getCache(dataId, group, tenant) != cache) {
putCache(GroupKey.getKeyTenant(dataId, group, tenant), cache);
}
agent.notifyListenConfig();
}
@ -403,6 +413,20 @@ public class ClientWorker implements Closeable {
return cache;
}
/**
* Put cache.
*
* @param key groupKey
* @param cache cache
*/
private void putCache(String key, CacheData cache) {
synchronized (cacheMap) {
Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
copy.put(key, cache);
cacheMap.set(copy);
}
}
private void increaseTaskIdCount(int taskId) {
taskIdCacheCountList.get(taskId).incrementAndGet();
}

View File

@ -16,22 +16,57 @@
package com.alibaba.nacos.client.config.impl;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.env.NacosClientProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
public class ClientWorkerTest {
private static final String TEST_NAMESPACE = "TEST_NAMESPACE";
private ClientWorker clientWorker;
private ClientWorker clientWorkerSpy;
@Before
public void setUp() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE);
ConfigFilterChainManager filter = new ConfigFilterChainManager(properties);
ServerListManager serverListManager = Mockito.mock(ServerListManager.class);
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
try {
clientWorker = new ClientWorker(filter, serverListManager, nacosClientProperties);
} catch (NacosException e) {
throw new RuntimeException(e);
}
clientWorkerSpy = Mockito.spy(clientWorker);
}
@Test
public void testConstruct() throws NacosException {
Properties prop = new Properties();
@ -205,4 +240,129 @@ public class ClientWorkerTest {
Mockito.when(client.isHealthServer()).thenReturn(Boolean.FALSE);
Assert.assertEquals(false, clientWorker.isHealthServer());
}
@Test
public void testPutCache() throws Exception {
// 反射调用私有方法putCacheIfAbsent
Method putCacheMethod = ClientWorker.class.getDeclaredMethod("putCache", String.class, CacheData.class);
putCacheMethod.setAccessible(true);
Properties prop = new Properties();
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
ServerListManager agent = Mockito.mock(ServerListManager.class);
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
String key = "testKey";
CacheData cacheData = new CacheData(filter, "env", "dataId", "group");
putCacheMethod.invoke(clientWorker, key, cacheData);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
// 检查cacheMap是否包含特定的key
assertNotNull(cacheMapRef.get().get(key));
Assert.assertEquals(cacheData, cacheMapRef.get().get(key));
// 测试再次插入相同的key将覆盖原始的值
CacheData newCacheData = new CacheData(filter, "newEnv", "newDataId", "newGroup");
putCacheMethod.invoke(clientWorker, key, newCacheData);
// 检查key对应的value是否改变为newCacheData
Assert.assertEquals(newCacheData, cacheMapRef.get().get(key));
}
@Test
public void testAddListenersEnsureCacheDataSafe()
throws NacosException, IllegalAccessException, NoSuchFieldException {
String dataId = "testDataId";
String group = "testGroup";
// 将key-cacheData插入到cacheMap中
CacheData cacheData = new CacheData(null, "env", dataId, group);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
String key = GroupKey.getKey(dataId, group);
cacheMapRef.get().put(key, cacheData);
// 当addCacheDataIfAbsent得到的differentCacheData同cacheMap中该key对应的cacheData不一致
CacheData differentCacheData = new CacheData(null, "env", dataId, group);
doReturn(differentCacheData).when(clientWorkerSpy).addCacheDataIfAbsent(anyString(), anyString());
// 使用addListeners将differentCacheData插入到cacheMap中
clientWorkerSpy.addListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache1 = clientWorker.getCache(dataId, group);
assertNotNull(cacheDataFromCache1);
assertEquals(cacheDataFromCache1, differentCacheData);
assertFalse(cacheDataFromCache1.isDiscard());
assertFalse(cacheDataFromCache1.isConsistentWithServer());
// 再次调用addListeners此时addCacheDataIfAbsent得到的cacheData同cacheMap中该key对应的cacheData一致均为differentCacheData
clientWorkerSpy.addListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache2 = clientWorker.getCache(dataId, group);
assertNotNull(cacheDataFromCache2);
assertEquals(cacheDataFromCache2, differentCacheData);
assertFalse(cacheDataFromCache2.isDiscard());
assertFalse(cacheDataFromCache2.isConsistentWithServer());
}
@Test
public void testAddTenantListenersEnsureCacheDataSafe()
throws NacosException, IllegalAccessException, NoSuchFieldException {
String dataId = "testDataId";
String group = "testGroup";
// 将key-cacheData插入到cacheMap中
CacheData cacheData = new CacheData(null, "env", dataId, group);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
String key = GroupKey.getKeyTenant(dataId, group, TEST_NAMESPACE);
cacheMapRef.get().put(key, cacheData);
// 当addCacheDataIfAbsent得到的differentCacheData同cacheMap中该key对应的cacheData不一致
CacheData differentCacheData = new CacheData(null, "env", dataId, group);
doReturn(differentCacheData).when(clientWorkerSpy)
.addCacheDataIfAbsent(anyString(), anyString(), eq(TEST_NAMESPACE));
// 使用addListeners将differentCacheData插入到cacheMap中
clientWorkerSpy.addTenantListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache1 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache1);
assertEquals(cacheDataFromCache1, differentCacheData);
assertFalse(cacheDataFromCache1.isDiscard());
assertFalse(cacheDataFromCache1.isConsistentWithServer());
// 再次调用addListeners此时addCacheDataIfAbsent得到的cacheData同cacheMap中该key对应的cacheData一致均为differentCacheData
clientWorkerSpy.addTenantListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache2 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache2);
assertEquals(cacheDataFromCache2, differentCacheData);
assertFalse(cacheDataFromCache2.isDiscard());
assertFalse(cacheDataFromCache2.isConsistentWithServer());
}
@Test
public void testAddTenantListenersWithContentEnsureCacheDataSafe()
throws NacosException, IllegalAccessException, NoSuchFieldException {
String dataId = "testDataId";
String group = "testGroup";
// 将key-cacheData插入到cacheMap中
CacheData cacheData = new CacheData(null, "env", dataId, group);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
String key = GroupKey.getKeyTenant(dataId, group, TEST_NAMESPACE);
cacheMapRef.get().put(key, cacheData);
// 当addCacheDataIfAbsent得到的differentCacheData同cacheMap中该key对应的cacheData不一致
CacheData differentCacheData = new CacheData(null, "env", dataId, group);
doReturn(differentCacheData).when(clientWorkerSpy)
.addCacheDataIfAbsent(anyString(), anyString(), eq(TEST_NAMESPACE));
// 使用addListeners将differentCacheData插入到cacheMap中
clientWorkerSpy.addTenantListenersWithContent(dataId, group, "", "", Collections.EMPTY_LIST);
CacheData cacheDataFromCache1 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache1);
assertEquals(cacheDataFromCache1, differentCacheData);
assertFalse(cacheDataFromCache1.isDiscard());
assertFalse(cacheDataFromCache1.isConsistentWithServer());
// 再次调用addListeners此时addCacheDataIfAbsent得到的cacheData同cacheMap中该key对应的cacheData一致均为differentCacheData
clientWorkerSpy.addTenantListenersWithContent(dataId, group, "", "", Collections.EMPTY_LIST);
CacheData cacheDataFromCache2 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache2);
assertEquals(cacheDataFromCache2, differentCacheData);
assertFalse(cacheDataFromCache2.isDiscard());
assertFalse(cacheDataFromCache2.isConsistentWithServer());
}
}