feat(nacos-client:config): fix issue #1317
This commit is contained in:
parent
240ee50aa2
commit
ade510f31d
@ -52,6 +52,8 @@ public class PropertyKeyConst {
|
|||||||
|
|
||||||
public final static String MAX_RETRY = "maxRetry";
|
public final static String MAX_RETRY = "maxRetry";
|
||||||
|
|
||||||
|
public final static String ENABLE_REMOTE_SYNC_CONFIG = "enableRemoteSyncConfig";
|
||||||
|
|
||||||
public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";
|
public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";
|
||||||
|
|
||||||
public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
|
public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
|
||||||
|
@ -37,7 +37,7 @@ public interface ConfigService {
|
|||||||
String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
|
String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get config
|
* Get config and register Listener
|
||||||
*
|
*
|
||||||
* @param dataId dataId
|
* @param dataId dataId
|
||||||
* @param group group
|
* @param group group
|
||||||
|
@ -175,11 +175,6 @@ public class NacosConfigService implements ConfigService {
|
|||||||
|
|
||||||
cr.setContent(content);
|
cr.setContent(content);
|
||||||
|
|
||||||
CacheData cacheData = worker.getCache(dataId, group, tenant);
|
|
||||||
if (cacheData != null) {
|
|
||||||
cacheData.setContent(content);
|
|
||||||
}
|
|
||||||
|
|
||||||
configFilterChainManager.doFilter(null, cr);
|
configFilterChainManager.doFilter(null, cr);
|
||||||
content = cr.getContent();
|
content = cr.getContent();
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@ import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
|
|||||||
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
|
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
|
||||||
import com.alibaba.nacos.client.config.utils.MD5;
|
import com.alibaba.nacos.client.config.utils.MD5;
|
||||||
import com.alibaba.nacos.client.utils.LogUtils;
|
import com.alibaba.nacos.client.utils.LogUtils;
|
||||||
|
import com.alibaba.nacos.client.utils.StringUtils;
|
||||||
import com.alibaba.nacos.client.utils.TenantUtil;
|
import com.alibaba.nacos.client.utils.TenantUtil;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
@ -74,8 +75,7 @@ public class CacheData {
|
|||||||
if (null == listener) {
|
if (null == listener) {
|
||||||
throw new IllegalArgumentException("listener is null");
|
throw new IllegalArgumentException("listener is null");
|
||||||
}
|
}
|
||||||
ManagerListenerWrap wrap = new ManagerListenerWrap(listener);
|
ManagerListenerWrap wrap = new ManagerListenerWrap(listener, md5);
|
||||||
wrap.lastCallMd5 = md5;
|
|
||||||
if (listeners.addIfAbsent(wrap)) {
|
if (listeners.addIfAbsent(wrap)) {
|
||||||
LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
|
LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
|
||||||
listeners.size());
|
listeners.size());
|
||||||
@ -263,6 +263,22 @@ public class CacheData {
|
|||||||
this.md5 = getMd5String(content);
|
this.md5 = getMd5String(content);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group,
|
||||||
|
String tenant, String content) {
|
||||||
|
if (null == dataId || null == group) {
|
||||||
|
throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
|
||||||
|
}
|
||||||
|
this.name = name;
|
||||||
|
this.configFilterChainManager = configFilterChainManager;
|
||||||
|
this.dataId = dataId;
|
||||||
|
this.group = group;
|
||||||
|
this.tenant = tenant;
|
||||||
|
listeners = new CopyOnWriteArrayList<ManagerListenerWrap>();
|
||||||
|
this.isInitializing = true;
|
||||||
|
this.content = StringUtils.isEmpty(content) ? loadCacheContentFromDiskLocal(name, dataId, group, tenant) : content;
|
||||||
|
this.md5 = getMd5String(content);
|
||||||
|
}
|
||||||
|
|
||||||
// ==================
|
// ==================
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
@ -294,6 +310,11 @@ class ManagerListenerWrap {
|
|||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ManagerListenerWrap(Listener listener, String md5) {
|
||||||
|
this.listener = listener;
|
||||||
|
this.lastCallMd5 = md5;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (null == obj || obj.getClass() != getClass()) {
|
if (null == obj || obj.getClass() != getClass()) {
|
||||||
|
@ -177,7 +177,6 @@ public class ClientWorker {
|
|||||||
return cache;
|
return cache;
|
||||||
}
|
}
|
||||||
String key = GroupKey.getKeyTenant(dataId, group, tenant);
|
String key = GroupKey.getKeyTenant(dataId, group, tenant);
|
||||||
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
|
|
||||||
synchronized (cacheMap) {
|
synchronized (cacheMap) {
|
||||||
CacheData cacheFromMap = getCache(dataId, group, tenant);
|
CacheData cacheFromMap = getCache(dataId, group, tenant);
|
||||||
// multiple listeners on the same dataid+group and race condition,so
|
// multiple listeners on the same dataid+group and race condition,so
|
||||||
@ -187,6 +186,18 @@ public class ClientWorker {
|
|||||||
cache = cacheFromMap;
|
cache = cacheFromMap;
|
||||||
// reset so that server not hang this check
|
// reset so that server not hang this check
|
||||||
cache.setInitializing(true);
|
cache.setInitializing(true);
|
||||||
|
} else {
|
||||||
|
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
|
||||||
|
String content;
|
||||||
|
// fix issue # 1317
|
||||||
|
if (enableRemoteSyncConfig) {
|
||||||
|
try {
|
||||||
|
content = getServerConfig(dataId, group, tenant, 3000L);
|
||||||
|
} catch (NacosException ignore) {
|
||||||
|
content = null;
|
||||||
|
}
|
||||||
|
cache.setContent(content);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
|
Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
|
||||||
@ -471,6 +482,8 @@ public class ClientWorker {
|
|||||||
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
|
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
|
||||||
|
|
||||||
taskPenaltyTime = NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.CONFIG_RETRY_TIME)), Constants.CONFIG_RETRY_TIME);
|
taskPenaltyTime = NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.CONFIG_RETRY_TIME)), Constants.CONFIG_RETRY_TIME);
|
||||||
|
|
||||||
|
enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
class LongPollingRunnable implements Runnable {
|
class LongPollingRunnable implements Runnable {
|
||||||
@ -569,4 +582,5 @@ public class ClientWorker {
|
|||||||
private long timeout;
|
private long timeout;
|
||||||
private double currentLongingTaskCount = 0;
|
private double currentLongingTaskCount = 0;
|
||||||
private int taskPenaltyTime;
|
private int taskPenaltyTime;
|
||||||
|
private boolean enableRemoteSyncConfig = false;
|
||||||
}
|
}
|
||||||
|
@ -436,7 +436,7 @@ public class ConfigAPI_ITCase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @TCDescription : nacos_在主动拉取配置后并注册Listener,在更新配置后才触发Listener监听事件
|
* @TCDescription : nacos_在主动拉取配置后并注册Listener,在更新配置后才触发Listener监听事件(使用特定接口)
|
||||||
* @TestStep : TODO Test steps
|
* @TestStep : TODO Test steps
|
||||||
* @ExpectResult : TODO expect results
|
* @ExpectResult : TODO expect results
|
||||||
* @author xiaochun.xxc
|
* @author xiaochun.xxc
|
||||||
@ -450,7 +450,7 @@ public class ConfigAPI_ITCase {
|
|||||||
boolean result = iconfig.publishConfig(dataId, group, content);
|
boolean result = iconfig.publishConfig(dataId, group, content);
|
||||||
Assert.assertTrue(result);
|
Assert.assertTrue(result);
|
||||||
|
|
||||||
Thread.sleep(3000);
|
Thread.sleep(2000);
|
||||||
|
|
||||||
Listener ml = new AbstractListener() {
|
Listener ml = new AbstractListener() {
|
||||||
@Override
|
@Override
|
||||||
@ -460,6 +460,7 @@ public class ConfigAPI_ITCase {
|
|||||||
Assert.assertEquals(content, newContent);
|
Assert.assertEquals(content, newContent);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
String receiveContent = iconfig.getConfigAndSignListener(dataId, group, 1000, ml);
|
String receiveContent = iconfig.getConfigAndSignListener(dataId, group, 1000, ml);
|
||||||
System.out.println(receiveContent);
|
System.out.println(receiveContent);
|
||||||
|
|
||||||
@ -467,7 +468,58 @@ public class ConfigAPI_ITCase {
|
|||||||
Assert.assertTrue(result);
|
Assert.assertTrue(result);
|
||||||
|
|
||||||
Assert.assertEquals(content, receiveContent);
|
Assert.assertEquals(content, receiveContent);
|
||||||
Thread.sleep(3000);
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
Assert.assertEquals(1, count.get());
|
||||||
|
iconfig.removeListener(dataId, group, ml);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @TCDescription : nacos_在主动拉取配置后并注册Listener,在更新配置后才触发Listener监听事件(进行配置参数设置)
|
||||||
|
* @TestStep : TODO Test steps
|
||||||
|
* @ExpectResult : TODO expect results
|
||||||
|
* @author xiaochun.xxc
|
||||||
|
* @since 3.6.8
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void nacos_addListener_6() throws InterruptedException, NacosException {
|
||||||
|
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1"+":"+port);
|
||||||
|
properties.put(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG, "true");
|
||||||
|
ConfigService iconfig = NacosFactory.createConfigService(properties);
|
||||||
|
|
||||||
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
|
final String content = "test-abc";
|
||||||
|
final String newContent = "new-test-def";
|
||||||
|
boolean result = iconfig.publishConfig(dataId, group, content);
|
||||||
|
Assert.assertTrue(result);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
Listener ml = new AbstractListener() {
|
||||||
|
@Override
|
||||||
|
public void receiveConfigInfo(String configInfo) {
|
||||||
|
count.incrementAndGet();
|
||||||
|
System.out.println("Listener receive : [" + configInfo + "]");
|
||||||
|
Assert.assertEquals(content, newContent);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
iconfig.addListener(dataId, group, ml);
|
||||||
|
|
||||||
|
String receiveContent = iconfig.getConfig(dataId, group, 1000);
|
||||||
|
|
||||||
|
System.out.println(receiveContent);
|
||||||
|
|
||||||
|
result = iconfig.publishConfig(dataId, group, newContent);
|
||||||
|
Assert.assertTrue(result);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
receiveContent = iconfig.getConfig(dataId, group, 1000);
|
||||||
|
|
||||||
|
Assert.assertEquals(newContent, receiveContent);
|
||||||
|
|
||||||
Assert.assertEquals(1, count.get());
|
Assert.assertEquals(1, count.get());
|
||||||
iconfig.removeListener(dataId, group, ml);
|
iconfig.removeListener(dataId, group, ml);
|
||||||
|
@ -82,7 +82,7 @@ public class ConfigLongPoll_ITCase {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(30);
|
TimeUnit.SECONDS.sleep(10);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user