config module coverage (#11738)
* testcase fix * checkstyle * testcase coverage * testcase coverage * testcase coverage * testcase coverage * testcase fixed
This commit is contained in:
parent
2bb93982df
commit
3da1240f3a
@ -46,8 +46,7 @@ public class ServerHttpAgent implements HttpAgent {
|
||||
|
||||
private static final Logger LOGGER = LogUtils.logger(ServerHttpAgent.class);
|
||||
|
||||
private static final NacosRestTemplate NACOS_RESTTEMPLATE = ConfigHttpClientManager.getInstance()
|
||||
.getNacosRestTemplate();
|
||||
private NacosRestTemplate nacosRestTemplate = ConfigHttpClientManager.getInstance().getNacosRestTemplate();
|
||||
|
||||
private String encode;
|
||||
|
||||
@ -71,8 +70,8 @@ public class ServerHttpAgent implements HttpAgent {
|
||||
newHeaders.addAll(headers);
|
||||
}
|
||||
Query query = Query.newInstance().initParams(paramValues);
|
||||
HttpRestResult<String> result = NACOS_RESTTEMPLATE
|
||||
.get(getUrl(currentServerAddr, path), httpConfig, newHeaders, query, String.class);
|
||||
HttpRestResult<String> result = nacosRestTemplate.get(getUrl(currentServerAddr, path), httpConfig,
|
||||
newHeaders, query, String.class);
|
||||
if (isFail(result)) {
|
||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
||||
serverListMgr.getCurrentServerAddr(), result.getCode());
|
||||
@ -125,8 +124,8 @@ public class ServerHttpAgent implements HttpAgent {
|
||||
if (headers != null) {
|
||||
newHeaders.addAll(headers);
|
||||
}
|
||||
HttpRestResult<String> result = NACOS_RESTTEMPLATE
|
||||
.postForm(getUrl(currentServerAddr, path), httpConfig, newHeaders, paramValues, String.class);
|
||||
HttpRestResult<String> result = nacosRestTemplate.postForm(getUrl(currentServerAddr, path), httpConfig,
|
||||
newHeaders, paramValues, String.class);
|
||||
|
||||
if (isFail(result)) {
|
||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}", currentServerAddr,
|
||||
@ -180,8 +179,8 @@ public class ServerHttpAgent implements HttpAgent {
|
||||
newHeaders.addAll(headers);
|
||||
}
|
||||
Query query = Query.newInstance().initParams(paramValues);
|
||||
HttpRestResult<String> result = NACOS_RESTTEMPLATE
|
||||
.delete(getUrl(currentServerAddr, path), httpConfig, newHeaders, query, String.class);
|
||||
HttpRestResult<String> result = nacosRestTemplate.delete(getUrl(currentServerAddr, path), httpConfig,
|
||||
newHeaders, query, String.class);
|
||||
if (isFail(result)) {
|
||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
||||
serverListMgr.getCurrentServerAddr(), result.getCode());
|
||||
|
@ -55,17 +55,25 @@ public class CacheData {
|
||||
|
||||
private static final Logger LOGGER = LogUtils.logger(CacheData.class);
|
||||
|
||||
private static long notifyWarnTimeout = 60000;
|
||||
private static final long DEFAULT_NOTIF_WARN_TIMEOUTS = 60000;
|
||||
|
||||
private static long notifyWarnTimeout = DEFAULT_NOTIF_WARN_TIMEOUTS;
|
||||
|
||||
static {
|
||||
initNotifyWarnTimeout();
|
||||
}
|
||||
|
||||
static long initNotifyWarnTimeout() {
|
||||
String notifyTimeouts = System.getProperty("nacos.listener.notify.warn.timeout");
|
||||
if (StringUtils.isNotBlank(notifyTimeouts) && NumberUtils.isDigits(notifyTimeouts)) {
|
||||
notifyWarnTimeout = Long.valueOf(notifyTimeouts);
|
||||
LOGGER.info("config listener notify warn timeout millis is set to {}", notifyWarnTimeout);
|
||||
} else {
|
||||
LOGGER.info("config listener notify warn timeout millis use default {} millis ", notifyWarnTimeout);
|
||||
|
||||
LOGGER.info("config listener notify warn timeout millis use default {} millis ",
|
||||
DEFAULT_NOTIF_WARN_TIMEOUTS);
|
||||
notifyWarnTimeout = DEFAULT_NOTIF_WARN_TIMEOUTS;
|
||||
}
|
||||
return notifyWarnTimeout;
|
||||
}
|
||||
|
||||
static ScheduledThreadPoolExecutor scheduledExecutor;
|
||||
@ -594,8 +602,7 @@ public class CacheData {
|
||||
}
|
||||
|
||||
ManagerListenerWrap(Listener listener, String md5, String lastContent) {
|
||||
this.listener = listener;
|
||||
this.lastCallMd5 = md5;
|
||||
this(listener, md5);
|
||||
this.lastContent = lastContent;
|
||||
}
|
||||
|
||||
|
@ -495,34 +495,6 @@ public class ClientWorker implements Closeable {
|
||||
return properties.getInteger(PropertyKeyConst.CLIENT_WORKER_THREAD_COUNT, count);
|
||||
}
|
||||
|
||||
private void refreshContentAndCheck(String groupKey, boolean notify) {
|
||||
if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
|
||||
CacheData cache = cacheMap.get().get(groupKey);
|
||||
refreshContentAndCheck(cache, notify);
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
|
||||
try {
|
||||
ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
|
||||
notify);
|
||||
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
|
||||
cacheData.setContent(response.getContent());
|
||||
if (null != response.getConfigType()) {
|
||||
cacheData.setType(response.getConfigType());
|
||||
}
|
||||
if (notify) {
|
||||
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
|
||||
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
|
||||
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
|
||||
}
|
||||
cacheData.checkListenerMd5();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
|
||||
cacheData.group, cacheData.tenant, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void init(NacosClientProperties properties) {
|
||||
|
||||
timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
|
||||
@ -535,7 +507,7 @@ public class ClientWorker implements Closeable {
|
||||
properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
|
||||
}
|
||||
|
||||
private Map<String, Object> getMetrics(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
|
||||
Map<String, Object> getMetrics(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
|
||||
Map<String, Object> metric = new HashMap<>(16);
|
||||
metric.put("listenConfigSize", String.valueOf(this.cacheMap.get().size()));
|
||||
metric.put("clientVersion", VersionUtils.getFullClientVersion());
|
||||
@ -598,15 +570,15 @@ public class ClientWorker implements Closeable {
|
||||
public class ConfigRpcTransportClient extends ConfigTransportClient {
|
||||
|
||||
Map<String, ExecutorService> multiTaskExecutor = new HashMap<>();
|
||||
|
||||
|
||||
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);
|
||||
|
||||
|
||||
private final Object bellItem = new Object();
|
||||
|
||||
private long lastAllSyncTime = System.currentTimeMillis();
|
||||
|
||||
Subscriber subscriber = null;
|
||||
|
||||
|
||||
/**
|
||||
* 3 minutes to check all listen cache keys.
|
||||
*/
|
||||
@ -618,7 +590,6 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
private ConnectionType getConnectionType() {
|
||||
return ConnectionType.GRPC;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -669,38 +640,46 @@ public class ClientWorker implements Closeable {
|
||||
return labels;
|
||||
}
|
||||
|
||||
ConfigChangeNotifyResponse handleConfigChangeNotifyRequest(ConfigChangeNotifyRequest configChangeNotifyRequest,
|
||||
String clientName) {
|
||||
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}", clientName,
|
||||
configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
|
||||
configChangeNotifyRequest.getTenant());
|
||||
String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
|
||||
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
|
||||
|
||||
CacheData cacheData = cacheMap.get().get(groupKey);
|
||||
if (cacheData != null) {
|
||||
synchronized (cacheData) {
|
||||
cacheData.getReceiveNotifyChanged().set(true);
|
||||
cacheData.setConsistentWithServer(false);
|
||||
notifyListenConfig();
|
||||
}
|
||||
|
||||
}
|
||||
return new ConfigChangeNotifyResponse();
|
||||
}
|
||||
|
||||
ClientConfigMetricResponse handleClientMetricsRequest(ClientConfigMetricRequest configMetricRequest) {
|
||||
ClientConfigMetricResponse response = new ClientConfigMetricResponse();
|
||||
response.setMetrics(getMetrics(configMetricRequest.getMetricsKeys()));
|
||||
return response;
|
||||
}
|
||||
|
||||
private void initRpcClientHandler(final RpcClient rpcClientInner) {
|
||||
/*
|
||||
* Register Config Change /Config ReSync Handler
|
||||
*/
|
||||
rpcClientInner.registerServerRequestHandler((request, connection) -> {
|
||||
if (request instanceof ConfigChangeNotifyRequest) {
|
||||
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
|
||||
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
|
||||
rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
|
||||
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
|
||||
String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
|
||||
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
|
||||
|
||||
CacheData cacheData = cacheMap.get().get(groupKey);
|
||||
if (cacheData != null) {
|
||||
synchronized (cacheData) {
|
||||
cacheData.getReceiveNotifyChanged().set(true);
|
||||
cacheData.setConsistentWithServer(false);
|
||||
notifyListenConfig();
|
||||
}
|
||||
|
||||
}
|
||||
return new ConfigChangeNotifyResponse();
|
||||
handleConfigChangeNotifyRequest((ConfigChangeNotifyRequest) request, rpcClientInner.getName());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
rpcClientInner.registerServerRequestHandler((request, connection) -> {
|
||||
if (request instanceof ClientConfigMetricRequest) {
|
||||
ClientConfigMetricResponse response = new ClientConfigMetricResponse();
|
||||
response.setMetrics(getMetrics(((ClientConfigMetricRequest) request).getMetricsKeys()));
|
||||
return response;
|
||||
return handleClientMetricsRequest((ClientConfigMetricRequest) request);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
@ -801,7 +780,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeConfigListen() {
|
||||
public void executeConfigListen() throws NacosException {
|
||||
|
||||
Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
|
||||
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);
|
||||
@ -810,9 +789,9 @@ public class ClientWorker implements Closeable {
|
||||
for (CacheData cache : cacheMap.get().values()) {
|
||||
|
||||
synchronized (cache) {
|
||||
|
||||
|
||||
checkLocalConfig(cache);
|
||||
|
||||
|
||||
// check local listeners consistent.
|
||||
if (cache.isConsistentWithServer()) {
|
||||
cache.checkListenerMd5();
|
||||
@ -820,12 +799,12 @@ public class ClientWorker implements Closeable {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If local configuration information is used, then skip the processing directly.
|
||||
if (cache.isUseLocalConfigInfo()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (!cache.isDiscard()) {
|
||||
List<CacheData> cacheDatas = listenCachesMap.computeIfAbsent(String.valueOf(cache.getTaskId()),
|
||||
k -> new LinkedList<>());
|
||||
@ -844,7 +823,7 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
//execute check remove listen.
|
||||
checkRemoveListenCache(removeListenCachesMap);
|
||||
|
||||
|
||||
if (needAllSync) {
|
||||
lastAllSyncTime = now;
|
||||
}
|
||||
@ -852,9 +831,9 @@ public class ClientWorker implements Closeable {
|
||||
if (hasChangedKeys) {
|
||||
notifyListenConfig();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Checks and handles local configuration for a given CacheData object. This method evaluates the use of
|
||||
* failover files for local configuration storage and updates the CacheData accordingly.
|
||||
@ -866,10 +845,10 @@ public class ClientWorker implements Closeable {
|
||||
final String group = cacheData.group;
|
||||
final String tenant = cacheData.tenant;
|
||||
final String envName = cacheData.envName;
|
||||
|
||||
|
||||
// Check if a failover file exists for the specified dataId, group, and tenant.
|
||||
File file = LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant);
|
||||
|
||||
|
||||
// If not using local config info and a failover file exists, load and use it.
|
||||
if (!cacheData.isUseLocalConfigInfo() && file.exists()) {
|
||||
String content = LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant);
|
||||
@ -882,7 +861,7 @@ public class ClientWorker implements Closeable {
|
||||
envName, dataId, group, tenant, md5, ContentUtils.truncateContent(content));
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// If use local config info, but the failover file is deleted, switch back to server config.
|
||||
if (cacheData.isUseLocalConfigInfo() && !file.exists()) {
|
||||
cacheData.setUseLocalConfigInfo(false);
|
||||
@ -890,7 +869,7 @@ public class ClientWorker implements Closeable {
|
||||
dataId, group, tenant);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// When the failover file content changes, indicating a change in local configuration.
|
||||
if (cacheData.isUseLocalConfigInfo() && file.exists()
|
||||
&& cacheData.getLocalConfigInfoVersion() != file.lastModified()) {
|
||||
@ -904,7 +883,7 @@ public class ClientWorker implements Closeable {
|
||||
envName, dataId, group, tenant, md5, ContentUtils.truncateContent(content));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private ExecutorService ensureSyncExecutor(String taskId) {
|
||||
if (!multiTaskExecutor.containsKey(taskId)) {
|
||||
multiTaskExecutor.put(taskId,
|
||||
@ -916,21 +895,50 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
return multiTaskExecutor.get(taskId);
|
||||
}
|
||||
|
||||
private void checkRemoveListenCache(Map<String, List<CacheData>> removeListenCachesMap) {
|
||||
|
||||
private void refreshContentAndCheck(RpcClient rpcClient, String groupKey, boolean notify) {
|
||||
if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
|
||||
CacheData cache = cacheMap.get().get(groupKey);
|
||||
refreshContentAndCheck(rpcClient, cache, notify);
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshContentAndCheck(RpcClient rpcClient, CacheData cacheData, boolean notify) {
|
||||
try {
|
||||
|
||||
ConfigResponse response = this.queryConfigInner(rpcClient, cacheData.dataId, cacheData.group,
|
||||
cacheData.tenant, 3000L, notify);
|
||||
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
|
||||
cacheData.setContent(response.getContent());
|
||||
if (null != response.getConfigType()) {
|
||||
cacheData.setType(response.getConfigType());
|
||||
}
|
||||
if (notify) {
|
||||
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
|
||||
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
|
||||
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
|
||||
}
|
||||
cacheData.checkListenerMd5();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
|
||||
cacheData.group, cacheData.tenant, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkRemoveListenCache(Map<String, List<CacheData>> removeListenCachesMap) throws NacosException {
|
||||
if (!removeListenCachesMap.isEmpty()) {
|
||||
List<Future> listenFutures = new ArrayList<>();
|
||||
|
||||
|
||||
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
|
||||
String taskId = entry.getKey();
|
||||
|
||||
RpcClient rpcClient = ensureRpcClient(taskId);
|
||||
|
||||
ExecutorService executorService = ensureSyncExecutor(taskId);
|
||||
Future future = executorService.submit(() -> {
|
||||
List<CacheData> removeListenCaches = entry.getValue();
|
||||
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
|
||||
configChangeListenRequest.setListen(false);
|
||||
try {
|
||||
RpcClient rpcClient = ensureRpcClient(taskId);
|
||||
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
|
||||
if (removeSuccess) {
|
||||
for (CacheData cacheData : removeListenCaches) {
|
||||
@ -942,7 +950,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
} catch (Throwable e) {
|
||||
LOGGER.error("Async remove listen config change error ", e);
|
||||
try {
|
||||
@ -954,7 +962,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
});
|
||||
listenFutures.add(future);
|
||||
|
||||
|
||||
}
|
||||
for (Future future : listenFutures) {
|
||||
try {
|
||||
@ -965,14 +973,16 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) {
|
||||
|
||||
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) throws NacosException {
|
||||
|
||||
final AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
|
||||
if (!listenCachesMap.isEmpty()) {
|
||||
List<Future> listenFutures = new ArrayList<>();
|
||||
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
|
||||
String taskId = entry.getKey();
|
||||
RpcClient rpcClient = ensureRpcClient(taskId);
|
||||
|
||||
ExecutorService executorService = ensureSyncExecutor(taskId);
|
||||
Future future = executorService.submit(() -> {
|
||||
List<CacheData> listenCaches = entry.getValue();
|
||||
@ -983,13 +993,12 @@ public class ClientWorker implements Closeable {
|
||||
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
|
||||
configChangeListenRequest.setListen(true);
|
||||
try {
|
||||
RpcClient rpcClient = ensureRpcClient(taskId);
|
||||
ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(
|
||||
rpcClient, configChangeListenRequest);
|
||||
if (listenResponse != null && listenResponse.isSuccess()) {
|
||||
|
||||
|
||||
Set<String> changeKeys = new HashSet<String>();
|
||||
|
||||
|
||||
List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
|
||||
//handle changed keys,notify listener
|
||||
if (!CollectionUtils.isEmpty(changedConfigs)) {
|
||||
@ -999,22 +1008,22 @@ public class ClientWorker implements Closeable {
|
||||
changeConfig.getGroup(), changeConfig.getTenant());
|
||||
changeKeys.add(changeKey);
|
||||
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
|
||||
refreshContentAndCheck(changeKey, !isInitializing);
|
||||
refreshContentAndCheck(rpcClient, changeKey, !isInitializing);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
for (CacheData cacheData : listenCaches) {
|
||||
if (cacheData.getReceiveNotifyChanged().get()) {
|
||||
String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
|
||||
cacheData.getTenant());
|
||||
if (!changeKeys.contains(changeKey)) {
|
||||
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
|
||||
refreshContentAndCheck(changeKey, !isInitializing);
|
||||
refreshContentAndCheck(rpcClient, changeKey, !isInitializing);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//handler content configs
|
||||
for (CacheData cacheData : listenCaches) {
|
||||
cacheData.setInitializing(false);
|
||||
@ -1028,7 +1037,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOGGER.error("Execute listen config change error ", e);
|
||||
@ -1041,7 +1050,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
});
|
||||
listenFutures.add(future);
|
||||
|
||||
|
||||
}
|
||||
for (Future future : listenFutures) {
|
||||
try {
|
||||
@ -1050,7 +1059,7 @@ public class ClientWorker implements Closeable {
|
||||
LOGGER.error("Async listen config change error ", throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
return hasChangedKeys.get();
|
||||
}
|
||||
@ -1073,7 +1082,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* build config string.
|
||||
*
|
||||
@ -1112,8 +1121,6 @@ public class ClientWorker implements Closeable {
|
||||
@Override
|
||||
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
|
||||
throws NacosException {
|
||||
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
|
||||
request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
|
||||
RpcClient rpcClient = getOneRunningClient();
|
||||
if (notify) {
|
||||
CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
|
||||
@ -1121,6 +1128,16 @@ public class ClientWorker implements Closeable {
|
||||
rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
|
||||
}
|
||||
}
|
||||
|
||||
return queryConfigInner(rpcClient, dataId, group, tenant, readTimeouts, notify);
|
||||
|
||||
}
|
||||
|
||||
ConfigResponse queryConfigInner(RpcClient rpcClient, String dataId, String group, String tenant,
|
||||
long readTimeouts, boolean notify) throws NacosException {
|
||||
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
|
||||
request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
|
||||
|
||||
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
|
||||
|
||||
ConfigResponse configResponse = new ConfigResponse();
|
||||
@ -1244,7 +1261,7 @@ public class ClientWorker implements Closeable {
|
||||
ConfigRemoveResponse response = (ConfigRemoveResponse) requestProxy(getOneRunningClient(), request);
|
||||
return response.isSuccess();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* check server is health.
|
||||
*
|
||||
|
@ -175,8 +175,10 @@ public abstract class ConfigTransportClient {
|
||||
|
||||
/**
|
||||
* listen change .
|
||||
*
|
||||
* @throws NacosException nacos exception throws, should retry.
|
||||
*/
|
||||
public abstract void executeConfigListen();
|
||||
public abstract void executeConfigListen() throws NacosException;
|
||||
|
||||
/**
|
||||
* remove cache implements.
|
||||
|
@ -29,13 +29,20 @@ import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class NacosConfigServiceTest {
|
||||
|
||||
private NacosConfigService nacosConfigService;
|
||||
@ -64,7 +71,7 @@ public class NacosConfigServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConfig() throws NacosException {
|
||||
public void testGetConfigFromServer() throws NacosException {
|
||||
final String dataId = "1";
|
||||
final String group = "2";
|
||||
final String tenant = "";
|
||||
@ -79,6 +86,87 @@ public class NacosConfigServiceTest {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConfigFromFailOver() throws NacosException {
|
||||
final String dataId = "1failover";
|
||||
final String group = "2";
|
||||
final String tenant = "";
|
||||
|
||||
MockedStatic<LocalConfigInfoProcessor> localConfigInfoProcessorMockedStatic = Mockito.mockStatic(
|
||||
LocalConfigInfoProcessor.class);
|
||||
try {
|
||||
String contentFailOver = "failOverContent" + System.currentTimeMillis();
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailover(any(), eq(dataId), eq(group), eq(tenant)))
|
||||
.thenReturn(contentFailOver);
|
||||
final int timeout = 3000;
|
||||
|
||||
final String config = nacosConfigService.getConfig(dataId, group, timeout);
|
||||
Assert.assertEquals(contentFailOver, config);
|
||||
} finally {
|
||||
localConfigInfoProcessorMockedStatic.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConfigFromLocalCache() throws NacosException {
|
||||
final String dataId = "1localcache";
|
||||
final String group = "2";
|
||||
final String tenant = "";
|
||||
|
||||
MockedStatic<LocalConfigInfoProcessor> localConfigInfoProcessorMockedStatic = Mockito.mockStatic(
|
||||
LocalConfigInfoProcessor.class);
|
||||
try {
|
||||
String contentFailOver = "localCacheContent" + System.currentTimeMillis();
|
||||
//fail over null
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailover(any(), eq(dataId), eq(group), eq(tenant)))
|
||||
.thenReturn(null);
|
||||
//snapshot content
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getSnapshot(any(), eq(dataId), eq(group), eq(tenant)))
|
||||
.thenReturn(contentFailOver);
|
||||
//form server error.
|
||||
final int timeout = 3000;
|
||||
Mockito.when(mockWoker.getServerConfig(dataId, group, "", timeout, false)).thenThrow(new NacosException());
|
||||
|
||||
final String config = nacosConfigService.getConfig(dataId, group, timeout);
|
||||
Assert.assertEquals(contentFailOver, config);
|
||||
} finally {
|
||||
localConfigInfoProcessorMockedStatic.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConfig403() throws NacosException {
|
||||
final String dataId = "1localcache403";
|
||||
final String group = "2";
|
||||
final String tenant = "";
|
||||
|
||||
MockedStatic<LocalConfigInfoProcessor> localConfigInfoProcessorMockedStatic = Mockito.mockStatic(
|
||||
LocalConfigInfoProcessor.class);
|
||||
try {
|
||||
//fail over null
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailover(any(), eq(dataId), eq(group), eq(tenant)))
|
||||
.thenReturn(null);
|
||||
|
||||
//form server error.
|
||||
final int timeout = 3000;
|
||||
Mockito.when(mockWoker.getServerConfig(dataId, group, "", timeout, false))
|
||||
.thenThrow(new NacosException(NacosException.NO_RIGHT, "no right"));
|
||||
try {
|
||||
nacosConfigService.getConfig(dataId, group, timeout);
|
||||
Assert.assertTrue(false);
|
||||
} catch (NacosException e) {
|
||||
Assert.assertEquals(NacosException.NO_RIGHT, e.getErrCode());
|
||||
}
|
||||
} finally {
|
||||
localConfigInfoProcessorMockedStatic.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConfigAndSignListener() throws NacosException {
|
||||
final String dataId = "1";
|
||||
@ -98,37 +186,33 @@ public class NacosConfigServiceTest {
|
||||
}
|
||||
};
|
||||
|
||||
ConfigResponse response = new ConfigResponse();
|
||||
response.setContent(content);
|
||||
response.setConfigType("bb");
|
||||
Mockito.when(mockWoker.getServerConfig(dataId, group, "", timeout, false)).thenReturn(response);
|
||||
final NacosClientProperties properties = NacosClientProperties.PROTOTYPE.derive(new Properties());
|
||||
Mockito.when(mockWoker.getAgent()).thenReturn(new ConfigTransportClient(properties, new ServerListManager()) {
|
||||
@Override
|
||||
public void startInternal() throws NacosException {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "TestConfigTransportClient";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void notifyListenConfig() {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void executeConfigListen() {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void removeCache(String dataId, String group) {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeous,
|
||||
boolean notify) throws NacosException {
|
||||
@ -139,14 +223,14 @@ public class NacosConfigServiceTest {
|
||||
configResponse.setTenant(tenant);
|
||||
return configResponse;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean publishConfig(String dataId, String group, String tenant, String appName, String tag,
|
||||
String betaIps, String content, String encryptedDataKey, String casMd5, String type)
|
||||
throws NacosException {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException {
|
||||
return false;
|
||||
|
@ -18,15 +18,60 @@ package com.alibaba.nacos.client.config.http;
|
||||
|
||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
|
||||
import com.alibaba.nacos.client.config.impl.ConfigHttpClientManager;
|
||||
import com.alibaba.nacos.client.config.impl.ServerListManager;
|
||||
import com.alibaba.nacos.common.http.HttpClientConfig;
|
||||
import com.alibaba.nacos.common.http.HttpRestResult;
|
||||
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
|
||||
import com.alibaba.nacos.common.http.param.Header;
|
||||
import com.alibaba.nacos.common.http.param.Query;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class ServerHttpAgentTest {
|
||||
|
||||
NacosRestTemplate nacosRestTemplate;
|
||||
|
||||
MockedStatic<ConfigHttpClientManager> configHttpClientManagerMockedStatic;
|
||||
|
||||
@Mock
|
||||
ConfigHttpClientManager configHttpClientManager;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
configHttpClientManagerMockedStatic = Mockito.mockStatic(ConfigHttpClientManager.class);
|
||||
configHttpClientManagerMockedStatic.when(() -> ConfigHttpClientManager.getInstance())
|
||||
.thenReturn(configHttpClientManager);
|
||||
nacosRestTemplate = Mockito.mock(NacosRestTemplate.class);
|
||||
Mockito.when(configHttpClientManager.getNacosRestTemplate()).thenReturn(nacosRestTemplate);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
configHttpClientManagerMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstruct() throws NacosException {
|
||||
ServerListManager server = new ServerListManager();
|
||||
@ -43,6 +88,154 @@ public class ServerHttpAgentTest {
|
||||
|
||||
}
|
||||
|
||||
private void resetNacosHttpTemplate(ServerHttpAgent serverHttpAgent, NacosRestTemplate nacosRestTemplate)
|
||||
throws Exception {
|
||||
Field nacosRestTemplateFiled = ServerHttpAgent.class.getDeclaredField("nacosRestTemplate");
|
||||
nacosRestTemplateFiled.setAccessible(true);
|
||||
nacosRestTemplateFiled.set(serverHttpAgent, nacosRestTemplate);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpGetSuccess() throws Exception {
|
||||
|
||||
Mockito.when(
|
||||
nacosRestTemplate.get(anyString(), any(HttpClientConfig.class), any(Header.class), any(Query.class),
|
||||
eq(String.class))).thenReturn(new HttpRestResult(Header.newInstance(), 500, "", ""))
|
||||
.thenThrow(new ConnectException())
|
||||
.thenReturn(new HttpRestResult(Header.newInstance(), 200, "hello", "success"));
|
||||
ServerListManager server = new ServerListManager(
|
||||
Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"));
|
||||
final ServerHttpAgent serverHttpAgent = new ServerHttpAgent(server);
|
||||
resetNacosHttpTemplate(serverHttpAgent, nacosRestTemplate);
|
||||
String path = "config.do";
|
||||
Map<String, String> parmas = new HashMap<>();
|
||||
parmas.put("dataId", "12345");
|
||||
HttpRestResult<String> stringHttpRestResult = serverHttpAgent.httpGet(path, Header.newInstance().getHeader(),
|
||||
parmas, "UTF-8", 3000L);
|
||||
Assert.assertEquals("hello", stringHttpRestResult.getData());
|
||||
Assert.assertEquals(true, stringHttpRestResult.ok());
|
||||
Assert.assertEquals("success", stringHttpRestResult.getMessage());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpGetFail() throws Exception {
|
||||
|
||||
Mockito.when(
|
||||
nacosRestTemplate.get(anyString(), any(HttpClientConfig.class), any(Header.class), any(Query.class),
|
||||
eq(String.class))).thenThrow(new SocketTimeoutException()).thenThrow(new ConnectException())
|
||||
.thenThrow(new ConnectException()).thenThrow(new NacosRuntimeException(2048));
|
||||
ServerListManager server = new ServerListManager(Arrays.asList("127.0.0.1", "127.0.0.2"));
|
||||
final ServerHttpAgent serverHttpAgent = new ServerHttpAgent(server);
|
||||
resetNacosHttpTemplate(serverHttpAgent, nacosRestTemplate);
|
||||
|
||||
String path = "config.do";
|
||||
Map<String, String> parmas = new HashMap<>();
|
||||
parmas.put("dataId", "12345");
|
||||
try {
|
||||
serverHttpAgent.httpGet(path, Header.newInstance().getHeader(), parmas, "UTF-8", 3000L);
|
||||
Assert.fail();
|
||||
} catch (NacosRuntimeException e) {
|
||||
Assert.assertEquals(e.getErrCode(), 2048);
|
||||
} catch (Exception e) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpPostSuccess() throws Exception {
|
||||
|
||||
Mockito.when(
|
||||
nacosRestTemplate.postForm(anyString(), any(HttpClientConfig.class), any(Header.class), any(Map.class),
|
||||
eq(String.class))).thenReturn(new HttpRestResult(Header.newInstance(), 500, "", ""))
|
||||
.thenThrow(new ConnectException())
|
||||
.thenReturn(new HttpRestResult(Header.newInstance(), 200, "hello", "success"));
|
||||
ServerListManager server = new ServerListManager(
|
||||
Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"));
|
||||
final ServerHttpAgent serverHttpAgent = new ServerHttpAgent(server);
|
||||
resetNacosHttpTemplate(serverHttpAgent, nacosRestTemplate);
|
||||
String path = "config.do";
|
||||
Map<String, String> parmas = new HashMap<>();
|
||||
parmas.put("dataId", "12345");
|
||||
HttpRestResult<String> stringHttpRestResult = serverHttpAgent.httpPost(path, Header.newInstance().getHeader(),
|
||||
parmas, "UTF-8", 3000L);
|
||||
Assert.assertEquals("hello", stringHttpRestResult.getData());
|
||||
Assert.assertEquals(true, stringHttpRestResult.ok());
|
||||
Assert.assertEquals("success", stringHttpRestResult.getMessage());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpPostFail() throws Exception {
|
||||
|
||||
Mockito.when(
|
||||
nacosRestTemplate.postForm(anyString(), any(HttpClientConfig.class), any(Header.class), any(Map.class),
|
||||
eq(String.class))).thenThrow(new SocketTimeoutException()).thenThrow(new ConnectException())
|
||||
.thenThrow(new ConnectException()).thenThrow(new NacosRuntimeException(2048));
|
||||
ServerListManager server = new ServerListManager(Arrays.asList("127.0.0.1", "127.0.0.2"));
|
||||
final ServerHttpAgent serverHttpAgent = new ServerHttpAgent(server);
|
||||
resetNacosHttpTemplate(serverHttpAgent, nacosRestTemplate);
|
||||
|
||||
String path = "config.do";
|
||||
Map<String, String> parmas = new HashMap<>();
|
||||
parmas.put("dataId", "12345");
|
||||
try {
|
||||
serverHttpAgent.httpPost(path, Header.newInstance().getHeader(), parmas, "UTF-8", 3000L);
|
||||
Assert.fail();
|
||||
} catch (NacosRuntimeException e) {
|
||||
Assert.assertEquals(e.getErrCode(), 2048);
|
||||
} catch (Exception e) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpDeleteSuccess() throws Exception {
|
||||
|
||||
Mockito.when(
|
||||
nacosRestTemplate.delete(anyString(), any(HttpClientConfig.class), any(Header.class), any(Query.class),
|
||||
eq(String.class))).thenReturn(new HttpRestResult(Header.newInstance(), 500, "", ""))
|
||||
.thenThrow(new ConnectException())
|
||||
.thenReturn(new HttpRestResult(Header.newInstance(), 200, "hello", "success"));
|
||||
ServerListManager server = new ServerListManager(
|
||||
Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"));
|
||||
final ServerHttpAgent serverHttpAgent = new ServerHttpAgent(server);
|
||||
resetNacosHttpTemplate(serverHttpAgent, nacosRestTemplate);
|
||||
String path = "config.do";
|
||||
Map<String, String> parmas = new HashMap<>();
|
||||
parmas.put("dataId", "12345");
|
||||
HttpRestResult<String> stringHttpRestResult = serverHttpAgent.httpDelete(path, Header.newInstance().getHeader(),
|
||||
parmas, "UTF-8", 3000L);
|
||||
Assert.assertEquals("hello", stringHttpRestResult.getData());
|
||||
Assert.assertEquals(true, stringHttpRestResult.ok());
|
||||
Assert.assertEquals("success", stringHttpRestResult.getMessage());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpDeleteFail() throws Exception {
|
||||
|
||||
Mockito.when(
|
||||
nacosRestTemplate.delete(anyString(), any(HttpClientConfig.class), any(Header.class), any(Query.class),
|
||||
eq(String.class))).thenThrow(new SocketTimeoutException()).thenThrow(new ConnectException())
|
||||
.thenThrow(new ConnectException()).thenThrow(new NacosRuntimeException(2048));
|
||||
ServerListManager server = new ServerListManager(Arrays.asList("127.0.0.1", "127.0.0.2"));
|
||||
final ServerHttpAgent serverHttpAgent = new ServerHttpAgent(server);
|
||||
resetNacosHttpTemplate(serverHttpAgent, nacosRestTemplate);
|
||||
|
||||
String path = "config.do";
|
||||
Map<String, String> parmas = new HashMap<>();
|
||||
parmas.put("dataId", "12345");
|
||||
try {
|
||||
serverHttpAgent.httpDelete(path, Header.newInstance().getHeader(), parmas, "UTF-8", 3000L);
|
||||
Assert.fail();
|
||||
} catch (NacosRuntimeException e) {
|
||||
Assert.assertEquals(e.getErrCode(), 2048);
|
||||
} catch (Exception e) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetterAndSetter() throws NacosException {
|
||||
ServerListManager server = new ServerListManager("aaa", "namespace1");
|
||||
|
@ -16,9 +16,16 @@
|
||||
|
||||
package com.alibaba.nacos.client.config.impl;
|
||||
|
||||
import com.alibaba.nacos.api.config.ConfigChangeEvent;
|
||||
import com.alibaba.nacos.api.config.PropertyChangeType;
|
||||
import com.alibaba.nacos.api.config.listener.AbstractSharedListener;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
|
||||
import com.alibaba.nacos.client.config.listener.impl.AbstractConfigChangeListener;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.notify.listener.Subscriber;
|
||||
import com.alibaba.nacos.common.utils.MD5Utils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -28,6 +35,7 @@ import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class CacheDataTest {
|
||||
|
||||
@ -80,6 +88,16 @@ public class CacheDataTest {
|
||||
Assert.assertEquals(timeStamp, cacheData1.getLocalConfigInfoVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotifyWarnTimeout() {
|
||||
System.setProperty("nacos.listener.notify.warn.timeout", "5000");
|
||||
long notifyWarnTimeout = CacheData.initNotifyWarnTimeout();
|
||||
Assert.assertEquals(5000, notifyWarnTimeout);
|
||||
System.setProperty("nacos.listener.notify.warn.timeout", "1bf000abc");
|
||||
long notifyWarnTimeout2 = CacheData.initNotifyWarnTimeout();
|
||||
Assert.assertEquals(60000, notifyWarnTimeout2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListener() throws NacosException {
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
@ -133,4 +151,112 @@ public class CacheDataTest {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckListenerMd5NotifyTimeouts() throws NacosException {
|
||||
System.setProperty("nacos.listener.notify.warn.timeout", "1000");
|
||||
long notifyWarnTimeout = CacheData.initNotifyWarnTimeout();
|
||||
Assert.assertEquals(1000, notifyWarnTimeout);
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
final CacheData data = new CacheData(filter, "name1", "keytimeouts", "group", "tenant");
|
||||
Listener listener = new Listener() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return Runnable::run;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receiveConfigInfo(String configInfo) {
|
||||
try {
|
||||
Thread.sleep(11000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
AtomicReference<String> dataIdNotifyTimeouts = new AtomicReference();
|
||||
NotifyCenter.registerSubscriber(new Subscriber() {
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
ChangeNotifyBlockEvent changeNotifyBlockEvent = (ChangeNotifyBlockEvent) event;
|
||||
dataIdNotifyTimeouts.set(changeNotifyBlockEvent.getDataId());
|
||||
System.out.println("timeout:" + changeNotifyBlockEvent.getDataId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Event> subscribeType() {
|
||||
return ChangeNotifyBlockEvent.class;
|
||||
}
|
||||
});
|
||||
data.addListener(listener);
|
||||
data.setContent("new");
|
||||
data.checkListenerMd5();
|
||||
Assert.assertTrue(data.checkListenersMd5Consistent());
|
||||
Assert.assertEquals("keytimeouts", dataIdNotifyTimeouts.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbstractSharedListener() throws NacosException {
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
final CacheData data = new CacheData(filter, "name1", "keyshare", "group", "tenant");
|
||||
|
||||
final String[] dataIdReceive = new String[1];
|
||||
final String[] groupReceive = new String[1];
|
||||
final String[] contentReceive = new String[1];
|
||||
|
||||
Listener listener = new AbstractSharedListener() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return Runnable::run;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerReceive(String dataId, String group, String configInfo) {
|
||||
dataIdReceive[0] = dataId;
|
||||
groupReceive[0] = group;
|
||||
contentReceive[0] = configInfo;
|
||||
}
|
||||
|
||||
};
|
||||
data.addListener(listener);
|
||||
String content = "content" + System.currentTimeMillis();
|
||||
data.setContent(content);
|
||||
data.checkListenerMd5();
|
||||
Assert.assertTrue(data.checkListenersMd5Consistent());
|
||||
Assert.assertEquals(dataIdReceive[0], "keyshare");
|
||||
Assert.assertEquals(groupReceive[0], "group");
|
||||
Assert.assertEquals(contentReceive[0], content);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbstractConfigChangeListener() throws NacosException {
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
final CacheData data = new CacheData(filter, "name1", "keyshare", "group", "tenant");
|
||||
data.setType("properties");
|
||||
data.setContent("a=a\nb=b\nc=c");
|
||||
|
||||
AtomicReference<ConfigChangeEvent> changeItemReceived = new AtomicReference<>();
|
||||
Listener listener = new AbstractConfigChangeListener() {
|
||||
@Override
|
||||
public void receiveConfigChange(ConfigChangeEvent event) {
|
||||
changeItemReceived.set(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return Runnable::run;
|
||||
}
|
||||
|
||||
};
|
||||
data.addListener(listener);
|
||||
String content = "b=b\nc=abc\nd=d";
|
||||
data.setContent(content);
|
||||
data.checkListenerMd5();
|
||||
Assert.assertTrue(data.checkListenersMd5Consistent());
|
||||
Assert.assertEquals(PropertyChangeType.DELETED, changeItemReceived.get().getChangeItem("a").getType());
|
||||
Assert.assertEquals(PropertyChangeType.MODIFIED, changeItemReceived.get().getChangeItem("c").getType());
|
||||
Assert.assertEquals(PropertyChangeType.ADDED, changeItemReceived.get().getChangeItem("d").getType());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,43 +17,89 @@
|
||||
package com.alibaba.nacos.client.config.impl;
|
||||
|
||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.config.ConfigType;
|
||||
import com.alibaba.nacos.api.config.listener.AbstractListener;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.config.remote.request.ClientConfigMetricRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
|
||||
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.client.config.common.GroupKey;
|
||||
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
|
||||
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
|
||||
import com.alibaba.nacos.client.env.NacosClientProperties;
|
||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClient;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.MD5Utils;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static com.alibaba.nacos.api.annotation.NacosProperties.NAMESPACE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class ClientWorkerTest {
|
||||
|
||||
MockedStatic<RpcClientFactory> rpcClientFactoryMockedStatic;
|
||||
|
||||
MockedStatic<LocalConfigInfoProcessor> localConfigInfoProcessorMockedStatic;
|
||||
|
||||
private static final String TEST_NAMESPACE = "TEST_NAMESPACE";
|
||||
|
||||
private ClientWorker clientWorker;
|
||||
|
||||
private ClientWorker clientWorkerSpy;
|
||||
|
||||
@Mock
|
||||
RpcClient rpcClient;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
public void before() {
|
||||
rpcClientFactoryMockedStatic = Mockito.mockStatic(RpcClientFactory.class);
|
||||
|
||||
rpcClientFactoryMockedStatic.when(
|
||||
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
|
||||
any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
|
||||
localConfigInfoProcessorMockedStatic = Mockito.mockStatic(LocalConfigInfoProcessor.class);
|
||||
Properties properties = new Properties();
|
||||
properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE);
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(properties);
|
||||
@ -67,12 +113,18 @@ public class ClientWorkerTest {
|
||||
clientWorkerSpy = Mockito.spy(clientWorker);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
rpcClientFactoryMockedStatic.close();
|
||||
localConfigInfoProcessorMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstruct() throws NacosException {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
Assert.assertNotNull(clientWorker);
|
||||
@ -83,7 +135,7 @@ public class ClientWorkerTest {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
String dataId = "a";
|
||||
@ -113,7 +165,7 @@ public class ClientWorkerTest {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
|
||||
@ -155,14 +207,13 @@ public class ClientWorkerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishConfig() throws NacosException {
|
||||
public void testPublishConfigSuccess() throws NacosException {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
ClientWorker.ConfigRpcTransportClient mockClient = Mockito.mock(ClientWorker.ConfigRpcTransportClient.class);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
@ -176,24 +227,244 @@ public class ClientWorkerTest {
|
||||
String casMd5 = "1111";
|
||||
|
||||
String type = "properties";
|
||||
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong()))
|
||||
.thenReturn(new ConfigPublishResponse());
|
||||
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
|
||||
type);
|
||||
Assert.assertTrue(b);
|
||||
|
||||
boolean b = clientWorker
|
||||
.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5, type);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishConfigFail() throws NacosException {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
String content = "d";
|
||||
|
||||
String appName = "app";
|
||||
String tag = "tag";
|
||||
|
||||
String betaIps = "1.1.1.1";
|
||||
String casMd5 = "1111";
|
||||
|
||||
String type = "properties";
|
||||
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong()))
|
||||
.thenReturn(ConfigPublishResponse.buildFailResponse(503, "over limit"));
|
||||
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
|
||||
type);
|
||||
Assert.assertFalse(b);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishConfigException() throws NacosException {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
String content = "d";
|
||||
|
||||
String appName = "app";
|
||||
String tag = "tag";
|
||||
|
||||
String betaIps = "1.1.1.1";
|
||||
String casMd5 = "1111";
|
||||
|
||||
String type = "properties";
|
||||
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong())).thenThrow(new NacosException());
|
||||
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
|
||||
type);
|
||||
Assert.assertFalse(b);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveConfig() throws NacosException {
|
||||
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
|
||||
String tag = "tag";
|
||||
try {
|
||||
Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class), anyLong()))
|
||||
.thenThrow(new NacosException(503, "overlimit"));
|
||||
|
||||
clientWorker.removeConfig(dataId, group, tenant, tag);
|
||||
Assert.fail();
|
||||
} catch (NacosException e) {
|
||||
Assert.assertEquals("Client not connected, current status:STARTING", e.getErrMsg());
|
||||
Assert.assertEquals(-401, e.getErrCode());
|
||||
Assert.assertEquals("overlimit", e.getErrMsg());
|
||||
Assert.assertEquals(503, e.getErrCode());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeConfigConfigSuccess() throws NacosException {
|
||||
|
||||
Properties prop = new Properties();
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(null, agent, nacosClientProperties);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
String content = "content" + System.currentTimeMillis();
|
||||
|
||||
Mockito.when(rpcClient.request(any(ConfigQueryRequest.class), anyLong()))
|
||||
.thenReturn(ConfigQueryResponse.buildSuccessResponse(content));
|
||||
|
||||
ConfigResponse configResponse = clientWorker.getServerConfig(dataId, group, tenant, 100, true);
|
||||
Assert.assertEquals(content, configResponse.getContent());
|
||||
localConfigInfoProcessorMockedStatic.verify(
|
||||
() -> LocalConfigInfoProcessor.saveSnapshot(eq(clientWorker.getAgentName()), eq(dataId), eq(group),
|
||||
eq(tenant), eq(content)), times(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleConfigChangeReqeust() throws Exception {
|
||||
|
||||
Properties prop = new Properties();
|
||||
String tenant = "c";
|
||||
|
||||
prop.put(NAMESPACE, tenant);
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(null, agent, nacosClientProperties);
|
||||
|
||||
AtomicReference<Map<String, CacheData>> cacheMapMocked = Mockito.mock(AtomicReference.class);
|
||||
Field cacheMap = ClientWorker.class.getDeclaredField("cacheMap");
|
||||
cacheMap.setAccessible(true);
|
||||
cacheMap.set(clientWorker, cacheMapMocked);
|
||||
Map<String, CacheData> cacheDataMapMocked = Mockito.mock(Map.class);
|
||||
Mockito.when(cacheMapMocked.get()).thenReturn(cacheDataMapMocked);
|
||||
CacheData cacheDataMocked = Mockito.mock(CacheData.class);
|
||||
AtomicBoolean atomicBoolean = Mockito.mock(AtomicBoolean.class);
|
||||
Mockito.when(cacheDataMocked.getReceiveNotifyChanged()).thenReturn(atomicBoolean);
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
Mockito.when(cacheDataMapMocked.get(GroupKey.getKeyTenant(dataId, group, tenant))).thenReturn(cacheDataMocked);
|
||||
ConfigChangeNotifyRequest configChangeNotifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
|
||||
((ClientWorker.ConfigRpcTransportClient) clientWorker.getAgent()).handleConfigChangeNotifyRequest(
|
||||
configChangeNotifyRequest, "testname");
|
||||
Mockito.verify(cacheDataMocked, times(1)).setConsistentWithServer(false);
|
||||
Mockito.verify(atomicBoolean, times(1)).set(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleClientMetricsReqeust() throws Exception {
|
||||
|
||||
Properties prop = new Properties();
|
||||
String tenant = "c";
|
||||
|
||||
prop.put(NAMESPACE, tenant);
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(null, agent, nacosClientProperties);
|
||||
|
||||
AtomicReference<Map<String, CacheData>> cacheMapMocked = Mockito.mock(AtomicReference.class);
|
||||
Field cacheMap = ClientWorker.class.getDeclaredField("cacheMap");
|
||||
cacheMap.setAccessible(true);
|
||||
cacheMap.set(clientWorker, cacheMapMocked);
|
||||
Map<String, CacheData> cacheDataMapMocked = Mockito.mock(Map.class);
|
||||
Mockito.when(cacheMapMocked.get()).thenReturn(cacheDataMapMocked);
|
||||
CacheData cacheDataMocked = Mockito.mock(CacheData.class);
|
||||
String content = "content1324567";
|
||||
String md5 = MD5Utils.md5Hex(content, "UTF-8");
|
||||
Mockito.when(cacheDataMocked.getContent()).thenReturn(content);
|
||||
Mockito.when(cacheDataMocked.getMd5()).thenReturn(md5);
|
||||
Field uuid1 = ClientWorker.class.getDeclaredField("uuid");
|
||||
uuid1.setAccessible(true);
|
||||
String uuid = (String) uuid1.get(clientWorker);
|
||||
String dataId = "a23456789";
|
||||
String group = "b";
|
||||
Mockito.when(cacheDataMapMocked.get(GroupKey.getKeyTenant(dataId, group, tenant))).thenReturn(cacheDataMocked);
|
||||
ClientConfigMetricRequest configMetricsRequest = new ClientConfigMetricRequest();
|
||||
|
||||
configMetricsRequest.setMetricsKeys(Arrays.asList(
|
||||
ClientConfigMetricRequest.MetricsKey.build(ClientConfigMetricRequest.MetricsKey.CACHE_DATA,
|
||||
GroupKey.getKeyTenant(dataId, group, tenant)),
|
||||
ClientConfigMetricRequest.MetricsKey.build(ClientConfigMetricRequest.MetricsKey.SNAPSHOT_DATA,
|
||||
GroupKey.getKeyTenant(dataId, group, tenant))));
|
||||
|
||||
ClientConfigMetricResponse metricResponse = ((ClientWorker.ConfigRpcTransportClient) clientWorker.getAgent()).handleClientMetricsRequest(
|
||||
configMetricsRequest);
|
||||
JsonNode jsonNode = JacksonUtils.toObj(metricResponse.getMetrics().get(uuid).toString());
|
||||
String metricValues = jsonNode.get("metricValues")
|
||||
.get(ClientConfigMetricRequest.MetricsKey.build(ClientConfigMetricRequest.MetricsKey.CACHE_DATA,
|
||||
GroupKey.getKeyTenant(dataId, group, tenant)).toString()).textValue();
|
||||
|
||||
int colonIndex = metricValues.toString().lastIndexOf(":");
|
||||
Assert.assertEquals(content, metricValues.substring(0, colonIndex));
|
||||
Assert.assertEquals(md5, metricValues.substring(colonIndex + 1, metricValues.toString().length()));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeConfigConfigNotFound() throws NacosException {
|
||||
|
||||
Properties prop = new Properties();
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(null, agent, nacosClientProperties);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
ConfigQueryResponse configQueryResponse = new ConfigQueryResponse();
|
||||
configQueryResponse.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config not found");
|
||||
Mockito.when(rpcClient.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);
|
||||
|
||||
ConfigResponse configResponse = clientWorker.getServerConfig(dataId, group, tenant, 100, true);
|
||||
Assert.assertEquals(null, configResponse.getContent());
|
||||
localConfigInfoProcessorMockedStatic.verify(
|
||||
() -> LocalConfigInfoProcessor.saveSnapshot(eq(clientWorker.getAgentName()), eq(dataId), eq(group),
|
||||
eq(tenant), eq(null)), times(1));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeConfigConfigConflict() throws NacosException {
|
||||
|
||||
Properties prop = new Properties();
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(null, agent, nacosClientProperties);
|
||||
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
ConfigQueryResponse configQueryResponse = new ConfigQueryResponse();
|
||||
configQueryResponse.setErrorInfo(ConfigQueryResponse.CONFIG_QUERY_CONFLICT, "config is being modified");
|
||||
Mockito.when(rpcClient.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);
|
||||
|
||||
try {
|
||||
clientWorker.getServerConfig(dataId, group, tenant, 100, false);
|
||||
clientWorker.getServerConfig(dataId, group, tenant, 100, true);
|
||||
Assert.fail();
|
||||
} catch (NacosException e) {
|
||||
Assert.assertEquals("Client not connected, current status:STARTING", e.getErrMsg());
|
||||
Assert.assertEquals(-401, e.getErrCode());
|
||||
Assert.assertEquals(NacosException.CONFLICT, e.getErrCode());
|
||||
}
|
||||
}
|
||||
|
||||
@ -202,15 +473,10 @@ public class ClientWorkerTest {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
|
||||
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
String dataId = "a";
|
||||
String group = "b";
|
||||
String tenant = "c";
|
||||
String content = "d";
|
||||
clientWorker.shutdown();
|
||||
|
||||
Field agent1 = ClientWorker.class.getDeclaredField("agent");
|
||||
agent1.setAccessible(true);
|
||||
ConfigTransportClient o = (ConfigTransportClient) agent1.get(clientWorker);
|
||||
@ -220,6 +486,132 @@ public class ClientWorkerTest {
|
||||
Assert.assertEquals(null, clientWorker.getAgentName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteConfigListen() throws Exception {
|
||||
Properties prop = new Properties();
|
||||
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
|
||||
ServerListManager agent = Mockito.mock(ServerListManager.class);
|
||||
Mockito.when(agent.getName()).thenReturn("mocktest");
|
||||
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
|
||||
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
|
||||
clientWorker.shutdown();
|
||||
|
||||
List<CacheData> cacheDatas = new ArrayList<>();
|
||||
String group = "group123";
|
||||
String tenant = "tenant122324";
|
||||
//mock discards cache
|
||||
String dataIdDiscard = "dataIdDiscard" + System.currentTimeMillis();
|
||||
|
||||
CacheData cacheDataDiscard = discardCache(filter, agent.getName(), dataIdDiscard, group, tenant);
|
||||
cacheDatas.add(cacheDataDiscard);
|
||||
//mock use local cache
|
||||
String dataIdUseLocalCache = "dataIdUseLocalCache" + System.currentTimeMillis();
|
||||
CacheData cacheUseLocalCache = useLocalCache(filter, agent.getName(), dataIdUseLocalCache, group, tenant,
|
||||
"content" + System.currentTimeMillis());
|
||||
Assert.assertFalse(cacheUseLocalCache.isUseLocalConfigInfo());
|
||||
|
||||
cacheDatas.add(cacheUseLocalCache);
|
||||
|
||||
//mock normal cache
|
||||
String dataIdNormal = "dataIdNormal" + System.currentTimeMillis();
|
||||
CacheData cacheNormal = normalNotConsistentCache(filter, agent.getName(), dataIdNormal, group, tenant);
|
||||
AtomicReference<String> normalContent = new AtomicReference<>();
|
||||
cacheNormal.addListener(new Listener() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receiveConfigInfo(String configInfo) {
|
||||
System.out.println(configInfo);
|
||||
normalContent.set(configInfo);
|
||||
}
|
||||
});
|
||||
cacheDatas.add(cacheNormal);
|
||||
cacheNormal.setInitializing(false);
|
||||
Map<String, CacheData> cacheDataMapMocked = Mockito.mock(Map.class);
|
||||
Mockito.when(cacheDataMapMocked.get(GroupKey.getKeyTenant(dataIdNormal, group, tenant)))
|
||||
.thenReturn(cacheNormal);
|
||||
Mockito.when(cacheDataMapMocked.containsKey(GroupKey.getKeyTenant(dataIdNormal, group, tenant)))
|
||||
.thenReturn(true);
|
||||
|
||||
Mockito.when(cacheDataMapMocked.values()).thenReturn(cacheDatas);
|
||||
AtomicReference<Map<String, CacheData>> cacheMapMocked = Mockito.mock(AtomicReference.class);
|
||||
Mockito.when(cacheMapMocked.get()).thenReturn(cacheDataMapMocked);
|
||||
Field cacheMap = ClientWorker.class.getDeclaredField("cacheMap");
|
||||
cacheMap.setAccessible(true);
|
||||
cacheMap.set(clientWorker, cacheMapMocked);
|
||||
|
||||
//mock request
|
||||
ConfigChangeBatchListenResponse.ConfigContext configContext = new ConfigChangeBatchListenResponse.ConfigContext();
|
||||
configContext.setDataId(dataIdNormal);
|
||||
configContext.setGroup(group);
|
||||
configContext.setTenant(tenant);
|
||||
ConfigChangeBatchListenResponse response = new ConfigChangeBatchListenResponse();
|
||||
response.setChangedConfigs(Arrays.asList(configContext));
|
||||
|
||||
RpcClient rpcClientInner = Mockito.mock(RpcClient.class);
|
||||
Mockito.when(rpcClientInner.isWaitInitiated()).thenReturn(true, false);
|
||||
rpcClientFactoryMockedStatic.when(
|
||||
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
|
||||
any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
|
||||
// mock listen and remove listen request
|
||||
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class), anyLong()))
|
||||
.thenReturn(response, response);
|
||||
// mock query changed config
|
||||
ConfigQueryResponse configQueryResponse = new ConfigQueryResponse();
|
||||
configQueryResponse.setContent("content" + System.currentTimeMillis());
|
||||
configQueryResponse.setContentType(ConfigType.JSON.getType());
|
||||
Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);
|
||||
(clientWorker.getAgent()).executeConfigListen();
|
||||
//assert
|
||||
//use local cache.
|
||||
Assert.assertTrue(cacheUseLocalCache.isUseLocalConfigInfo());
|
||||
//discard cache to be deleted.
|
||||
Assert.assertFalse(cacheMapMocked.get().containsKey(GroupKey.getKeyTenant(dataIdDiscard, group, tenant)));
|
||||
//normal cache listener be notified.
|
||||
Assert.assertEquals(configQueryResponse.getContent(), normalContent.get());
|
||||
|
||||
}
|
||||
|
||||
private CacheData discardCache(ConfigFilterChainManager filter, String envName, String dataId, String group,
|
||||
String tenant) {
|
||||
CacheData cacheData = new CacheData(filter, envName, dataId, group, tenant);
|
||||
cacheData.setDiscard(true);
|
||||
cacheData.setConsistentWithServer(false);
|
||||
File file = Mockito.mock(File.class);
|
||||
Mockito.when(file.exists()).thenReturn(false);
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant)).thenReturn(file);
|
||||
return cacheData;
|
||||
}
|
||||
|
||||
private CacheData normalNotConsistentCache(ConfigFilterChainManager filter, String envName, String dataId,
|
||||
String group, String tenant) throws NacosException {
|
||||
CacheData cacheData = new CacheData(filter, envName, dataId, group, tenant);
|
||||
cacheData.setDiscard(false);
|
||||
cacheData.setConsistentWithServer(false);
|
||||
File file = Mockito.mock(File.class);
|
||||
Mockito.when(file.exists()).thenReturn(false);
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant)).thenReturn(file);
|
||||
return cacheData;
|
||||
}
|
||||
|
||||
private CacheData useLocalCache(ConfigFilterChainManager filter, String envName, String dataId, String group,
|
||||
String tenant, String failOverContent) {
|
||||
CacheData cacheData = new CacheData(filter, envName, dataId, group, tenant);
|
||||
cacheData.setDiscard(true);
|
||||
File file = Mockito.mock(File.class);
|
||||
Mockito.when(file.exists()).thenReturn(true);
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant)).thenReturn(file);
|
||||
localConfigInfoProcessorMockedStatic.when(
|
||||
() -> LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant)).thenReturn(failOverContent);
|
||||
return cacheData;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsHealthServer() throws NacosException, NoSuchFieldException, IllegalAccessException {
|
||||
Properties prop = new Properties();
|
||||
|
@ -42,12 +42,15 @@ public class ConfigCommonConfigTest {
|
||||
public void setUp() throws Exception {
|
||||
environment = new MockEnvironment();
|
||||
EnvUtil.setEnvironment(environment);
|
||||
commonConfig = ConfigCommonConfig.getInstance();
|
||||
Constructor<ConfigCommonConfig> declaredConstructor = ConfigCommonConfig.class.getDeclaredConstructor();
|
||||
declaredConstructor.setAccessible(true);
|
||||
commonConfig = declaredConstructor.newInstance();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getMaxPushRetryTimes() {
|
||||
assertEquals(50, commonConfig.getMaxPushRetryTimes());
|
||||
Integer property = EnvUtil.getProperty("nacos.config.push.maxRetryTime", Integer.class, 50);
|
||||
assertEquals(property.intValue(), commonConfig.getMaxPushRetryTimes());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -23,6 +23,7 @@ import com.alibaba.nacos.core.control.TpsControl;
|
||||
import com.alibaba.nacos.core.control.TpsControlConfig;
|
||||
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
|
||||
import com.alibaba.nacos.plugin.control.Loggers;
|
||||
import com.alibaba.nacos.plugin.control.tps.TpsControlManager;
|
||||
import com.alibaba.nacos.plugin.control.tps.request.TpsCheckRequest;
|
||||
import com.alibaba.nacos.plugin.control.tps.response.TpsCheckResponse;
|
||||
|
||||
@ -48,6 +49,8 @@ public class NacosHttpTpsFilter implements Filter {
|
||||
|
||||
private ControllerMethodsCache controllerMethodsCache;
|
||||
|
||||
private TpsControlManager tpsControlManager;
|
||||
|
||||
public NacosHttpTpsFilter(ControllerMethodsCache controllerMethodsCache) {
|
||||
this.controllerMethodsCache = controllerMethodsCache;
|
||||
}
|
||||
@ -57,6 +60,12 @@ public class NacosHttpTpsFilter implements Filter {
|
||||
Filter.super.init(filterConfig);
|
||||
}
|
||||
|
||||
private void initTpsControlManager() {
|
||||
if (tpsControlManager == null) {
|
||||
tpsControlManager = ControlManagerCenter.getInstance().getTpsControlManager();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
|
||||
throws IOException, ServletException {
|
||||
@ -81,8 +90,8 @@ public class NacosHttpTpsFilter implements Filter {
|
||||
if (StringUtils.isBlank(httpTpsCheckRequest.getPointName())) {
|
||||
httpTpsCheckRequest.setPointName(pointName);
|
||||
}
|
||||
TpsCheckResponse checkResponse = ControlManagerCenter.getInstance().getTpsControlManager()
|
||||
.check(httpTpsCheckRequest);
|
||||
initTpsControlManager();
|
||||
TpsCheckResponse checkResponse = tpsControlManager.check(httpTpsCheckRequest);
|
||||
if (!checkResponse.isSuccess()) {
|
||||
AsyncContext asyncContext = httpServletRequest.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
@ -97,7 +106,7 @@ public class NacosHttpTpsFilter implements Filter {
|
||||
Loggers.TPS.warn("Fail to http tps check", throwable);
|
||||
}
|
||||
|
||||
filterChain.doFilter(httpServletRequest, servletResponse);
|
||||
filterChain.doFilter(httpServletRequest, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -43,12 +43,12 @@ import java.lang.reflect.Method;
|
||||
@Service
|
||||
public class TpsControlRequestFilter extends AbstractRequestFilter {
|
||||
|
||||
TpsControlManager tpsControlManager = ControlManagerCenter.getInstance().getTpsControlManager();
|
||||
private TpsControlManager tpsControlManager;
|
||||
|
||||
@Override
|
||||
protected Response filter(Request request, RequestMeta meta, Class handlerClazz) {
|
||||
|
||||
Method method = null;
|
||||
Method method;
|
||||
try {
|
||||
method = getHandleMethod(handlerClazz);
|
||||
} catch (NacosException e) {
|
||||
@ -73,6 +73,8 @@ public class TpsControlRequestFilter extends AbstractRequestFilter {
|
||||
tpsCheckRequest.setPointName(pointName);
|
||||
}
|
||||
|
||||
initTpsControlManager();
|
||||
|
||||
TpsCheckResponse check = tpsControlManager.check(tpsCheckRequest);
|
||||
|
||||
if (!check.isSuccess()) {
|
||||
@ -83,20 +85,24 @@ public class TpsControlRequestFilter extends AbstractRequestFilter {
|
||||
"Tps Flow restricted:" + check.getMessage());
|
||||
return response;
|
||||
} catch (Exception e) {
|
||||
com.alibaba.nacos.plugin.control.Loggers.TPS
|
||||
.warn("Tps check fail , request: {},exception:{}", request.getClass().getSimpleName(),
|
||||
e);
|
||||
com.alibaba.nacos.plugin.control.Loggers.TPS.warn("Tps check fail , request: {},exception:{}",
|
||||
request.getClass().getSimpleName(), e);
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
} catch (Throwable throwable) {
|
||||
com.alibaba.nacos.plugin.control.Loggers.TPS
|
||||
.warn("Tps check exception , request: {},exception:{}", request.getClass().getSimpleName(),
|
||||
throwable);
|
||||
com.alibaba.nacos.plugin.control.Loggers.TPS.warn("Tps check exception , request: {},exception:{}",
|
||||
request.getClass().getSimpleName(), throwable);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void initTpsControlManager() {
|
||||
if (tpsControlManager == null) {
|
||||
tpsControlManager = ControlManagerCenter.getInstance().getTpsControlManager();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -46,17 +46,6 @@ public abstract class AbstractRequestFilter {
|
||||
requestFilters.registerFilter(this);
|
||||
}
|
||||
|
||||
protected Class getResponseClazz(Class handlerClazz) throws NacosException {
|
||||
ParameterizedType parameterizedType = (ParameterizedType) handlerClazz.getGenericSuperclass();
|
||||
try {
|
||||
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
|
||||
return Class.forName(actualTypeArguments[1].getTypeName());
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new NacosException(NacosException.SERVER_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected Method getHandleMethod(Class handlerClazz) throws NacosException {
|
||||
try {
|
||||
Method method = handlerClazz.getMethod("handle", Request.class, RequestMeta.class);
|
||||
|
@ -0,0 +1,206 @@
|
||||
/*
|
||||
* Copyright 1999-2021 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.core.control.http;
|
||||
|
||||
import com.alibaba.nacos.api.remote.request.Request;
|
||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.core.code.ControllerMethodsCache;
|
||||
import com.alibaba.nacos.core.remote.HealthCheckRequestHandler;
|
||||
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
|
||||
import com.alibaba.nacos.plugin.control.tps.TpsControlManager;
|
||||
import com.alibaba.nacos.plugin.control.tps.request.TpsCheckRequest;
|
||||
import com.alibaba.nacos.plugin.control.tps.response.TpsCheckResponse;
|
||||
import org.apache.catalina.core.AsyncContextImpl;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.springframework.mock.web.MockFilterChain;
|
||||
import org.springframework.mock.web.MockHttpServletRequest;
|
||||
import org.springframework.mock.web.MockHttpServletResponse;
|
||||
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class NacosHttpTpsFilterTest {
|
||||
|
||||
@Mock
|
||||
private ControlManagerCenter controlManagerCenter;
|
||||
|
||||
@Mock
|
||||
private TpsControlManager tpsControlManager;
|
||||
|
||||
NacosHttpTpsFilter nacosHttpTpsFilter;
|
||||
|
||||
MockedStatic<ControlManagerCenter> controlManagerCenterMockedStatic;
|
||||
|
||||
@Mock
|
||||
ControllerMethodsCache controllerMethodsCache;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
controlManagerCenterMockedStatic = Mockito.mockStatic(ControlManagerCenter.class);
|
||||
controlManagerCenterMockedStatic.when(() -> ControlManagerCenter.getInstance())
|
||||
.thenReturn(controlManagerCenter);
|
||||
when(controlManagerCenter.getTpsControlManager()).thenReturn(tpsControlManager);
|
||||
nacosHttpTpsFilter = new NacosHttpTpsFilter(controllerMethodsCache);
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
controlManagerCenterMockedStatic.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* test tps check passed ,response is null.
|
||||
*/
|
||||
@Test
|
||||
public void testPass() throws Exception {
|
||||
HttpTpsCheckRequestParserRegistry.register(new HttpTpsCheckRequestParser() {
|
||||
@Override
|
||||
public TpsCheckRequest parse(HttpServletRequest httpServletRequest) {
|
||||
return new TpsCheckRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPointName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
});
|
||||
|
||||
TpsCheckResponse tpsCheckResponse = new TpsCheckResponse(true, 200, "success");
|
||||
when(tpsControlManager.check(any(TpsCheckRequest.class))).thenReturn(tpsCheckResponse);
|
||||
|
||||
//mock http tps control method
|
||||
Method method = HealthCheckRequestHandler.class.getMethod("handle", Request.class, RequestMeta.class);
|
||||
|
||||
MockHttpServletRequest httpServletRequest = Mockito.mock(MockHttpServletRequest.class);
|
||||
MockHttpServletResponse httpServletResponse = Mockito.mock(MockHttpServletResponse.class);
|
||||
MockFilterChain filterChain = Mockito.mock(MockFilterChain.class);
|
||||
when(controllerMethodsCache.getMethod(eq(httpServletRequest))).thenReturn(method);
|
||||
Mockito.doNothing().when(filterChain).doFilter(any(ServletRequest.class), any(ServletResponse.class));
|
||||
//execute test.
|
||||
nacosHttpTpsFilter.doFilter(httpServletRequest, httpServletResponse, filterChain);
|
||||
|
||||
//verify
|
||||
Mockito.verify(filterChain, Mockito.times(1)).doFilter(httpServletRequest, httpServletResponse);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* test tps check rejected ,response is not null.
|
||||
*/
|
||||
@Test
|
||||
public void testRejected() throws Exception {
|
||||
HttpTpsCheckRequestParserRegistry.register(new HttpTpsCheckRequestParser() {
|
||||
@Override
|
||||
public TpsCheckRequest parse(HttpServletRequest httpServletRequest) {
|
||||
return new TpsCheckRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPointName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
});
|
||||
|
||||
TpsCheckResponse tpsCheckResponse = new TpsCheckResponse(false, 5031, "rejected");
|
||||
when(tpsControlManager.check(any(TpsCheckRequest.class))).thenReturn(tpsCheckResponse);
|
||||
|
||||
//mock http tps control method
|
||||
Method method = HealthCheckRequestHandler.class.getMethod("handle", Request.class, RequestMeta.class);
|
||||
|
||||
MockHttpServletRequest httpServletRequest = Mockito.mock(MockHttpServletRequest.class);
|
||||
MockHttpServletResponse httpServletResponse = Mockito.mock(MockHttpServletResponse.class);
|
||||
MockFilterChain filterChain = Mockito.mock(MockFilterChain.class);
|
||||
when(controllerMethodsCache.getMethod(eq(httpServletRequest))).thenReturn(method);
|
||||
AsyncContextImpl asyncContext = Mockito.mock(AsyncContextImpl.class);
|
||||
Mockito.when(httpServletRequest.startAsync()).thenReturn(asyncContext);
|
||||
//execute test.
|
||||
nacosHttpTpsFilter.doFilter(httpServletRequest, httpServletResponse, filterChain);
|
||||
|
||||
//verify
|
||||
Mockito.verify(filterChain, Mockito.times(0)).doFilter(any(), any());
|
||||
Thread.sleep(1100L);
|
||||
Mockito.verify(httpServletResponse, Mockito.times(1)).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* test tps check exception ,return null skip.
|
||||
*/
|
||||
@Test
|
||||
public void testTpsCheckException() throws Exception {
|
||||
HttpTpsCheckRequestParserRegistry.register(new HttpTpsCheckRequestParser() {
|
||||
@Override
|
||||
public TpsCheckRequest parse(HttpServletRequest httpServletRequest) {
|
||||
return new TpsCheckRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPointName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
});
|
||||
|
||||
when(tpsControlManager.check(any(TpsCheckRequest.class))).thenThrow(new RuntimeException("324565"));
|
||||
|
||||
//mock http tps control method
|
||||
Method method = HealthCheckRequestHandler.class.getMethod("handle", Request.class, RequestMeta.class);
|
||||
|
||||
HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class);
|
||||
HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class);
|
||||
MockFilterChain filterChain = Mockito.mock(MockFilterChain.class);
|
||||
|
||||
when(controllerMethodsCache.getMethod(eq(httpServletRequest))).thenReturn(method);
|
||||
//execute test.
|
||||
nacosHttpTpsFilter.doFilter(httpServletRequest, httpServletResponse, filterChain);
|
||||
|
||||
//verify
|
||||
Mockito.verify(filterChain, Mockito.times(1)).doFilter(httpServletRequest, httpServletResponse);
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright 1999-2021 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.core.control.remote;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
|
||||
import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
|
||||
import com.alibaba.nacos.api.remote.request.Request;
|
||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.api.remote.response.Response;
|
||||
import com.alibaba.nacos.core.remote.HealthCheckRequestHandler;
|
||||
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
|
||||
import com.alibaba.nacos.plugin.control.tps.TpsControlManager;
|
||||
import com.alibaba.nacos.plugin.control.tps.request.TpsCheckRequest;
|
||||
import com.alibaba.nacos.plugin.control.tps.response.TpsCheckResponse;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TpsControlRequestFilterTest {
|
||||
|
||||
@Mock
|
||||
private ControlManagerCenter controlManagerCenter;
|
||||
|
||||
@Mock
|
||||
private TpsControlManager tpsControlManager;
|
||||
|
||||
TpsControlRequestFilter tpsControlRequestFilter;
|
||||
|
||||
MockedStatic<ControlManagerCenter> controlManagerCenterMockedStatic;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
tpsControlRequestFilter = new TpsControlRequestFilter();
|
||||
controlManagerCenterMockedStatic = Mockito.mockStatic(ControlManagerCenter.class);
|
||||
controlManagerCenterMockedStatic.when(() -> ControlManagerCenter.getInstance())
|
||||
.thenReturn(controlManagerCenter);
|
||||
Mockito.when(controlManagerCenter.getTpsControlManager()).thenReturn(tpsControlManager);
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
controlManagerCenterMockedStatic.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* test tps check passed ,response is null.
|
||||
*/
|
||||
@Test
|
||||
public void testPass() {
|
||||
RemoteTpsCheckRequestParserRegistry.register(new RemoteTpsCheckRequestParser() {
|
||||
@Override
|
||||
public TpsCheckRequest parse(Request request, RequestMeta meta) {
|
||||
return new TpsCheckRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPointName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "HealthCheck";
|
||||
}
|
||||
});
|
||||
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
|
||||
RequestMeta requestMeta = new RequestMeta();
|
||||
TpsCheckResponse tpsCheckResponse = new TpsCheckResponse(true, 200, "success");
|
||||
Mockito.when(tpsControlManager.check(any(TpsCheckRequest.class))).thenReturn(tpsCheckResponse);
|
||||
Response filterResponse = tpsControlRequestFilter.filter(healthCheckRequest, requestMeta,
|
||||
HealthCheckRequestHandler.class);
|
||||
Assert.assertNull(filterResponse);
|
||||
}
|
||||
|
||||
/**
|
||||
* test tps check rejected ,response is not null.
|
||||
*/
|
||||
@Test
|
||||
public void testRejected() {
|
||||
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
|
||||
RequestMeta requestMeta = new RequestMeta();
|
||||
TpsCheckResponse tpsCheckResponse = new TpsCheckResponse(false, 5031, "rejected");
|
||||
Mockito.when(tpsControlManager.check(any(TpsCheckRequest.class))).thenReturn(tpsCheckResponse);
|
||||
Response filterResponse = tpsControlRequestFilter.filter(healthCheckRequest, requestMeta,
|
||||
HealthCheckRequestHandler.class);
|
||||
Assert.assertNotNull(filterResponse);
|
||||
Assert.assertEquals(NacosException.OVER_THRESHOLD, filterResponse.getErrorCode());
|
||||
Assert.assertEquals("Tps Flow restricted:" + tpsCheckResponse.getMessage(), filterResponse.getMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
* test tps check exception ,return null skip.
|
||||
*/
|
||||
@Test
|
||||
public void testTpsCheckException() {
|
||||
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
|
||||
RequestMeta requestMeta = new RequestMeta();
|
||||
Mockito.when(tpsControlManager.check(any(TpsCheckRequest.class))).thenThrow(new NacosRuntimeException(12345));
|
||||
Response filterResponse = tpsControlRequestFilter.filter(healthCheckRequest, requestMeta,
|
||||
HealthCheckRequestHandler.class);
|
||||
Assert.assertNull(filterResponse);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user