config module coverage (#11592)

* coverage

* coverage

* coverage up tp 80% ,fix checkstyle pmd

* coverage up tp 80% ,fix checkstyle pmd

* fix checkstyle

* optmize grpc connection push

* testcase

* grp connection coverage
This commit is contained in:
nov.lzf 2024-01-08 14:25:15 +08:00 committed by GitHub
parent dbb1b65fcb
commit c70a86820d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 2745 additions and 515 deletions

View File

@ -332,8 +332,8 @@ public class ConfigChangeAspect {
retVal = pjp.proceed(args);
}
} catch (Throwable e) {
LOGGER.warn("config change plugin proceed failed {}", e.getMessage());
configChangeResponse.setMsg("config change plugin proceed failed " + e.getMessage());
LOGGER.warn("config change join point failed {}", e.getMessage());
configChangeResponse.setMsg("config change join point fail" + e.getMessage());
retVal = wrapErrorResp(configChangeResponse);
}

View File

@ -91,39 +91,16 @@ public class ClientMetricsController {
tenant);
Map<String, Object> responseMap = new HashMap<>(3);
Collection<Member> members = serverMemberManager.allMembers();
final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientBeanHolder
.getNacosAsyncRestTemplate(Loggers.CLUSTER);
final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientBeanHolder.getNacosAsyncRestTemplate(
Loggers.CLUSTER);
CountDownLatch latch = new CountDownLatch(members.size());
for (Member member : members) {
String url = HttpUtils
.buildUrl(false, member.getAddress(), EnvUtil.getContextPath(), Constants.METRICS_CONTROLLER_PATH,
"current");
String url = HttpUtils.buildUrl(false, member.getAddress(), EnvUtil.getContextPath(),
Constants.METRICS_CONTROLLER_PATH, "current");
Query query = Query.newInstance().addParam("ip", ip).addParam("dataId", dataId).addParam("group", group)
.addParam("tenant", tenant);
nacosAsyncRestTemplate.get(url, Header.EMPTY, query, new GenericType<Map>() {
}.getType(), new Callback<Map>() {
@Override
public void onReceive(RestResult<Map> result) {
if (result.ok()) {
responseMap.putAll(result.getData());
}
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.CORE
.error("Get config metrics error from member address={}, ip={},dataId={},group={},tenant={},error={}",
member.getAddress(), ip, dataId, group, tenant, throwable);
latch.countDown();
}
@Override
public void onCancel() {
latch.countDown();
}
});
}.getType(), new ClusterMetricsCallBack(responseMap, latch, dataId, group, tenant, ip, member));
}
try {
latch.await(3L, TimeUnit.SECONDS);
@ -134,6 +111,54 @@ public class ClientMetricsController {
return ResponseEntity.ok().body(responseMap);
}
static class ClusterMetricsCallBack implements Callback<Map> {
Map<String, Object> responseMap;
CountDownLatch latch;
String dataId;
String group;
String tenant;
String ip;
Member member;
public ClusterMetricsCallBack(Map<String, Object> responseMap, CountDownLatch latch, String dataId,
String group, String tenant, String ip, Member member) {
this.responseMap = responseMap;
this.latch = latch;
this.dataId = dataId;
this.group = group;
this.tenant = tenant;
this.member = member;
this.ip = ip;
}
@Override
public void onReceive(RestResult<Map> result) {
if (result.ok()) {
responseMap.putAll(result.getData());
}
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.CORE.error(
"Get config metrics error from member address={}, ip={},dataId={},group={},tenant={},error={}",
member.getAddress(), ip, dataId, group, tenant, throwable);
latch.countDown();
}
@Override
public void onCancel() {
latch.countDown();
}
}
/**
* Get client config listener lists of subscriber in local machine.
@ -150,18 +175,18 @@ public class ClientMetricsController {
try {
ClientConfigMetricRequest clientMetrics = new ClientConfigMetricRequest();
if (StringUtils.isNotBlank(dataId)) {
clientMetrics.getMetricsKeys().add(ClientConfigMetricRequest.MetricsKey
.build(CACHE_DATA, GroupKey2.getKey(dataId, group, tenant)));
clientMetrics.getMetricsKeys().add(ClientConfigMetricRequest.MetricsKey
.build(SNAPSHOT_DATA, GroupKey2.getKey(dataId, group, tenant)));
clientMetrics.getMetricsKeys().add(ClientConfigMetricRequest.MetricsKey.build(CACHE_DATA,
GroupKey2.getKey(dataId, group, tenant)));
clientMetrics.getMetricsKeys().add(ClientConfigMetricRequest.MetricsKey.build(SNAPSHOT_DATA,
GroupKey2.getKey(dataId, group, tenant)));
}
ClientConfigMetricResponse request1 = (ClientConfigMetricResponse) connectionByIp
.request(clientMetrics, 1000L);
ClientConfigMetricResponse request1 = (ClientConfigMetricResponse) connectionByIp.request(clientMetrics,
1000L);
metrics.putAll(request1.getMetrics());
} catch (Exception e) {
Loggers.CORE.error("Get config metrics error from client ip={},dataId={},group={},tenant={},error={}", ip, dataId,
group, tenant, e);
Loggers.CORE.error("Get config metrics error from client ip={},dataId={},group={},tenant={},error={}",
ip, dataId, group, tenant, e);
}
}
return metrics;

View File

@ -65,7 +65,7 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
}
@PostConstruct
private void registerTpsPoint() {
void registerTpsPoint() {
tpsControlManager.registerTpsPoint(POINT_CONFIG_PUSH);
tpsControlManager.registerTpsPoint(POINT_CONFIG_PUSH_SUCCESS);
tpsControlManager.registerTpsPoint(POINT_CONFIG_PUSH_FAIL);
@ -101,12 +101,9 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
}
ConnectionMeta metaInfo = connection.getMetaInfo();
//beta ips check.
String clientIp = metaInfo.getClientIp();
String clientTag = metaInfo.getTag();
if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
continue;
}
//tag check
if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
continue;
@ -116,7 +113,7 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest,
ConfigCommonConfig.getInstance().getMaxPushRetryTimes(), client, clientIp, metaInfo.getAppName());
push(rpcPushRetryTask);
push(rpcPushRetryTask, connectionManager);
notifyClientCount++;
}
Loggers.REMOTE_PUSH.info("push [{}] clients, groupKey=[{}]", notifyClientCount, groupKey);
@ -169,59 +166,96 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes;
}
public int getTryTimes() {
return tryTimes;
}
public ConfigChangeNotifyRequest getNotifyRequest() {
return notifyRequest;
}
public int getMaxRetryTimes() {
return maxRetryTimes;
}
public String getClientIp() {
return clientIp;
}
public String getAppName() {
return appName;
}
public String getConnectionId() {
return connectionId;
}
@Override
public void run() {
tryTimes++;
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH);
if (!tpsControlManager.check(tpsCheckRequest).isSuccess()) {
push(this);
push(this, connectionManager);
} else {
rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
@Override
public void onSuccess() {
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_SUCCESS);
tpsControlManager.check(tpsCheckRequest);
}
@Override
public void onFail(Throwable e) {
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_FAIL);
tpsControlManager.check(tpsCheckRequest);
Loggers.REMOTE_PUSH
.warn("Push fail, dataId={}, group={}, tenant={}, clientId={}", notifyRequest.getDataId(),
notifyRequest.getGroup(), notifyRequest.getTenant(), connectionId, e);
push(RpcPushTask.this);
}
}, ConfigExecutor.getClientConfigNotifierServiceExecutor());
rpcPushService.pushWithCallback(connectionId, notifyRequest,
new RpcPushCallback(this, tpsControlManager, connectionManager),
ConfigExecutor.getClientConfigNotifierServiceExecutor());
}
}
}
private void push(RpcPushTask retryTask) {
ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
static class RpcPushCallback extends AbstractPushCallBack {
RpcPushTask rpcPushTask;
TpsControlManager tpsControlManager;
ConnectionManager connectionManager;
public RpcPushCallback(RpcPushTask rpcPushTask, TpsControlManager tpsControlManager,
ConnectionManager connectionManager) {
super(3000L);
this.rpcPushTask = rpcPushTask;
this.tpsControlManager = tpsControlManager;
this.connectionManager = connectionManager;
}
@Override
public void onSuccess() {
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_SUCCESS);
tpsControlManager.check(tpsCheckRequest);
}
@Override
public void onFail(Throwable e) {
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_FAIL);
tpsControlManager.check(tpsCheckRequest);
Loggers.REMOTE_PUSH.warn("Push fail, dataId={}, group={}, tenant={}, clientId={}",
rpcPushTask.getNotifyRequest().getDataId(), rpcPushTask.getNotifyRequest().getGroup(),
rpcPushTask.getNotifyRequest().getTenant(), rpcPushTask.getConnectionId(), e);
push(rpcPushTask, connectionManager);
}
}
private static void push(RpcPushTask retryTask, ConnectionManager connectionManager) {
ConfigChangeNotifyRequest notifyRequest = retryTask.getNotifyRequest();
if (retryTask.isOverTimes()) {
Loggers.REMOTE_PUSH
.warn("push callback retry fail over times. dataId={},group={},tenant={},clientId={}, will unregister client.",
notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(),
retryTask.connectionId);
connectionManager.unregister(retryTask.connectionId);
} else if (connectionManager.getConnection(retryTask.connectionId) != null) {
Loggers.REMOTE_PUSH.warn(
"push callback retry fail over times. dataId={},group={},tenant={},clientId={}, will unregister client.",
notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(),
retryTask.getConnectionId());
connectionManager.unregister(retryTask.getConnectionId());
} else if (connectionManager.getConnection(retryTask.getConnectionId()) != null) {
// first time:delay 0s; second time:delay 2s; third time:delay 4s
ConfigExecutor.getClientConfigNotifierServiceExecutor()
.schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
ConfigExecutor.scheduleClientConfigNotifier(retryTask, retryTask.getTryTimes() * 2, TimeUnit.SECONDS);
} else {
// client is already offline, ignore task.
}
}
}

View File

@ -30,30 +30,6 @@ import java.util.concurrent.ConcurrentMap;
*/
public class ClientTrackService {
/**
* Track client's md5 value.
*/
public static void trackClientMd5(String ip, Map<String, String> clientMd5Map) {
ClientRecord record = getClientRecord(ip);
record.setLastTime(System.currentTimeMillis());
record.getGroupKey2md5Map().putAll(clientMd5Map);
}
/**
* TrackClientMd5.
*
* @param ip ip string value.
* @param clientMd5Map clientMd5Map.
* @param clientLastPollingTsMap clientLastPollingTsMap.
*/
public static void trackClientMd5(String ip, Map<String, String> clientMd5Map,
Map<String, Long> clientLastPollingTsMap) {
ClientRecord record = getClientRecord(ip);
record.setLastTime(System.currentTimeMillis());
record.getGroupKey2md5Map().putAll(clientMd5Map);
record.getGroupKey2pollingTsMap().putAll(clientLastPollingTsMap);
}
/**
* Put the specified value(ip/groupKey/clientMd5) into clientRecords Map.
*
@ -109,26 +85,7 @@ public class ClientTrackService {
return status;
}
/**
* Ip -> SubscriberStatus.
*/
public static Map<String, SubscriberStatus> listSubsByGroup(String groupKey) {
Map<String, SubscriberStatus> subs = new HashMap<>(100);
for (ClientRecord clientRec : clientRecords.values()) {
String clientMd5 = clientRec.getGroupKey2md5Map().get(groupKey);
Long lastPollingTs = clientRec.getGroupKey2pollingTsMap().get(groupKey);
if (null != clientMd5 && null != lastPollingTs) {
Boolean isUpdate = ConfigCacheService.isUptodate(groupKey, clientMd5);
subs.put(clientRec.getIp(), new SubscriberStatus(groupKey, isUpdate, clientMd5, lastPollingTs));
}
}
return subs;
}
/**
* Specify subscriber's ip and look up whether data is latest.
* groupKey -> isUptodate.
@ -144,23 +101,6 @@ public class ClientTrackService {
return result;
}
/**
* Specify groupKey and look up whether subscriber and data is latest.
* IP -> isUptodate.
*/
public static Map<String, Boolean> listSubscriberByGroup(String groupKey) {
Map<String, Boolean> subs = new HashMap<>(100);
for (ClientRecord clientRec : clientRecords.values()) {
String clientMd5 = clientRec.getGroupKey2md5Map().get(groupKey);
if (null != clientMd5) {
Boolean isuptodate = ConfigCacheService.isUptodate(groupKey, clientMd5);
subs.put(clientRec.getIp(), isuptodate);
}
}
return subs;
}
/**
* Get and return the record of specified client ip.
*

View File

@ -28,6 +28,7 @@ import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskServiceFactory;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.google.common.collect.Lists;
import java.io.IOException;
@ -144,7 +145,7 @@ public class ConfigCacheService {
|| errMsg.contains(DISK_QUOTA_EN)) {
// Protect from disk full.
FATAL_LOG.error("Local Disk Full,Exit", ioe);
System.exit(0);
EnvUtil.systemExit();
}
}
return false;
@ -328,75 +329,6 @@ public class ConfigCacheService {
}
}
/**
* 保存配置文件并缓存md5.
*/
public static boolean dumpChange(String dataId, String group, String tenant, String content, long lastModifiedTs,
String encryptedDataKey) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
makeSure(groupKey, encryptedDataKey);
final int lockResult = tryWriteLock(groupKey);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
//check timestamp
boolean lastModifiedOutdated = lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey);
if (lastModifiedOutdated) {
DUMP_LOG.warn("[dump-change-ignore] timestamp is outdated,groupKey={}", groupKey);
return true;
}
boolean newLastModified = lastModifiedTs > ConfigCacheService.getLastModifiedTs(groupKey);
String md5 = MD5Utils.md5Hex(content, ENCODE_UTF8);
//check md5 & update local disk cache.
String localContentMd5 = ConfigCacheService.getContentMd5(groupKey);
boolean md5Changed = !md5.equals(localContentMd5);
if (md5Changed) {
if (!PropertyUtil.isDirectRead()) {
DUMP_LOG.info("[dump-change] md5 changed, save to disk cache ,groupKey={}, md5={}", groupKey, md5);
ConfigDiskServiceFactory.getInstance().saveToDisk(dataId, group, tenant, content);
} else {
//ignore to save disk cache in direct model
}
} else {
DUMP_LOG.warn("[dump-change-ignore] ignore to save to disk cache. md5 consistent,groupKey={}, md5={}",
groupKey, md5);
}
//check md5 and timestamp & update local jvm cache.
if (md5Changed) {
DUMP_LOG.info(
"[dump-change] md5 changed, update md5 and timestamp in jvm cache ,groupKey={},newMd5={},oldMd5={},lastModifiedTs={}",
groupKey, md5, localContentMd5, lastModifiedTs);
updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
} else if (newLastModified) {
DUMP_LOG.info(
"[dump-change] md5 consistent ,timestamp changed, update timestamp only in jvm cache ,groupKey={}, md5={},lastModifiedTs={}",
groupKey, md5, lastModifiedTs);
updateTimeStamp(groupKey, lastModifiedTs, encryptedDataKey);
} else {
DUMP_LOG.warn(
"[dump-change-ignore] ignore to save to jvm cache. md5 consistent and no new timestamp changed.groupKey={}",
groupKey);
}
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
return false;
} finally {
releaseWriteLock(groupKey);
}
}
/**
* Delete config file, and delete cache.
*
@ -654,12 +586,6 @@ public class ConfigCacheService {
return (null != item && item.getConfigCacheBeta() != null) ? item.getConfigCacheBeta().getLastModifiedTs() : 0L;
}
public static long getBatchLastModifiedTs(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item && item.getConfigCacheBatch() != null) ? item.getConfigCacheBatch().getLastModifiedTs()
: 0L;
}
public static long getTagLastModifiedTs(String groupKey, String tag) {
CacheItem item = CACHE.get(groupKey);
if (item.getConfigCacheTags() == null || !item.getConfigCacheTags().containsKey(tag)) {

View File

@ -16,10 +16,10 @@
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.model.SampleResult;
@ -41,12 +41,11 @@ import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
@ -63,8 +62,6 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.PULL_LOG;
@Service
public class LongPollingService {
private static final int FIXED_POLLING_INTERVAL_MS = 10000;
private static final int SAMPLE_PERIOD = 100;
private static final int SAMPLE_TIMES = 3;
@ -73,28 +70,6 @@ public class LongPollingService {
private Map<String, Long> retainIps = new ConcurrentHashMap<>();
private static boolean isFixedPolling() {
return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);
}
private static int getFixedPollingInterval() {
return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);
}
public boolean isClientLongPolling(String clientIp) {
return getClientPollingRecord(clientIp) != null;
}
public Map<String, String> getClientSubConfigInfo(String clientIp) {
ClientLongPolling record = getClientPollingRecord(clientIp);
if (record == null) {
return Collections.<String, String>emptyMap();
}
return record.clientMd5Map;
}
public SampleResult getSubscribleInfo(String dataId, String group, String tenant) {
String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);
SampleResult sampleResult = new SampleResult();
@ -145,33 +120,6 @@ public class LongPollingService {
return mergeResult;
}
/**
* Collect application subscribe configinfos.
*
* @return configinfos results.
*/
public Map<String, Set<String>> collectApplicationSubscribeConfigInfos() {
if (allSubs == null || allSubs.isEmpty()) {
return null;
}
HashMap<String, Set<String>> app2Groupkeys = new HashMap<>(50);
for (ClientLongPolling clientLongPolling : allSubs) {
if (StringUtils.isEmpty(clientLongPolling.appName) || "unknown"
.equalsIgnoreCase(clientLongPolling.appName)) {
continue;
}
Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPolling.appName);
Set<String> clientSubscribeConfigs = clientLongPolling.clientMd5Map.keySet();
if (appSubscribeConfigs == null) {
appSubscribeConfigs = new HashSet<>(clientSubscribeConfigs.size());
}
appSubscribeConfigs.addAll(clientSubscribeConfigs);
app2Groupkeys.put(clientLongPolling.appName, appSubscribeConfigs);
}
return app2Groupkeys;
}
public SampleResult getCollectSubscribleInfo(String dataId, String group, String tenant) {
List<SampleResult> sampleResultLst = new ArrayList<>(50);
for (int i = 0; i < SAMPLE_TIMES; i++) {
@ -213,22 +161,6 @@ public class LongPollingService {
return sampleResult;
}
private ClientLongPolling getClientPollingRecord(String clientIp) {
if (allSubs == null) {
return null;
}
for (ClientLongPolling clientLongPolling : allSubs) {
HttpServletRequest request = (HttpServletRequest) clientLongPolling.asyncContext.getRequest();
if (clientIp.equals(RequestUtil.getRemoteIp(request))) {
return clientLongPolling;
}
}
return null;
}
/**
* Add LongPollingClient.
*
@ -240,47 +172,44 @@ public class LongPollingService {
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
long timeout = -1L;
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
timeout = Math.max(10000, Long.parseLong(str) - delayTime);
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
ConnectionCheckResponse connectionCheckResponse = checkLimit(req);
if (!connectionCheckResponse.isSuccess()) {
generate503Response(req, rsp, connectionCheckResponse.getMessage());
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
// Must be called by http thread, or send response.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
String ip = RequestUtil.getRemoteIp(req);
ConnectionCheckResponse connectionCheckResponse = checkLimit(req);
if (!connectionCheckResponse.isSuccess()) {
RpcScheduledExecutor.CONTROL_SCHEDULER.schedule(
() -> generate503Response(asyncContext, rsp, connectionCheckResponse.getMessage()),
1000L + new Random().nextInt(2000), TimeUnit.MILLISECONDS);
return;
}
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
int minLongPoolingTimeout = SwitchService.getSwitchInteger("MIN_LONG_POOLING_TIMEOUT", 10000);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
String requestLongPollingTimeOut = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
long timeout = Math.max(minLongPoolingTimeout, Long.parseLong(requestLongPollingTimeOut) - delayTime);
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
@ -312,14 +241,11 @@ public class LongPollingService {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
@Override
@ -347,10 +273,6 @@ public class LongPollingService {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
@ -359,11 +281,10 @@ public class LongPollingService {
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime),
"in-advance",
RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Collections.singletonList(groupKey));
}
}
@ -404,7 +325,7 @@ public class LongPollingService {
}
}
class ClientLongPolling implements Runnable {
public class ClientLongPolling implements Runnable {
@Override
public void run() {
@ -416,26 +337,12 @@ public class LongPollingService {
boolean removeFlag = allSubs.remove(ClientLongPolling.this);
if (removeFlag) {
if (isFixedPolling()) {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
List<String> changedGroups = MD5Util
.compareMd5((HttpServletRequest) asyncContext.getRequest(),
(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
sendResponse(null);
} else {
LogUtil.DEFAULT_LOG.warn("client subsciber's relations delete fail.");
}
@ -458,13 +365,13 @@ public class LongPollingService {
}
void generateResponse(List<String> changedGroups) {
if (null == changedGroups) {
// Tell web container to send http response.
asyncContext.complete();
return;
}
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
@ -539,7 +446,7 @@ public class LongPollingService {
}
}
void generate503Response(HttpServletRequest request, HttpServletResponse response, String message) {
void generate503Response(AsyncContext asyncContext, HttpServletResponse response, String message) {
try {
@ -549,6 +456,7 @@ public class LongPollingService {
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
response.getWriter().println(message);
asyncContext.complete();
} catch (Exception ex) {
PULL_LOG.error(ex.toString(), ex);
}
@ -558,10 +466,6 @@ public class LongPollingService {
return retainIps;
}
public void setRetainIps(Map<String, Long> retainIps) {
this.retainIps = retainIps;
}
public int getSubscriberCount() {
return allSubs.size();
}

View File

@ -39,28 +39,10 @@ public class SwitchService {
public static final String SWITCH_META_DATA_ID = "com.alibaba.nacos.meta.switch";
public static final String FIXED_POLLING = "isFixedPolling";
public static final String FIXED_POLLING_INTERVAL = "fixedPollingInertval";
public static final String FIXED_DELAY_TIME = "fixedDelayTime";
public static final String DISABLE_APP_COLLECTOR = "disableAppCollector";
private static volatile Map<String, String> switches = new HashMap<>();
public static boolean getSwitchBoolean(String key, boolean defaultValue) {
boolean rtn;
try {
String value = switches.get(key);
rtn = value != null ? Boolean.parseBoolean(value) : defaultValue;
} catch (Exception e) {
rtn = defaultValue;
LogUtil.FATAL_LOG.error("corrupt switch value {}={}", key, switches.get(key));
}
return rtn;
}
public static int getSwitchInteger(String key, int defaultValue) {
int rtn;
try {
@ -73,11 +55,6 @@ public class SwitchService {
return rtn;
}
public static String getSwitchString(String key, String defaultValue) {
String value = switches.get(key);
return StringUtils.isBlank(value) ? defaultValue : value;
}
/**
* Load config.
*

View File

@ -79,7 +79,7 @@ public class GroupCapacityPersistService {
this.mapperManager = MapperManager.instance(isDataSourceLogEnable);
}
private static final class GroupCapacityRowMapper implements RowMapper<GroupCapacity> {
static final class GroupCapacityRowMapper implements RowMapper<GroupCapacity> {
@Override
public GroupCapacity mapRow(ResultSet rs, int rowNum) throws SQLException {

View File

@ -76,7 +76,7 @@ public class TenantCapacityPersistService {
this.mapperManager = MapperManager.instance(isDataSourceLogEnable);
}
private static final class TenantCapacityRowMapper implements RowMapper<TenantCapacity> {
static final class TenantCapacityRowMapper implements RowMapper<TenantCapacity> {
@Override
public TenantCapacity mapRow(ResultSet rs, int rowNum) throws SQLException {

View File

@ -120,9 +120,12 @@ public class DumpChangeConfigWorker implements Runnable {
new Object[] {groupKey, cf.getLastModified(), cf.getMd5()});
ConfigInfoWrapper configInfoWrapper = configInfoPersistService.findConfigInfo(cf.getDataId(),
cf.getGroup(), cf.getTenant());
ConfigCacheService.dumpChange(configInfoWrapper.getDataId(), configInfoWrapper.getGroup(),
LogUtil.DUMP_LOG.info("[dump-change] find change config {}, {}, md5={}",
new Object[] {groupKey, cf.getLastModified(), cf.getMd5()});
ConfigCacheService.dump(configInfoWrapper.getDataId(), configInfoWrapper.getGroup(),
configInfoWrapper.getTenant(), configInfoWrapper.getContent(),
configInfoWrapper.getLastModified(), configInfoWrapper.getEncryptedDataKey());
configInfoWrapper.getLastModified(), configInfoWrapper.getType(),
configInfoWrapper.getEncryptedDataKey());
final String content = configInfoWrapper.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE_GBK);
final String md5Utf8 = MD5Utils.md5Hex(content, Constants.ENCODE_UTF8);

View File

@ -127,7 +127,7 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
private final IdGeneratorManager idGeneratorManager;
private MapperManager mapperManager;
MapperManager mapperManager;
private HistoryConfigInfoPersistService historyConfigInfoPersistService;
@ -184,11 +184,11 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
}
private ConfigOperateResult getConfigInfoOperateResult(String dataId, String group, String tenant) {
ConfigInfoStateWrapper configInfo4Beta = this.findConfigInfoState(dataId, group, tenant);
if (configInfo4Beta == null) {
ConfigInfoStateWrapper configInfo4 = this.findConfigInfoState(dataId, group, tenant);
if (configInfo4 == null) {
return new ConfigOperateResult(false);
}
return new ConfigOperateResult(configInfo4Beta.getId(), configInfo4Beta.getLastModified());
return new ConfigOperateResult(configInfo4.getId(), configInfo4.getLastModified());
}
@ -229,7 +229,8 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
@Override
public ConfigOperateResult insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo,
Map<String, Object> configAdvanceInfo) {
if (Objects.isNull(findConfigInfoState(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()))) {
if (Objects.isNull(
findConfigInfoState(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()))) {
return addConfigInfo(srcIp, srcUser, configInfo, configAdvanceInfo);
} else {
return updateConfigInfo(configInfo, srcIp, srcUser, configAdvanceInfo);
@ -239,7 +240,8 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
@Override
public ConfigOperateResult insertOrUpdateCas(String srcIp, String srcUser, ConfigInfo configInfo,
Map<String, Object> configAdvanceInfo) {
if (Objects.isNull(findConfigInfoState(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()))) {
if (Objects.isNull(
findConfigInfoState(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()))) {
return addConfigInfo(srcIp, srcUser, configInfo, configAdvanceInfo);
} else {
return updateConfigInfoCas(configInfo, srcIp, srcUser, configAdvanceInfo);
@ -256,7 +258,7 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.PERSIST_ENCODE);
final String encryptedDataKey =
configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
@ -338,8 +340,8 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
configAdvanceInfo.put("type", type);
configAdvanceInfo.put("desc", configInfo.getDesc());
try {
ConfigInfo foundCfg = findConfigInfo(configInfo2Save.getDataId(), configInfo2Save.getGroup(),
configInfo2Save.getTenant());
ConfigInfoStateWrapper foundCfg = findConfigInfoState(configInfo2Save.getDataId(),
configInfo2Save.getGroup(), configInfo2Save.getTenant());
if (foundCfg != null) {
throw new Throwable("DuplicateKeyException: config already exists, should be overridden");
}
@ -363,6 +365,7 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
skipitem.put("dataId", skipConfigInfo.getDataId());
skipitem.put("group", skipConfigInfo.getGroup());
skipData.add(skipitem);
skipCount++;
}
break;
} else if (SameConfigPolicy.SKIP.equals(policy)) {
@ -586,7 +589,8 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
final String encryptedDataKey =
configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
Timestamp time = new Timestamp(System.currentTimeMillis());
@ -602,7 +606,7 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
context.putUpdateParameter(FieldConstant.EFFECT, effect);
context.putUpdateParameter(FieldConstant.TYPE, type);
context.putUpdateParameter(FieldConstant.C_SCHEMA, schema);
context.putUpdateParameter(FieldConstant.ENCRYPTED_DATA_KEY, encryptedDataKey);
context.putWhereParameter(FieldConstant.DATA_ID, configInfo.getDataId());
context.putWhereParameter(FieldConstant.GROUP_ID, configInfo.getGroup());
context.putWhereParameter(FieldConstant.TENANT_ID, tenantTmp);

View File

@ -609,7 +609,8 @@ public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
final String encryptedDataKey =
configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
try {
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
@ -627,7 +628,7 @@ public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
context.putUpdateParameter(FieldConstant.EFFECT, effect);
context.putUpdateParameter(FieldConstant.TYPE, type);
context.putUpdateParameter(FieldConstant.C_SCHEMA, schema);
context.putUpdateParameter(FieldConstant.ENCRYPTED_DATA_KEY, encryptedDataKey);
context.putWhereParameter(FieldConstant.DATA_ID, configInfo.getDataId());
context.putWhereParameter(FieldConstant.GROUP_ID, configInfo.getGroup());
context.putWhereParameter(FieldConstant.TENANT_ID, tenantTmp);

View File

@ -34,36 +34,32 @@ import java.util.concurrent.TimeUnit;
*/
public final class ConfigExecutor {
private static final ScheduledExecutorService TIMER_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class), 8,
new NameThreadFactory("com.alibaba.nacos.config.server.timer"));
private static final ScheduledExecutorService TIMER_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class), 8,
new NameThreadFactory("com.alibaba.nacos.config.server.timer"));
private static final ScheduledExecutorService CAPACITY_MANAGEMENT_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(Config.class),
new NameThreadFactory("com.alibaba.nacos.config.CapacityManagement"));
private static final ScheduledExecutorService CAPACITY_MANAGEMENT_EXECUTOR = ExecutorFactory.Managed.newSingleScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class),
new NameThreadFactory("com.alibaba.nacos.config.CapacityManagement"));
private static final ScheduledExecutorService ASYNC_NOTIFY_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class), 100,
new NameThreadFactory("com.alibaba.nacos.config.AsyncNotifyService"));
private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_PLUGIN_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class),
ThreadUtils.getSuitableThreadCount(),
new NameThreadFactory("com.alibaba.nacos.config.plugin.AsyncService"));
private static final ScheduledExecutorService CONFIG_SUB_SERVICE_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class),
ThreadUtils.getSuitableThreadCount(),
new NameThreadFactory("com.alibaba.nacos.config.ConfigSubService"));
private static final ScheduledExecutorService ASYNC_NOTIFY_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class), 100,
new NameThreadFactory("com.alibaba.nacos.config.AsyncNotifyService"));
private static final ScheduledExecutorService LONG_POLLING_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(Config.class),
new NameThreadFactory("com.alibaba.nacos.config.LongPolling"));
private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_PLUGIN_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class), ThreadUtils.getSuitableThreadCount(),
new NameThreadFactory("com.alibaba.nacos.config.plugin.AsyncService"));
private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class),
ThreadUtils.getSuitableThreadCount(),
new NameThreadFactory("com.alibaba.nacos.config.server.remote.ConfigChangeNotifier"));
private static final ScheduledExecutorService CONFIG_SUB_SERVICE_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class), ThreadUtils.getSuitableThreadCount(),
new NameThreadFactory("com.alibaba.nacos.config.ConfigSubService"));
private static final ScheduledExecutorService LONG_POLLING_EXECUTOR = ExecutorFactory.Managed.newSingleScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class), new NameThreadFactory("com.alibaba.nacos.config.LongPolling"));
private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(Config.class), ThreadUtils.getSuitableThreadCount(),
new NameThreadFactory("com.alibaba.nacos.config.server.remote.ConfigChangeNotifier"));
public static void scheduleConfigTask(Runnable command, long initialDelay, long delay, TimeUnit unit) {
TIMER_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
@ -84,11 +80,11 @@ public final class ConfigExecutor {
public static void scheduleAsyncNotify(Runnable command, long delay, TimeUnit unit) {
ASYNC_NOTIFY_EXECUTOR.schedule(command, delay, unit);
}
public static void executeAsyncConfigChangePluginTask(Runnable runnable) {
ASYNC_CONFIG_CHANGE_PLUGIN_EXECUTOR.execute(runnable);
}
public static int asyncNotifyQueueSize() {
return ((ScheduledThreadPoolExecutor) ASYNC_NOTIFY_EXECUTOR).getQueue().size();
}
@ -105,6 +101,10 @@ public final class ConfigExecutor {
return ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR;
}
public static ScheduledFuture<?> scheduleClientConfigNotifier(Runnable runnable, long delay, TimeUnit unit) {
return ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR.schedule(runnable, delay, unit);
}
public static void scheduleLongPolling(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
LONG_POLLING_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}

View File

@ -0,0 +1,227 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.aspect;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.config.server.configuration.ConfigChangeConfigs;
import com.alibaba.nacos.config.server.model.SameConfigPolicy;
import com.alibaba.nacos.config.server.utils.RequestUtil;
import com.alibaba.nacos.plugin.config.ConfigChangePluginManager;
import com.alibaba.nacos.plugin.config.constants.ConfigChangeConstants;
import com.alibaba.nacos.plugin.config.constants.ConfigChangeExecuteTypes;
import com.alibaba.nacos.plugin.config.constants.ConfigChangePointCutTypes;
import com.alibaba.nacos.plugin.config.model.ConfigChangeRequest;
import com.alibaba.nacos.plugin.config.model.ConfigChangeResponse;
import com.alibaba.nacos.plugin.config.spi.ConfigChangePluginService;
import com.alibaba.nacos.sys.utils.PropertiesUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Arrays;
import java.util.Properties;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@RunWith(SpringJUnit4ClassRunner.class)
public class ConfigChangeAspectTest {
ConfigChangeAspect configChangeAspect;
ConfigChangeConfigs configChangeConfigs;
@Mock
ConfigChangePluginService configChangePluginService;
MockedStatic<PropertiesUtil> propertiesStatic;
MockedStatic<RequestUtil> requestUtilMockedStatic;
@Before
public void before() {
//mock config change service enabled.
propertiesStatic = Mockito.mockStatic(PropertiesUtil.class);
requestUtilMockedStatic = Mockito.mockStatic(RequestUtil.class);
Properties properties = new Properties();
properties.put("mockedConfigChangeService.enabled", "true");
propertiesStatic.when(() -> PropertiesUtil.getPropertiesWithPrefix(any(),
eq(ConfigChangeConstants.NACOS_CORE_CONFIG_PLUGIN_PREFIX))).thenReturn(properties);
requestUtilMockedStatic.when(() -> RequestUtil.getSrcUserName(any(HttpServletRequest.class)))
.thenReturn("mockedUser");
Mockito.when(configChangePluginService.getServiceType()).thenReturn("mockedConfigChangeService");
Mockito.when(configChangePluginService.pointcutMethodNames()).thenReturn(ConfigChangePointCutTypes.values());
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_AFTER_TYPE);
ConfigChangePluginManager.join(configChangePluginService);
configChangeConfigs = new ConfigChangeConfigs();
configChangeAspect = new ConfigChangeAspect(configChangeConfigs);
}
@After
public void after() {
propertiesStatic.close();
requestUtilMockedStatic.close();
}
@Test
public void testImportConfigAround() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_AFTER_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
String srcUser = "user12324";
String namespace = "tenant234";
SameConfigPolicy policy = SameConfigPolicy.ABORT;
MultipartFile file = Mockito.mock(MultipartFile.class);
Mockito.when(proceedingJoinPoint.proceed(any())).thenReturn("mock success return");
Object o = configChangeAspect.importConfigAround(proceedingJoinPoint, request, srcUser, namespace, policy,
file);
Thread.sleep(20L);
// expect service executed.
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
//expect join point processed success.
Assert.assertEquals("mock success return", o);
}
@Test
public void testPublishOrUpdateConfigAround() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_AFTER_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
String srcUser = "user12324";
String dataId = "d1";
String group = "g1";
String tenant = "t1";
Mockito.when(proceedingJoinPoint.proceed(any())).thenReturn("mock success return");
Object o = configChangeAspect.publishOrUpdateConfigAround(proceedingJoinPoint, request, response, dataId, group,
tenant, "c1", null, null, srcUser, null, null, null, null, null);
Thread.sleep(20L);
// expect service executed.
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
//expect join point processed success.
Assert.assertEquals("mock success return", o);
}
@Test
public void testRemoveConfigByIdAround() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_AFTER_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
String dataId = "d1";
String group = "g1";
String tenant = "t1";
Mockito.when(proceedingJoinPoint.proceed(any())).thenReturn("mock success return");
Object o = configChangeAspect.removeConfigByIdAround(proceedingJoinPoint, request, response, dataId, group,
tenant);
Thread.sleep(20L);
// expect service executed.
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
//expect join point processed success.
Assert.assertEquals("mock success return", o);
}
@Test
public void testRemoveConfigByIdsAround() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_AFTER_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
Mockito.when(proceedingJoinPoint.proceed(any())).thenReturn("mock success return");
Object o = configChangeAspect.removeConfigByIdsAround(proceedingJoinPoint, request, Arrays.asList(1L, 2L));
Thread.sleep(20L);
// expect service executed.
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
//expect join point processed success.
Assert.assertEquals("mock success return", o);
}
@Test
public void testPublishConfigAroundRpc() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_BEFORE_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
ConfigPublishRequest request = new ConfigPublishRequest();
RequestMeta requestMeta = new RequestMeta();
ConfigPublishResponse configPublishResponse = ConfigPublishResponse.buildSuccessResponse();
Mockito.when(proceedingJoinPoint.proceed(any())).thenReturn(configPublishResponse);
//execute
Object o = configChangeAspect.publishConfigAroundRpc(proceedingJoinPoint, request, requestMeta);
//expect
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
Assert.assertEquals(configPublishResponse, o);
}
@Test
public void testPublishConfigAroundRpcException() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_BEFORE_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
ConfigPublishRequest request = new ConfigPublishRequest();
RequestMeta requestMeta = new RequestMeta();
Mockito.when(proceedingJoinPoint.proceed(any())).thenThrow(new NacosRuntimeException(503));
//execute
Object o = configChangeAspect.publishConfigAroundRpc(proceedingJoinPoint, request, requestMeta);
//expect
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
Assert.assertTrue(((ConfigPublishResponse) o).getMessage().contains("config change join point fail"));
}
@Test
public void testRemoveConfigAroundRpc() throws Throwable {
Mockito.when(configChangePluginService.executeType()).thenReturn(ConfigChangeExecuteTypes.EXECUTE_BEFORE_TYPE);
ProceedingJoinPoint proceedingJoinPoint = Mockito.mock(ProceedingJoinPoint.class);
ConfigRemoveRequest request = new ConfigRemoveRequest();
RequestMeta requestMeta = new RequestMeta();
ConfigPublishResponse configPublishResponse = ConfigPublishResponse.buildSuccessResponse();
Mockito.when(proceedingJoinPoint.proceed(any())).thenReturn(configPublishResponse);
//execute
Object o = configChangeAspect.removeConfigAroundRpc(proceedingJoinPoint, request, requestMeta);
//expect
Mockito.verify(configChangePluginService, Mockito.times(1))
.execute(any(ConfigChangeRequest.class), any(ConfigChangeResponse.class));
Assert.assertEquals(configPublishResponse, o);
}
}

View File

@ -39,7 +39,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import javax.servlet.ServletContext;
import java.sql.Timestamp;
import static org.mockito.ArgumentMatchers.eq;
@ -49,7 +48,7 @@ import static org.mockito.Mockito.when;
@ContextConfiguration(classes = MockServletContext.class)
@WebAppConfiguration
public class CapacityControllerTest {
@InjectMocks
CapacityController capacityController;
@ -70,8 +69,8 @@ public class CapacityControllerTest {
}
@Test
public void testGetCapacity() throws Exception {
public void testGetCapacityNormal() throws Exception {
Capacity capacity = new Capacity();
capacity.setId(1L);
capacity.setMaxAggrCount(1);
@ -96,13 +95,42 @@ public class CapacityControllerTest {
}
@Test
public void testUpdateCapacity1x() throws Exception {
public void testGetCapacityException() throws Exception {
Capacity capacity = new Capacity();
capacity.setId(1L);
capacity.setMaxAggrCount(1);
capacity.setMaxSize(1);
capacity.setMaxAggrSize(1);
capacity.setGmtCreate(new Timestamp(1));
capacity.setGmtModified(new Timestamp(2));
when(capacityService.getCapacityWithDefault(eq("test"), eq("test"))).thenReturn(capacity);
// tenant & group is null
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(Constants.CAPACITY_CONTROLLER_PATH);
String actualValue = mockMvc.perform(builder).andReturn().getResponse().getContentAsString();
System.out.println(actualValue);
// tenant is blank& group is null
MockHttpServletRequestBuilder builder2 = MockMvcRequestBuilders.get(Constants.CAPACITY_CONTROLLER_PATH)
.param("tenant", "");
String actualValue2 = mockMvc.perform(builder2).andReturn().getResponse().getContentAsString();
System.out.println(actualValue2);
// tenant is blank& group is null
when(capacityService.getCapacityWithDefault(eq("g1"), eq("123"))).thenThrow(new NullPointerException());
MockHttpServletRequestBuilder builder3 = MockMvcRequestBuilders.get(Constants.CAPACITY_CONTROLLER_PATH)
.param("tenant", "123").param("group", "g1");
String actualValue3 = mockMvc.perform(builder3).andReturn().getResponse().getContentAsString();
System.out.println(actualValue3);
}
@Test
public void testUpdateCapacity1x() throws Exception {
when(capacityService.insertOrUpdateCapacity("test", "test", 1, 1, 1, 1)).thenReturn(true);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.post(Constants.CAPACITY_CONTROLLER_PATH)
.param("group", "test").param("tenant", "test")
.param("quota", "1").param("maxSize", "1")
.param("group", "test").param("tenant", "test").param("quota", "1").param("maxSize", "1")
.param("maxAggrCount", "1").param("maxAggrSize", "1");
String actualValue = mockMvc.perform(builder).andReturn().getResponse().getContentAsString();
String code = JacksonUtils.toObj(actualValue).get("code").toString();
@ -140,8 +168,7 @@ public class CapacityControllerTest {
when(capacityService.insertOrUpdateCapacity("test", "test", 1, 1, 1, 1)).thenReturn(false);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.post(Constants.CAPACITY_CONTROLLER_PATH)
.param("group", "test").param("tenant", "test")
.param("quota", "1").param("maxSize", "1")
.param("group", "test").param("tenant", "test").param("quota", "1").param("maxSize", "1")
.param("maxAggrCount", "1").param("maxAggrSize", "1");
String actualValue = mockMvc.perform(builder).andReturn().getResponse().getContentAsString();
String code = JacksonUtils.toObj(actualValue).get("code").toString();

View File

@ -17,7 +17,9 @@
package com.alibaba.nacos.config.server.controller;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
@ -43,7 +45,10 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import javax.servlet.ServletContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@ -54,7 +59,7 @@ import static org.mockito.Mockito.when;
@ContextConfiguration(classes = MockServletContext.class)
@WebAppConfiguration
public class ClientMetricsControllerTest {
@InjectMocks
ClientMetricsController clientMetricsController;
@ -80,19 +85,70 @@ public class ClientMetricsControllerTest {
@Test
public void testGetClusterMetric() throws Exception {
when(memberManager.allMembers()).thenReturn(new ArrayList<>());
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(Constants.METRICS_CONTROLLER_PATH + "/cluster")
.param("ip", "127.0.0.1").param("tenant", "test")
List<Member> members = new ArrayList<>();
Member m1 = new Member();
m1.setIp("127.0.0.1");
m1.setPort(8848);
members.add(m1);
Member m2 = new Member();
m2.setIp("127.0.0.1");
m2.setPort(9848);
members.add(m2);
Member m3 = new Member();
m3.setIp("127.0.0.1");
m3.setPort(7848);
members.add(m3);
when(memberManager.allMembers()).thenReturn(members);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(
Constants.METRICS_CONTROLLER_PATH + "/cluster").param("ip", "127.0.0.1").param("tenant", "test")
.param("dataId", "test").param("group", "test");
int actualValue = mockMvc.perform(builder).andReturn().getResponse().getStatus();
Assert.assertEquals(200, actualValue);
}
@Test
public void testGetCurrentMetric() throws Exception {
public void testClusterMetricsCallBack() {
Member m1 = new Member();
m1.setIp("127.0.0.1");
m1.setPort(8848);
//success result
RestResult<Map> result1 = new RestResult<>();
HashMap<String, Object> stringObjectHashMap = new HashMap<>();
stringObjectHashMap.put("test", "md5..");
result1.setData(stringObjectHashMap);
result1.setCode(200);
CountDownLatch latch = new CountDownLatch(5);
String dataId = "d1";
String group = "g1";
String tenant = "t1";
String ip = "192.168.0.1";
Map<String, Object> responseMap = new HashMap<>();
ClientMetricsController.ClusterMetricsCallBack clusterMetricsCallBack = new ClientMetricsController.ClusterMetricsCallBack(
responseMap, latch, dataId, group, tenant, ip, m1);
clusterMetricsCallBack.onReceive(result1);
//fail result
RestResult<Map> result2 = new RestResult<>();
HashMap<String, Object> stringObjectHashMap2 = new HashMap<>();
stringObjectHashMap2.put("test2", "md5..");
result2.setData(stringObjectHashMap2);
result2.setCode(500);
clusterMetricsCallBack.onReceive(result2);
//error and cancel
clusterMetricsCallBack.onError(new NullPointerException());
clusterMetricsCallBack.onCancel();
clusterMetricsCallBack.onCancel();
Assert.assertEquals(stringObjectHashMap, responseMap);
Assert.assertEquals(0, latch.getCount());
}
@Test
public void testGetCurrentMetric() throws Exception {
ClientConfigMetricResponse response = new ClientConfigMetricResponse();
response.putMetric("test", "test");
Connection connection = Mockito.mock(Connection.class);
@ -100,13 +156,13 @@ public class ClientMetricsControllerTest {
List<Connection> connections = new ArrayList<>();
connections.add(connection);
when(connectionManager.getConnectionByIp(eq("127.0.0.1"))).thenReturn(connections);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(Constants.METRICS_CONTROLLER_PATH + "/current")
.param("ip", "127.0.0.1").param("tenant", "test")
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(
Constants.METRICS_CONTROLLER_PATH + "/current").param("ip", "127.0.0.1").param("tenant", "test")
.param("dataId", "test").param("group", "test");
String actualValue = mockMvc.perform(builder).andReturn().getResponse().getContentAsString();
Assert.assertEquals("{\"test\":\"test\"}", actualValue);
}
}

View File

@ -17,6 +17,9 @@
package com.alibaba.nacos.config.server.controller;
import com.alibaba.nacos.common.http.param.MediaType;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.controller.parameters.SameNamespaceCloneConfigBean;
@ -24,12 +27,15 @@ import com.alibaba.nacos.config.server.model.ConfigAdvanceInfo;
import com.alibaba.nacos.config.server.model.ConfigAllInfo;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper;
import com.alibaba.nacos.config.server.model.ConfigMetadata;
import com.alibaba.nacos.config.server.model.GroupkeyListenserStatus;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.service.ConfigOperationService;
import com.alibaba.nacos.config.server.service.ConfigSubService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.utils.YamlParserUtil;
import com.alibaba.nacos.config.server.utils.ZipUtils;
import com.alibaba.nacos.core.namespace.repository.NamespacePersistService;
import com.alibaba.nacos.persistence.model.Page;
@ -57,13 +63,16 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import javax.servlet.ServletContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(SpringJUnit4ClassRunner.class)
@ -166,6 +175,31 @@ public class ConfigControllerTest {
@Test
public void testDeleteConfigs() throws Exception {
List<ConfigInfo> resultInfos = new ArrayList<>();
String dataId = "dataId1123";
String group = "group34567";
String tenant = "tenant45678";
resultInfos.add(new ConfigInfo(dataId, group, tenant));
Mockito.when(configInfoPersistService.removeConfigInfoByIds(eq(Arrays.asList(1L, 2L)), anyString(), eq(null)))
.thenReturn(resultInfos);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.delete(Constants.CONFIG_CONTROLLER_PATH)
.param("delType", "ids").param("ids", "1,2");
@ -175,6 +209,9 @@ public class ConfigControllerTest {
String data = JacksonUtils.toObj(actualValue).get("data").toString();
Assert.assertEquals("200", code);
Assert.assertEquals("true", data);
Thread.sleep(200L);
//expect
Assert.assertTrue(reference.get() != null);
}
@Test
@ -337,9 +374,24 @@ public class ConfigControllerTest {
@Test
public void testExportConfig() throws Exception {
String dataId = "dataId1.json";
String group = "group2";
String tenant = "tenant234";
String appname = "appname2";
ConfigAllInfo configAllInfo = new ConfigAllInfo();
configAllInfo.setDataId(dataId);
configAllInfo.setGroup(group);
configAllInfo.setTenant(tenant);
configAllInfo.setContent("contet45678");
configAllInfo.setAppName(appname);
List<ConfigAllInfo> dataList = new ArrayList<>();
dataList.add(configAllInfo);
Mockito.when(configInfoPersistService.findAllConfigInfo4Export(eq(dataId), eq(group), eq(tenant), eq(appname),
eq(Arrays.asList(1L, 2L)))).thenReturn(dataList);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(Constants.CONFIG_CONTROLLER_PATH)
.param("export", "true").param("dataId", "test").param("group", "test").param("tenant", "")
.param("ids", "1,2");
.param("export", "true").param("dataId", dataId).param("group", group).param("tenant", tenant)
.param("appName", appname).param("ids", "1,2");
int actualValue = mockmvc.perform(builder).andReturn().getResponse().getStatus();
@ -348,10 +400,23 @@ public class ConfigControllerTest {
@Test
public void testExportConfigV2() throws Exception {
String dataId = "dataId2.json";
String group = "group2";
String tenant = "tenant234";
String appname = "appname2";
ConfigAllInfo configAllInfo = new ConfigAllInfo();
configAllInfo.setDataId(dataId);
configAllInfo.setGroup(group);
configAllInfo.setTenant(tenant);
configAllInfo.setAppName(appname);
configAllInfo.setContent("content1234");
List<ConfigAllInfo> dataList = new ArrayList<>();
dataList.add(configAllInfo);
Mockito.when(configInfoPersistService.findAllConfigInfo4Export(eq(dataId), eq(group), eq(tenant), eq(appname),
eq(Arrays.asList(1L, 2L)))).thenReturn(dataList);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(Constants.CONFIG_CONTROLLER_PATH)
.param("exportV2", "true").param("dataId", "test").param("group", "test").param("tenant", "")
.param("ids", "1,2");
.param("exportV2", "true").param("dataId", dataId).param("group", group).param("tenant", tenant)
.param("appName", appname).param("ids", "1,2");
int actualValue = mockmvc.perform(builder).andReturn().getResponse().getStatus();
@ -389,6 +454,48 @@ public class ConfigControllerTest {
zipUtilsMockedStatic.close();
}
@Test
public void testImportAndPublishConfigV2() throws Exception {
List<ZipUtils.ZipItem> zipItems = new ArrayList<>();
String dataId = "dataId23456.json";
String group = "group132";
String content = "content1234";
ZipUtils.ZipItem zipItem = new ZipUtils.ZipItem(group + "/" + dataId, content);
zipItems.add(zipItem);
ConfigMetadata configMetadata = new ConfigMetadata();
configMetadata.setMetadata(new ArrayList<>());
ConfigMetadata.ConfigExportItem configExportItem = new ConfigMetadata.ConfigExportItem();
configExportItem.setDataId(dataId);
configExportItem.setGroup(group);
configExportItem.setType("json");
configExportItem.setAppName("appna123");
configMetadata.getMetadata().add(configExportItem);
ZipUtils.UnZipResult unziped = new ZipUtils.UnZipResult(zipItems,
new ZipUtils.ZipItem(Constants.CONFIG_EXPORT_METADATA_NEW, YamlParserUtil.dumpObject(configMetadata)));
MockMultipartFile file = new MockMultipartFile("file", "test.zip", "application/zip", "test".getBytes());
MockedStatic<ZipUtils> zipUtilsMockedStatic = Mockito.mockStatic(ZipUtils.class);
zipUtilsMockedStatic.when(() -> ZipUtils.unzip(eq(file.getBytes()))).thenReturn(unziped);
when(namespacePersistService.tenantInfoCountByTenantId("public")).thenReturn(1);
Map<String, Object> map = new HashMap<>();
map.put("test", "test");
when(configInfoPersistService.batchInsertOrUpdate(anyList(), anyString(), anyString(), any(),
any())).thenReturn(map);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.multipart(Constants.CONFIG_CONTROLLER_PATH)
.file(file).param("import", "true").param("src_user", "test").param("namespace", "public")
.param("policy", "ABORT");
String actualValue = mockmvc.perform(builder).andReturn().getResponse().getContentAsString();
String code = JacksonUtils.toObj(actualValue).get("code").toString();
Assert.assertEquals("200", code);
Map<String, Object> resultMap = JacksonUtils.toObj(JacksonUtils.toObj(actualValue).get("data").toString(),
Map.class);
Assert.assertEquals(map.get("test"), resultMap.get("test").toString());
zipUtilsMockedStatic.close();
}
@Test
public void testCloneConfig() throws Exception {
SameNamespaceCloneConfigBean sameNamespaceCloneConfigBean = new SameNamespaceCloneConfigBean();

View File

@ -16,12 +16,20 @@
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.ConnectionMeta;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.core.remote.grpc.GrpcConnection;
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
import com.alibaba.nacos.plugin.control.tps.TpsControlManager;
import com.alibaba.nacos.plugin.control.tps.request.TpsCheckRequest;
import com.alibaba.nacos.plugin.control.tps.response.TpsCheckResponse;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -32,7 +40,17 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@RunWith(MockitoJUnitRunner.class)
public class RpcConfigChangeNotifierTest {
@ -56,9 +74,15 @@ public class RpcConfigChangeNotifierTest {
MockedStatic<ControlManagerCenter> controlManagerCenterMockedStatic;
MockedStatic<EnvUtil> envUtilMockedStatic;
@Before
public void setUp() {
envUtilMockedStatic = Mockito.mockStatic(EnvUtil.class);
envUtilMockedStatic.when(
() -> EnvUtil.getProperty(eq("nacos.config.push.maxRetryTime"), eq(Integer.class), anyInt()))
.thenReturn(3);
controlManagerCenterMockedStatic = Mockito.mockStatic(ControlManagerCenter.class);
Mockito.when(ControlManagerCenter.getInstance()).thenReturn(controlManagerCenter);
Mockito.when(ControlManagerCenter.getInstance().getTpsControlManager()).thenReturn(tpsControlManager);
@ -69,16 +93,109 @@ public class RpcConfigChangeNotifierTest {
}
@After
public void after() {
envUtilMockedStatic.close();
controlManagerCenterMockedStatic.close();
}
@Test
public void testOnEvent() {
public void testOnDataEvent() throws InterruptedException {
final String groupKey = GroupKey2.getKey("nacos.internal.tps.control_rule_1", "nacos", "tenant");
final String limitGroupKey = GroupKey2.getKey("nacos.internal.tps.nacos.internal.connection.limit.rule",
"nacos", "tenant");
List<String> betaIps = new ArrayList<>();
betaIps.add("1.1.1.1");
Set<String> mockConnectionIds = new HashSet<>();
mockConnectionIds.add("con1");
mockConnectionIds.add("con2");
mockConnectionIds.add("con3");
GrpcConnection mockConn1 = Mockito.mock(GrpcConnection.class);
GrpcConnection mockConn3 = Mockito.mock(GrpcConnection.class);
//mock con1 push normal
Mockito.when(connectionManager.getConnection(eq("con1"))).thenReturn(mockConn1);
Mockito.when(mockConn1.getMetaInfo()).thenReturn(
new ConnectionMeta("con1", "192.168.0.1", "192.168.0.2", 34567, 9848, "GRPC", "2.2.0", null,
new HashMap<>()));
//mock con1 noy exist
Mockito.when(connectionManager.getConnection(eq("con2"))).thenReturn(null);
Mockito.when(connectionManager.getConnection(eq("con3"))).thenReturn(mockConn3);
Mockito.when(mockConn3.getMetaInfo()).thenReturn(
new ConnectionMeta("con3", "192.168.0.1", "192.168.0.2", 34567, 9848, "GRPC", "2.2.0", null,
new HashMap<>()));
Mockito.when(configChangeListenContext.getListeners(eq(groupKey))).thenReturn(mockConnectionIds);
//mock push tps passed
Mockito.when(tpsControlManager.check(any(TpsCheckRequest.class)))
.thenReturn(new TpsCheckResponse(true, 200, "success"));
rpcConfigChangeNotifier.onEvent(new LocalDataChangeEvent(groupKey, true, betaIps));
rpcConfigChangeNotifier.onEvent(new LocalDataChangeEvent(limitGroupKey));
//wait rpc push executed.
Thread.sleep(50L);
//expect rpc push task run.
Mockito.verify(rpcPushService, times(1)).pushWithCallback(eq("con1"), any(ConfigChangeNotifyRequest.class),
any(RpcConfigChangeNotifier.RpcPushCallback.class), any(Executor.class));
Mockito.verify(rpcPushService, times(1)).pushWithCallback(eq("con3"), any(ConfigChangeNotifyRequest.class),
any(RpcConfigChangeNotifier.RpcPushCallback.class), any(Executor.class));
}
@Test
public void testRpcCallBack() {
MockedStatic<ConfigExecutor> configExecutorMockedStatic = Mockito.mockStatic(ConfigExecutor.class);
try {
RpcConfigChangeNotifier.RpcPushTask task = Mockito.mock(RpcConfigChangeNotifier.RpcPushTask.class);
Mockito.when(task.getConnectionId()).thenReturn("testconn1");
Mockito.when(connectionManager.getConnection(eq("testconn1")))
.thenReturn(Mockito.mock(GrpcConnection.class));
ConfigChangeNotifyRequest notifyRequest = new ConfigChangeNotifyRequest();
notifyRequest.setDataId("d1");
notifyRequest.setGroup("g1");
Mockito.when(task.getNotifyRequest()).thenReturn(notifyRequest);
//mock task not overtimes and receive exception on callback
Mockito.when(task.isOverTimes()).thenReturn(false);
Mockito.when(task.getTryTimes()).thenReturn(2);
RpcConfigChangeNotifier.RpcPushCallback rpcPushCallback = new RpcConfigChangeNotifier.RpcPushCallback(task,
tpsControlManager, connectionManager);
rpcPushCallback.onFail(new RuntimeException());
//expect config push fail be recorded.
Mockito.verify(tpsControlManager, times(1)).check(any(TpsCheckRequest.class));
//expect schedule this task next retry times
configExecutorMockedStatic.verify(
() -> ConfigExecutor.scheduleClientConfigNotifier(any(RpcConfigChangeNotifier.RpcPushTask.class),
eq(2 * 2L), eq(TimeUnit.SECONDS)));
//mock
rpcPushCallback.onSuccess();
//expect config push success be recorded.
Mockito.verify(tpsControlManager, times(2)).check(any(TpsCheckRequest.class));
//mock task is over times
Mockito.when(task.isOverTimes()).thenReturn(true);
rpcPushCallback.onFail(new NullPointerException());
Mockito.verify(connectionManager, times(1)).unregister(eq("testconn1"));
} finally {
configExecutorMockedStatic.close();
}
}
static final String POINT_CONFIG_PUSH = "CONFIG_PUSH_COUNT";
static final String POINT_CONFIG_PUSH_SUCCESS = "CONFIG_PUSH_SUCCESS";
static final String POINT_CONFIG_PUSH_FAIL = "CONFIG_PUSH_FAIL";
@Test
public void testRegisterTpsPoint() {
rpcConfigChangeNotifier.registerTpsPoint();
Mockito.verify(tpsControlManager, Mockito.times(1)).registerTpsPoint(eq(POINT_CONFIG_PUSH));
Mockito.verify(tpsControlManager, Mockito.times(1)).registerTpsPoint(eq(POINT_CONFIG_PUSH_SUCCESS));
Mockito.verify(tpsControlManager, Mockito.times(1)).registerTpsPoint(eq(POINT_CONFIG_PUSH_FAIL));
}
}

View File

@ -0,0 +1,315 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.model.CacheItem;
import com.alibaba.nacos.config.server.model.ConfigCache;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskService;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskServiceFactory;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
@RunWith(SpringJUnit4ClassRunner.class)
public class ConfigCacheServiceTest {
MockedStatic<PropertyUtil> propertyUtilMockedStatic;
MockedStatic<ConfigDiskServiceFactory> configDiskServiceFactoryMockedStatic;
@Mock
ConfigDiskService configDiskService;
MockedStatic<EnvUtil> envUtilMockedStatic;
@Before
public void before() {
envUtilMockedStatic = Mockito.mockStatic(EnvUtil.class);
configDiskServiceFactoryMockedStatic = Mockito.mockStatic(ConfigDiskServiceFactory.class);
configDiskServiceFactoryMockedStatic.when(() -> ConfigDiskServiceFactory.getInstance())
.thenReturn(configDiskService);
propertyUtilMockedStatic = Mockito.mockStatic(PropertyUtil.class);
}
@After
public void after() {
envUtilMockedStatic.close();
propertyUtilMockedStatic.close();
configDiskServiceFactoryMockedStatic.close();
}
@Test
public void testDumpFormal() throws Exception {
String dataId = "dataIdtestDumpMd5NewTsNewMd5123";
String group = "group11";
String tenant = "tenant112";
String content = "mockContnet11";
String md5 = "mockmd511";
String groupKey = GroupKey2.getKey(dataId, group, tenant);
//make sure not exist prev cache.
CacheItem contentCache = ConfigCacheService.getContentCache(groupKey);
Assert.assertTrue(contentCache == null);
long ts = System.currentTimeMillis();
String type = "json";
String encryptedDataKey = "key12345";
boolean result = ConfigCacheService.dumpWithMd5(dataId, group, tenant, content, md5, ts, type,
encryptedDataKey);
Assert.assertTrue(result);
//verify cache.
CacheItem contentCache1 = ConfigCacheService.getContentCache(groupKey);
Assert.assertEquals(ts, contentCache1.getConfigCache().getLastModifiedTs());
Assert.assertEquals(md5, contentCache1.getConfigCache().getMd5Utf8());
Assert.assertEquals(type, contentCache1.getType());
Assert.assertEquals(encryptedDataKey, contentCache1.getConfigCache().getEncryptedDataKey());
Mockito.verify(configDiskService, times(1)).saveToDisk(eq(dataId), eq(group), eq(tenant), eq(content));
//modified ts and content and md5
String contentNew = content + "11";
long newTs = System.currentTimeMillis() + 12L;
ConfigCacheService.dump(dataId, group, tenant, contentNew, newTs, type, encryptedDataKey);
//expect save to disk invoked.
Mockito.verify(configDiskService, times(1)).saveToDisk(eq(dataId), eq(group), eq(tenant), eq(contentNew));
Assert.assertEquals(newTs, contentCache1.getConfigCache().getLastModifiedTs());
String newMd5 = MD5Utils.md5Hex(contentNew, "UTF-8");
Assert.assertEquals(newMd5, contentCache1.getConfigCache().getMd5Utf8());
//modified ts old
long oldTs2 = newTs - 123L;
String contentWithOldTs = contentNew + "123456";
ConfigCacheService.dump(dataId, group, tenant, contentWithOldTs, oldTs2, type, encryptedDataKey);
//expect save to disk invoked.
Mockito.verify(configDiskService, times(0)).saveToDisk(eq(dataId), eq(group), eq(tenant), eq(contentWithOldTs));
//not change ts and md5
Assert.assertEquals(newTs, contentCache1.getConfigCache().getLastModifiedTs());
Assert.assertEquals(newMd5, contentCache1.getConfigCache().getMd5Utf8());
//modified ts new only
long newTs2 = newTs + 123L;
ConfigCacheService.dump(dataId, group, tenant, contentNew, newTs2, type, encryptedDataKey);
Assert.assertEquals(newTs2, contentCache1.getConfigCache().getLastModifiedTs());
//save to disk error
doThrow(new IOException("No space left on device")).when(configDiskService)
.saveToDisk(anyString(), anyString(), anyString(), anyString());
try {
long newTs3 = newTs2 + 123L;
boolean dumpErrorResult = ConfigCacheService.dump(dataId, group, tenant, contentNew + "234567", newTs3,
type, encryptedDataKey);
envUtilMockedStatic.verify(() -> EnvUtil.systemExit(), times(1));
Assert.assertFalse(dumpErrorResult);
} catch (Throwable throwable) {
Assert.assertFalse(true);
}
//test remove
boolean remove = ConfigCacheService.remove(dataId, group, tenant);
Assert.assertTrue(remove);
Mockito.verify(configDiskService, times(1)).removeConfigInfo(dataId, group, tenant);
CacheItem contentCacheAfterRemove = ConfigCacheService.getContentCache(groupKey);
Assert.assertNull(contentCacheAfterRemove);
}
@Test
public void testDumpBeta() throws Exception {
String dataId = "dataIdtestDumpBetaNewCache123";
String group = "group11";
String tenant = "tenant112";
String content = "mockContnet11";
String md5 = MD5Utils.md5Hex(content, "UTF-8");
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String encryptedDataKey = "key12345";
List<String> betaIps = Arrays.asList("127.0.0.1", "127.0.0.2");
long ts = System.currentTimeMillis();
//init beta cache
boolean result = ConfigCacheService.dumpBeta(dataId, group, tenant, content, ts, String.join(",", betaIps),
encryptedDataKey);
Assert.assertTrue(result);
CacheItem contentCache = ConfigCacheService.getContentCache(groupKey);
Assert.assertEquals(md5, contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(ts, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertEquals(betaIps, contentCache.getIps4Beta());
Assert.assertEquals(encryptedDataKey, contentCache.getConfigCacheBeta().getEncryptedDataKey());
Mockito.verify(configDiskService, times(1)).saveBetaToDisk(eq(dataId), eq(group), eq(tenant), eq(content));
//ts newer ,md5 update
long tsNew = System.currentTimeMillis();
String contentNew = content + tsNew;
String md5New = MD5Utils.md5Hex(contentNew, "UTF-8");
List<String> betaIpsNew = Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3");
boolean resultNew = ConfigCacheService.dumpBeta(dataId, group, tenant, contentNew, tsNew,
String.join(",", betaIpsNew), encryptedDataKey);
Assert.assertTrue(resultNew);
Assert.assertEquals(md5New, contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(tsNew, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, contentCache.getConfigCacheBeta().getEncryptedDataKey());
Assert.assertEquals(betaIpsNew, contentCache.getIps4Beta());
Mockito.verify(configDiskService, times(1)).saveBetaToDisk(eq(dataId), eq(group), eq(tenant), eq(contentNew));
//ts old ,md5 update
long tsOld = tsNew - 1;
String contentWithOldTs = "contentWithOldTs" + tsOld;
List<String> betaIpsWithOldTs = Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.4");
boolean resultOld = ConfigCacheService.dumpBeta(dataId, group, tenant, contentWithOldTs, tsOld,
String.join(",", betaIpsWithOldTs), encryptedDataKey);
Assert.assertTrue(resultOld);
Assert.assertEquals(md5New, contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(tsNew, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, contentCache.getConfigCacheBeta().getEncryptedDataKey());
Assert.assertEquals(betaIpsNew, contentCache.getIps4Beta());
Mockito.verify(configDiskService, times(0))
.saveBetaToDisk(eq(dataId), eq(group), eq(tenant), eq(contentWithOldTs));
//ts new ,md5 not update,beta ips list changes
long tsNew2 = tsNew + 1;
String contentWithPrev = contentNew;
List<String> betaIpsNew2 = Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.4", "127.0.0.5");
boolean resultNew2 = ConfigCacheService.dumpBeta(dataId, group, tenant, contentWithPrev, tsNew2,
String.join(",", betaIpsNew2), encryptedDataKey);
Assert.assertTrue(resultNew2);
Assert.assertEquals(md5New, contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(tsNew2, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, contentCache.getConfigCacheBeta().getEncryptedDataKey());
Assert.assertEquals(betaIpsNew2, contentCache.getIps4Beta());
//ts new only,md5 not update,beta ips not change
long tsNew3 = tsNew2 + 1;
String contentWithPrev2 = contentNew;
List<String> betaIpsNew3 = betaIpsNew2;
boolean resultNew3 = ConfigCacheService.dumpBeta(dataId, group, tenant, contentWithPrev2, tsNew3,
String.join(",", betaIpsNew3), encryptedDataKey);
Assert.assertTrue(resultNew3);
Assert.assertEquals(md5New, contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(tsNew3, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, contentCache.getConfigCacheBeta().getEncryptedDataKey());
Assert.assertEquals(betaIpsNew2, contentCache.getIps4Beta());
//ts not update,md5 not update,beta ips not change
long tsNew4 = tsNew3;
String contentWithPrev4 = contentNew;
List<String> betaIpsNew4 = betaIpsNew2;
boolean resultNew4 = ConfigCacheService.dumpBeta(dataId, group, tenant, contentWithPrev4, tsNew4,
String.join(",", betaIpsNew4), encryptedDataKey);
Assert.assertTrue(resultNew4);
Assert.assertEquals(md5New, contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(tsNew3, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, contentCache.getConfigCacheBeta().getEncryptedDataKey());
Assert.assertEquals(betaIpsNew4, contentCache.getIps4Beta());
//test remove
boolean removeBeta = ConfigCacheService.removeBeta(dataId, group, tenant);
Assert.assertTrue(removeBeta);
Mockito.verify(configDiskService, times(1)).removeConfigInfo4Beta(dataId, group, tenant);
ConfigCache betaCacheAfterRemove = ConfigCacheService.getContentCache(groupKey).getConfigCacheBeta();
Assert.assertNull(betaCacheAfterRemove);
}
@Test
public void testDumpTag() throws Exception {
String dataId = "dataIdtestDumpTag133323";
String group = "group11";
String tenant = "tenant112";
String content = "mockContnet11";
String tag = "tag12345";
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String encryptedDataKey = "key12345";
long ts = System.currentTimeMillis();
//init dump tag
boolean dumpTagResult = ConfigCacheService.dumpTag(dataId, group, tenant, tag, content, ts, encryptedDataKey);
Assert.assertTrue(dumpTagResult);
Mockito.verify(configDiskService, times(1))
.saveTagToDisk(eq(dataId), eq(group), eq(tenant), eq(tag), eq(content));
CacheItem contentCache = ConfigCacheService.getContentCache(groupKey);
ConfigCache configCacheTag = contentCache.getConfigCacheTags().get(tag);
Assert.assertEquals(ts, configCacheTag.getLastModifiedTs());
String md5 = MD5Utils.md5Hex(content, "UTF-8");
Assert.assertEquals(md5, configCacheTag.getMd5Utf8());
//ts newer ,md5 update
long tsNew = System.currentTimeMillis();
String contentNew = content + tsNew;
String md5New = MD5Utils.md5Hex(contentNew, "UTF-8");
boolean resultNew = ConfigCacheService.dumpTag(dataId, group, tenant, tag, contentNew, tsNew, encryptedDataKey);
Assert.assertTrue(resultNew);
Assert.assertEquals(md5New, configCacheTag.getMd5Utf8());
Assert.assertEquals(tsNew, configCacheTag.getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, configCacheTag.getEncryptedDataKey());
Mockito.verify(configDiskService, times(1))
.saveTagToDisk(eq(dataId), eq(group), eq(tenant), eq(tag), eq(contentNew));
//ts old ,md5 update
long tsOld = tsNew - 1;
String contentWithOldTs = "contentWithOldTs" + tsOld;
boolean resultOld = ConfigCacheService.dumpTag(dataId, group, tenant, tag, contentWithOldTs, tsOld,
encryptedDataKey);
Assert.assertTrue(resultOld);
Assert.assertEquals(md5New, configCacheTag.getMd5Utf8());
Assert.assertEquals(tsNew, configCacheTag.getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, configCacheTag.getEncryptedDataKey());
Mockito.verify(configDiskService, times(0))
.saveTagToDisk(eq(dataId), eq(group), eq(tenant), eq(tag), eq(contentWithOldTs));
//ts new only,md5 not update
long tsNew2 = tsNew + 1;
String contentWithPrev2 = contentNew;
boolean resultNew2 = ConfigCacheService.dumpTag(dataId, group, tenant, tag, contentWithPrev2, tsNew2,
encryptedDataKey);
Assert.assertTrue(resultNew2);
Assert.assertEquals(md5New, configCacheTag.getMd5Utf8());
Assert.assertEquals(tsNew2, configCacheTag.getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, configCacheTag.getEncryptedDataKey());
//ts not update,md5 not update
long tsNew3 = tsNew2;
String contentWithPrev3 = contentNew;
boolean resultNew3 = ConfigCacheService.dumpTag(dataId, group, tenant, tag, contentWithPrev3, tsNew3,
encryptedDataKey);
Assert.assertTrue(resultNew3);
Assert.assertEquals(md5New, configCacheTag.getMd5Utf8());
Assert.assertEquals(tsNew3, configCacheTag.getLastModifiedTs());
Assert.assertEquals(encryptedDataKey, configCacheTag.getEncryptedDataKey());
//test remove
boolean removeTag = ConfigCacheService.removeTag(dataId, group, tenant, tag);
Assert.assertTrue(removeTag);
Mockito.verify(configDiskService, times(1)).removeConfigInfo4Tag(dataId, group, tenant, tag);
Map<String, ConfigCache> configCacheTags = ConfigCacheService.getContentCache(groupKey).getConfigCacheTags();
Assert.assertNull(configCacheTags);
}
}

View File

@ -0,0 +1,301 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.GroupKey;
import com.alibaba.nacos.config.server.utils.MD5Util;
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
import com.alibaba.nacos.plugin.control.connection.ConnectionControlManager;
import com.alibaba.nacos.plugin.control.connection.response.ConnectionCheckResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@RunWith(MockitoJUnitRunner.class)
public class LongPollingServiceTest {
LongPollingService longPollingService;
MockedStatic<ConfigCacheService> configCacheServiceMockedStatic;
MockedStatic<ConfigExecutor> configExecutorMocked;
MockedStatic<ControlManagerCenter> connectionControlManagerMockedStatic;
@Mock
ControlManagerCenter controlManagerCenter;
@Mock
ConnectionControlManager connectionControlManager;
MockedStatic<SwitchService> switchServiceMockedStatic;
@Before
public void before() {
longPollingService = new LongPollingService();
switchServiceMockedStatic = Mockito.mockStatic(SwitchService.class);
configCacheServiceMockedStatic = Mockito.mockStatic(ConfigCacheService.class);
configExecutorMocked = Mockito.mockStatic(ConfigExecutor.class);
connectionControlManagerMockedStatic = Mockito.mockStatic(ControlManagerCenter.class);
connectionControlManagerMockedStatic.when(() -> ControlManagerCenter.getInstance())
.thenReturn(controlManagerCenter);
Mockito.when(controlManagerCenter.getConnectionControlManager()).thenReturn(connectionControlManager);
}
@After
public void after() {
configCacheServiceMockedStatic.close();
if (!configExecutorMocked.isClosed()) {
configExecutorMocked.close();
}
connectionControlManagerMockedStatic.close();
switchServiceMockedStatic.close();
}
@Test
public void testAddLongPollingClientHasNotEqualsMd5() throws IOException {
Map<String, String> clientMd5Map = new HashMap<>();
String group = "group";
String tenant = "tenat";
String dataIdEquals = "dataIdEquals0";
String dataIdNotEquals = "dataIdNotEquals0";
String groupKeyEquals = GroupKey.getKeyTenant(dataIdEquals, group, tenant);
String groupKeyNotEquals = GroupKey.getKeyTenant(dataIdNotEquals, group, tenant);
String md5Equals0 = MD5Utils.md5Hex("countEquals0", "UTF-8");
clientMd5Map.put(groupKeyEquals, md5Equals0);
String md5NotEquals1 = MD5Utils.md5Hex("countNotEquals", "UTF-8");
clientMd5Map.put(groupKeyNotEquals, md5NotEquals1);
HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class);
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER)))
.thenReturn(null);
String clientIp = "192.168.0.1";
Mockito.when(httpServletRequest.getHeader(eq("X-Forwarded-For"))).thenReturn(clientIp);
configCacheServiceMockedStatic.when(
() -> ConfigCacheService.isUptodate(eq(groupKeyNotEquals), eq(md5NotEquals1), eq(clientIp), eq(null)))
.thenReturn(false);
configCacheServiceMockedStatic.when(
() -> ConfigCacheService.isUptodate(eq(groupKeyEquals), eq(md5Equals0), eq(clientIp), eq(null)))
.thenReturn(true);
HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class);
PrintWriter printWriter = Mockito.mock(PrintWriter.class);
Mockito.when(httpServletResponse.getWriter()).thenReturn(printWriter);
int propSize = 3;
longPollingService.addLongPollingClient(httpServletRequest, httpServletResponse, clientMd5Map, propSize);
String responseString = MD5Util.compareMd5ResultString(Arrays.asList(groupKeyNotEquals));
//expect print not equals group
Mockito.verify(printWriter, times(1)).println(eq(responseString));
Mockito.verify(httpServletResponse, times(1)).setStatus(eq(HttpServletResponse.SC_OK));
}
@Test
public void testRejectByConnectionLimit() throws Exception {
//mock connection no limit
ConnectionCheckResponse connectionCheckResponse = new ConnectionCheckResponse();
connectionCheckResponse.setSuccess(false);
Mockito.when(connectionControlManager.check(any())).thenReturn(connectionCheckResponse);
HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class);
PrintWriter printWriter = Mockito.mock(PrintWriter.class);
Mockito.when(httpServletResponse.getWriter()).thenReturn(printWriter);
HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class);
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER)))
.thenReturn(null);
String clientIp = "192.168.0.1";
Mockito.when(httpServletRequest.getHeader(eq("X-Forwarded-For"))).thenReturn(clientIp);
Mockito.when(httpServletRequest.startAsync()).thenReturn(Mockito.mock(AsyncContext.class));
int propSize = 3;
Map<String, String> clientMd5Map = new HashMap<>();
longPollingService.addLongPollingClient(httpServletRequest, httpServletResponse, clientMd5Map, propSize);
Thread.sleep(3000L);
//expect response not returned
Mockito.verify(httpServletResponse, times(1)).setStatus(eq(503));
}
@Test
public void testAddLongPollingClientAllEqualsMd5() throws IOException {
//mock connection no limit
ConnectionCheckResponse connectionCheckResponse = new ConnectionCheckResponse();
connectionCheckResponse.setSuccess(true);
Mockito.when(connectionControlManager.check(any())).thenReturn(connectionCheckResponse);
Map<String, String> clientMd5Map = new HashMap<>();
String group = "group";
String tenant = "tenat";
String dataIdEquals = "dataIdEquals01";
String dataIdNotEquals = "dataIdNotEquals01";
String groupKeyEquals = GroupKey.getKeyTenant(dataIdEquals, group, tenant);
String groupKeyNotEquals = GroupKey.getKeyTenant(dataIdNotEquals, group, tenant);
String md5Equals0 = MD5Utils.md5Hex("countEquals01", "UTF-8");
clientMd5Map.put(groupKeyEquals, md5Equals0);
String md5NotEquals1 = MD5Utils.md5Hex("countNotEquals1", "UTF-8");
clientMd5Map.put(groupKeyNotEquals, md5NotEquals1);
HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class);
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_HEADER))).thenReturn("5000");
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER)))
.thenReturn(null);
String clientIp = "192.168.0.1";
Mockito.when(httpServletRequest.getHeader(eq("X-Forwarded-For"))).thenReturn(clientIp);
Mockito.when(httpServletRequest.startAsync()).thenReturn(Mockito.mock(AsyncContext.class));
configCacheServiceMockedStatic.when(
() -> ConfigCacheService.isUptodate(eq(groupKeyNotEquals), eq(md5NotEquals1), eq(clientIp), eq(null)))
.thenReturn(true);
configCacheServiceMockedStatic.when(
() -> ConfigCacheService.isUptodate(eq(groupKeyEquals), eq(md5Equals0), eq(clientIp), eq(null)))
.thenReturn(true);
int propSize = 3;
HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class);
longPollingService.addLongPollingClient(httpServletRequest, httpServletResponse, clientMd5Map, propSize);
//expect response not returned
Mockito.verify(httpServletResponse, times(0)).setStatus(anyInt());
//expect to schedule a task
configExecutorMocked.verify(
() -> ConfigExecutor.executeLongPolling(any(LongPollingService.ClientLongPolling.class)), times(1));
}
@Test
public void testReceiveDataChangeEventAndNotify() throws Exception {
configExecutorMocked.close();
//mock connection no limit
ConnectionCheckResponse connectionCheckResponse = new ConnectionCheckResponse();
connectionCheckResponse.setSuccess(true);
Mockito.when(connectionControlManager.check(any())).thenReturn(connectionCheckResponse);
String dataIdChanged = "dataIdChanged";
String group = "group";
String tenant = "tenant";
String groupKeyChanged = GroupKey.getKeyTenant(dataIdChanged, group, tenant);
Map<String, String> clientMd5Map = new HashMap<>();
clientMd5Map.put(groupKeyChanged, "mockMd5");
HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class);
HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class);
PrintWriter printWriter = Mockito.mock(PrintWriter.class);
Mockito.when(httpServletResponse.getWriter()).thenReturn(printWriter);
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_HEADER))).thenReturn("5000");
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER)))
.thenReturn(null);
String clientIp = "192.168.0.1";
Mockito.when(httpServletRequest.getHeader(eq("X-Forwarded-For"))).thenReturn(clientIp);
AsyncContext asyncContext = Mockito.mock(AsyncContext.class);
Mockito.when(httpServletRequest.startAsync()).thenReturn(asyncContext);
Mockito.when(asyncContext.getRequest()).thenReturn(httpServletRequest);
Mockito.when(asyncContext.getResponse()).thenReturn(httpServletResponse);
configCacheServiceMockedStatic.when(
() -> ConfigCacheService.isUptodate(anyString(), anyString(), anyString(), eq(null))).thenReturn(true);
longPollingService.addLongPollingClient(httpServletRequest, httpServletResponse, clientMd5Map, 3);
//test getSubscribleInfo by groupKey
SampleResult subscribleInfo = longPollingService.getCollectSubscribleInfo(dataIdChanged, group, tenant);
Map<String, String> lisentersGroupkeyStatus = subscribleInfo.getLisentersGroupkeyStatus();
Assert.assertFalse(lisentersGroupkeyStatus.isEmpty());
Assert.assertEquals("mockMd5", lisentersGroupkeyStatus.get(clientIp));
SampleResult collectSubscribleInfoByIp = longPollingService.getCollectSubscribleInfoByIp(clientIp);
Map<String, String> lisentersGroupkeyStatus1 = collectSubscribleInfoByIp.getLisentersGroupkeyStatus();
Assert.assertFalse(lisentersGroupkeyStatus1.isEmpty());
Assert.assertEquals("mockMd5", lisentersGroupkeyStatus1.get(groupKeyChanged));
//test receive config change event
LocalDataChangeEvent localDataChangeEvent = new LocalDataChangeEvent(groupKeyChanged);
NotifyCenter.publishEvent(localDataChangeEvent);
Thread.sleep(1100L);
String responseString = MD5Util.compareMd5ResultString(Arrays.asList(groupKeyChanged));
//expect print not equals group
Mockito.verify(printWriter, times(1)).println(eq(responseString));
Mockito.verify(asyncContext, times(1)).complete();
}
@Test
public void testLongPollingTimeout() throws Exception {
configExecutorMocked.close();
String dataIdChanged = "dataIdChanged";
String group = "group";
String tenant = "tenant";
String groupKeyChanged = GroupKey.getKeyTenant(dataIdChanged, group, tenant);
//mock connection no limit
ConnectionCheckResponse connectionCheckResponse = new ConnectionCheckResponse();
connectionCheckResponse.setSuccess(true);
Mockito.when(connectionControlManager.check(any())).thenReturn(connectionCheckResponse);
Map<String, String> clientMd5Map = new HashMap<>();
clientMd5Map.put(groupKeyChanged, "md5");
switchServiceMockedStatic.when(() -> SwitchService.getSwitchInteger(eq("MIN_LONG_POOLING_TIMEOUT"), eq(10000)))
.thenReturn(1000);
HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class);
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_HEADER))).thenReturn("1000");
Mockito.when(httpServletRequest.getHeader(eq(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER)))
.thenReturn(null);
String clientIp = "192.168.0.1";
Mockito.when(httpServletRequest.getHeader(eq("X-Forwarded-For"))).thenReturn(clientIp);
AsyncContext asyncContext = Mockito.mock(AsyncContext.class);
Mockito.when(httpServletRequest.startAsync()).thenReturn(asyncContext);
Mockito.when(asyncContext.getRequest()).thenReturn(httpServletRequest);
configCacheServiceMockedStatic.when(
() -> ConfigCacheService.isUptodate(anyString(), anyString(), anyString(), eq(null))).thenReturn(true);
HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class);
longPollingService.addLongPollingClient(httpServletRequest, httpServletResponse, clientMd5Map, 3);
//wait time out condition arrived.
Thread.sleep(1200L);
//expect print not equals group
Mockito.verify(asyncContext, times(1)).complete();
}
}

View File

@ -35,6 +35,7 @@ import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.RowMapper;
@ -43,6 +44,8 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@ -167,6 +170,16 @@ public class GroupCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test"), eq(1))).thenReturn(1);
Assert.assertTrue(service.incrementUsageWithDefaultQuotaLimit(groupCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test"), eq(1))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.incrementUsageWithDefaultQuotaLimit(groupCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -178,6 +191,16 @@ public class GroupCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test2"))).thenReturn(1);
Assert.assertTrue(service.incrementUsageWithQuotaLimit(groupCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test2"))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.incrementUsageWithQuotaLimit(groupCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -190,6 +213,16 @@ public class GroupCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test3"))).thenReturn(1);
Assert.assertTrue(service.incrementUsage(groupCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test3"))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.incrementUsage(groupCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -201,6 +234,16 @@ public class GroupCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test4"))).thenReturn(1);
Assert.assertTrue(service.decrementUsage(groupCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test4"))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.decrementUsage(groupCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -238,6 +281,41 @@ public class GroupCapacityPersistServiceTest {
});
Assert.assertTrue(service.updateGroupCapacity(group, quota, maxSize, maxAggrCount, maxAggrSize));
//mock get connection fail
when(jdbcTemplate.update(anyString(), any(Object.class))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.updateGroupCapacity(group, quota, maxSize, maxAggrCount, maxAggrSize);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
public void testGroupCapacityRowMapper() throws SQLException {
GroupCapacityPersistService.GroupCapacityRowMapper groupCapacityRowMapper = new GroupCapacityPersistService.GroupCapacityRowMapper();
ResultSet rs = Mockito.mock(ResultSet.class);
int quota = 12345;
Mockito.when(rs.getInt(eq("quota"))).thenReturn(quota);
int usage = 1244;
Mockito.when(rs.getInt(eq("usage"))).thenReturn(usage);
int maxSize = 123;
Mockito.when(rs.getInt(eq("max_size"))).thenReturn(maxSize);
int maxAggrCount = 123;
Mockito.when(rs.getInt(eq("max_aggr_count"))).thenReturn(maxAggrCount);
int maxAggrSize = 123;
Mockito.when(rs.getInt(eq("max_aggr_size"))).thenReturn(maxAggrSize);
String group = "testG";
Mockito.when(rs.getString(eq("group_id"))).thenReturn(group);
GroupCapacity groupCapacity = groupCapacityRowMapper.mapRow(rs, 1);
Assert.assertEquals(quota, groupCapacity.getQuota().intValue());
Assert.assertEquals(usage, groupCapacity.getUsage().intValue());
Assert.assertEquals(maxSize, groupCapacity.getMaxSize().intValue());
Assert.assertEquals(maxAggrCount, groupCapacity.getMaxAggrCount().intValue());
Assert.assertEquals(maxAggrSize, groupCapacity.getMaxAggrSize().intValue());
Assert.assertEquals(group, groupCapacity.getGroup());
}
@Test
@ -269,7 +347,7 @@ public class GroupCapacityPersistServiceTest {
String group = "test3";
argList.add(group);
when(jdbcTemplate.update(anyString(), eq(3), eq(timestamp), eq(group))).thenReturn(1);
Assert.assertTrue(service.updateMaxSize(group, maxSize));
}
@ -285,6 +363,16 @@ public class GroupCapacityPersistServiceTest {
group = "test";
when(jdbcTemplate.update(anyString(), eq(group), eq(timestamp), eq(group))).thenReturn(1);
Assert.assertTrue(service.correctUsage(group, timestamp));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(group), eq(timestamp), eq(group))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.correctUsage(group, timestamp);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -303,6 +391,16 @@ public class GroupCapacityPersistServiceTest {
Assert.assertEquals(list.size(), ret.size());
Assert.assertEquals(groupCapacity.getGroup(), ret.get(0).getGroup());
//mock get connection fail
when(jdbcTemplate.query(anyString(), eq(new Object[] {lastId, pageSize}), any(RowMapper.class))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.getCapacityList4CorrectUsage(lastId, pageSize);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -310,5 +408,15 @@ public class GroupCapacityPersistServiceTest {
when(jdbcTemplate.update(any(PreparedStatementCreator.class))).thenReturn(1);
Assert.assertTrue(service.deleteGroupCapacity("test"));
//mock get connection fail
when(jdbcTemplate.update(any(PreparedStatementCreator.class))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.deleteGroupCapacity("test");
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
}

View File

@ -32,6 +32,7 @@ import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.RowMapper;
@ -40,6 +41,8 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@ -100,6 +103,16 @@ public class TenantCapacityPersistServiceTest {
TenantCapacity capacity = new TenantCapacity();
capacity.setTenant("test");
Assert.assertTrue(service.insertTenantCapacity(capacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq("test"), eq(null), eq(null), eq(null), eq(null), eq(null), eq(null),
eq("test"))).thenThrow(new CannotGetJdbcConnectionException("conn fail"));
try {
service.insertTenantCapacity(capacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -113,6 +126,16 @@ public class TenantCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test"), eq(1))).thenReturn(1);
Assert.assertTrue(service.incrementUsageWithDefaultQuotaLimit(tenantCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test"), eq(1))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.incrementUsageWithDefaultQuotaLimit(tenantCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -125,6 +148,16 @@ public class TenantCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test2"))).thenReturn(1);
Assert.assertTrue(service.incrementUsageWithQuotaLimit(tenantCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test2"))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.incrementUsageWithQuotaLimit(tenantCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -137,6 +170,16 @@ public class TenantCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test3"))).thenReturn(1);
Assert.assertTrue(service.incrementUsage(tenantCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test3"))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.incrementUsage(tenantCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -149,6 +192,16 @@ public class TenantCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test4"))).thenReturn(1);
Assert.assertTrue(service.decrementUsage(tenantCapacity));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(timestamp), eq("test4"))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.decrementUsage(tenantCapacity);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -207,6 +260,16 @@ public class TenantCapacityPersistServiceTest {
return 0;
});
Assert.assertTrue(service.updateQuota(tenant, quota));
//mock get connection fail
when(jdbcTemplate.update(anyString(), any(Object.class))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.updateQuota(tenant, quota);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -217,6 +280,16 @@ public class TenantCapacityPersistServiceTest {
when(jdbcTemplate.update(anyString(), eq(tenant), eq(timestamp), eq(tenant))).thenReturn(1);
Assert.assertTrue(service.correctUsage(tenant, timestamp));
//mock get connection fail
when(jdbcTemplate.update(anyString(), eq(tenant), eq(timestamp), eq(tenant))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.correctUsage(tenant, timestamp);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -235,6 +308,16 @@ public class TenantCapacityPersistServiceTest {
Assert.assertEquals(list.size(), ret.size());
Assert.assertEquals(tenantCapacity.getTenant(), ret.get(0).getTenant());
//mock get connection fail
when(jdbcTemplate.query(anyString(), eq(new Object[] {lastId, pageSize}), any(RowMapper.class))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.getCapacityList4CorrectUsage(lastId, pageSize);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
@ -242,5 +325,41 @@ public class TenantCapacityPersistServiceTest {
when(jdbcTemplate.update(any(PreparedStatementCreator.class))).thenReturn(1);
Assert.assertTrue(service.deleteTenantCapacity("test"));
//mock get connection fail
when(jdbcTemplate.update(any(PreparedStatementCreator.class))).thenThrow(
new CannotGetJdbcConnectionException("conn fail"));
try {
service.deleteTenantCapacity("test");
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertEquals("conn fail", e.getMessage());
}
}
@Test
public void testTenantCapacityRowMapper() throws SQLException {
TenantCapacityPersistService.TenantCapacityRowMapper groupCapacityRowMapper = new TenantCapacityPersistService.TenantCapacityRowMapper();
ResultSet rs = Mockito.mock(ResultSet.class);
int quota = 12345;
Mockito.when(rs.getInt(eq("quota"))).thenReturn(quota);
int usage = 1244;
Mockito.when(rs.getInt(eq("usage"))).thenReturn(usage);
int maxSize = 123;
Mockito.when(rs.getInt(eq("max_size"))).thenReturn(maxSize);
int maxAggrCount = 123;
Mockito.when(rs.getInt(eq("max_aggr_count"))).thenReturn(maxAggrCount);
int maxAggrSize = 123;
Mockito.when(rs.getInt(eq("max_aggr_size"))).thenReturn(maxAggrSize);
String tenant = "testTeat";
Mockito.when(rs.getString(eq("tenant_id"))).thenReturn(tenant);
TenantCapacity groupCapacity = groupCapacityRowMapper.mapRow(rs, 1);
Assert.assertEquals(quota, groupCapacity.getQuota().intValue());
Assert.assertEquals(usage, groupCapacity.getUsage().intValue());
Assert.assertEquals(maxSize, groupCapacity.getMaxSize().intValue());
Assert.assertEquals(maxAggrCount, groupCapacity.getMaxAggrCount().intValue());
Assert.assertEquals(maxAggrSize, groupCapacity.getMaxAggrSize().intValue());
Assert.assertEquals(tenant, groupCapacity.getTenant());
}
}

View File

@ -395,12 +395,12 @@ public class ExternalConfigInfoPersistServiceImplTest {
String srcUser = "srcUser";
//mock update config info cas
Mockito.when(
jdbcTemplate.update(anyString(), eq(content), eq(MD5Utils.md5Hex(content, Constants.PERSIST_ENCODE)),
eq(srcIp), eq(srcUser), any(Timestamp.class), eq(configInfoWrapperOld.getAppName()),
eq(configAdvanceInfo.get("desc")), eq(configAdvanceInfo.get("use")),
eq(configAdvanceInfo.get("effect")), eq(configAdvanceInfo.get("type")),
eq(configAdvanceInfo.get("schema")), eq(dataId), eq(group), eq(tenant), eq(casMd5)))
.thenReturn(1);
jdbcTemplate.update(anyString(), eq(content), eq(MD5Utils.md5Hex(content, Constants.PERSIST_ENCODE)),
eq(srcIp), eq(srcUser), any(Timestamp.class), eq(configInfoWrapperOld.getAppName()),
eq(configAdvanceInfo.get("desc")), eq(configAdvanceInfo.get("use")),
eq(configAdvanceInfo.get("effect")), eq(configAdvanceInfo.get("type")),
eq(configAdvanceInfo.get("schema")), eq(encryptedDataKey), eq(dataId), eq(group), eq(tenant),
eq(casMd5))).thenReturn(1);
//mock insert config tags.
Mockito.when(jdbcTemplate.update(
@ -422,7 +422,8 @@ public class ExternalConfigInfoPersistServiceImplTest {
eq(srcUser), any(Timestamp.class), eq(configInfoWrapperOld.getAppName()),
eq(configAdvanceInfo.get("desc")), eq(configAdvanceInfo.get("use")),
eq(configAdvanceInfo.get("effect")), eq(configAdvanceInfo.get("type")),
eq(configAdvanceInfo.get("schema")), eq(dataId), eq(group), eq(tenant), eq(casMd5));
eq(configAdvanceInfo.get("schema")), eq(encryptedDataKey), eq(dataId), eq(group), eq(tenant),
eq(casMd5));
//expect update config tags
Mockito.verify(jdbcTemplate, times(1)).update(eq(

View File

@ -39,7 +39,8 @@ import io.grpc.netty.shaded.io.netty.channel.Channel;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* grpc connection.
@ -69,43 +70,33 @@ public class GrpcConnection extends Connection {
*/
public void sendRequestNoAck(Request request) throws NacosException {
sendQueueBlockCheck();
final AtomicReference<NacosRuntimeException> exception = new AtomicReference<>();
final DefaultRequestFuture future = new DefaultRequestFuture(this.getMetaInfo().getConnectionId(), "0");
this.channel.eventLoop().execute(() -> {
Future<Boolean> executeFuture = this.channel.eventLoop().submit(() -> {
//StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
synchronized (streamObserver) {
try {
Payload payload = GrpcUtils.convert(request);
traceIfNecessary(payload);
streamObserver.onNext(payload);
future.setResponse(new Response() {
@Override
public String getMessage() {
return "";
}
});
return true;
} catch (Throwable e) {
if (e instanceof StatusRuntimeException) {
exception.set(new ConnectionAlreadyClosedException(e));
throw new ConnectionAlreadyClosedException(e);
} else if (e instanceof IllegalStateException) {
exception.set(new ConnectionAlreadyClosedException(e));
} else {
exception.set(new NacosRuntimeException(NacosException.SERVER_ERROR, e));
throw new ConnectionAlreadyClosedException(e);
}
future.setFailResult(exception.get());
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
});
try {
future.get();
} catch (Exception e) {
//ignore
executeFuture.get();
} catch (Throwable throwable) {
if (throwable instanceof ExecutionException && throwable.getCause() != null
&& throwable.getCause() instanceof NacosRuntimeException) {
throw (NacosRuntimeException) throwable.getCause();
}
throw new NacosRuntimeException(NacosException.SERVER_ERROR, throwable);
}
if (exception.get() != null) {
throw exception.get();
}
}
private void sendQueueBlockCheck() {
@ -144,8 +135,8 @@ public class GrpcConnection extends Connection {
Loggers.REMOTE_DIGEST.info("[{}]Send request to client ,payload={}", connectionId,
payload.toByteString().toStringUtf8());
} catch (Throwable throwable) {
Loggers.REMOTE_DIGEST
.warn("[{}]Send request to client trace error, ,error={}", connectionId, throwable);
Loggers.REMOTE_DIGEST.warn("[{}]Send request to client trace error, ,error={}", connectionId,
throwable);
}
}
}
@ -194,7 +185,7 @@ public class GrpcConnection extends Connection {
if (isTraced()) {
Loggers.REMOTE_DIGEST.warn("[{}] try to close connection ", connectionId);
}
try {
closeBiStream();
} catch (Throwable e) {

View File

@ -77,8 +77,10 @@ public class GrpcConnectionTest {
String ip = "1.1.1.1";
ConnectionMeta connectionMeta = new ConnectionMeta("connectId" + System.currentTimeMillis(), ip, ip, 8888, 9848,
"GRPC", "", "", new HashMap<>());
Mockito.when(channel.isOpen()).thenReturn(true);
Mockito.when(channel.isActive()).thenReturn(true);
connection = new GrpcConnection(connectionMeta, streamObserver, channel);
connection.setTraced(true);
}
@ -105,7 +107,7 @@ public class GrpcConnectionTest {
Mockito.doReturn(true).when(streamObserver).isReady();
try {
connection.request(new NotifySubscriberRequest(), 3000L);
connection.request(new NotifySubscriberRequest(), 1000L);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertTrue(e instanceof ConnectionAlreadyClosedException);
@ -132,7 +134,7 @@ public class GrpcConnectionTest {
public void testNormal() {
Mockito.doReturn(new DefaultEventLoop()).when(channel).eventLoop();
Mockito.doReturn(true).when(streamObserver).isReady();
Assert.assertTrue(connection.isConnected());
try {
new Thread(new Runnable() {
@Override
@ -200,4 +202,13 @@ public class GrpcConnectionTest {
Assert.assertTrue(connection.getMetaInfo().pushQueueBlockTimesLastOver(3000));
}
@Test
public void testClose() {
Mockito.doThrow(new IllegalStateException()).when(streamObserver).onCompleted();
connection.close();
Mockito.verify(channel, Mockito.times(1)).close();
}
}

View File

@ -36,6 +36,8 @@ public class FieldConstant {
public static final String APP_NAME = "app_name";
public static final String ENCRYPTED_DATA_KEY = "encrypted_data_key";
public static final String START_ROW = "startRow";
public static final String PAGE_SIZE = "pageSize";

View File

@ -499,13 +499,13 @@ public interface ConfigInfoMapper extends Mapper {
paramList.add(context.getUpdateParameter(FieldConstant.EFFECT));
paramList.add(context.getUpdateParameter(FieldConstant.TYPE));
paramList.add(context.getUpdateParameter(FieldConstant.C_SCHEMA));
paramList.add(context.getUpdateParameter(FieldConstant.ENCRYPTED_DATA_KEY));
paramList.add(context.getWhereParameter(FieldConstant.DATA_ID));
paramList.add(context.getWhereParameter(FieldConstant.GROUP_ID));
paramList.add(context.getWhereParameter(FieldConstant.TENANT_ID));
paramList.add(context.getWhereParameter(FieldConstant.MD5));
String sql = "UPDATE config_info SET "
+ "content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?, app_name=?,c_desc=?,c_use=?,effect=?,type=?,c_schema=? "
String sql = "UPDATE config_info SET " + "content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,"
+ " app_name=?,c_desc=?,c_use=?,effect=?,type=?,c_schema=?,encrypted_data_key=? "
+ "WHERE data_id=? AND group_id=? AND tenant_id=? AND (md5=? OR md5 IS NULL OR md5='')";
return new MapperResult(sql, paramList);
}

View File

@ -323,7 +323,7 @@ public class ConfigInfoMapperByDerbyTest {
Object effect = "effect";
Object type = "type";
Object schema = "schema";
String encrypedDataKey = "key5678";
context.putUpdateParameter(FieldConstant.CONTENT, newContent);
context.putUpdateParameter(FieldConstant.MD5, newMD5);
context.putUpdateParameter(FieldConstant.SRC_IP, srcIp);
@ -335,8 +335,8 @@ public class ConfigInfoMapperByDerbyTest {
context.putUpdateParameter(FieldConstant.EFFECT, effect);
context.putUpdateParameter(FieldConstant.TYPE, type);
context.putUpdateParameter(FieldConstant.C_SCHEMA, schema);
Object dataId = "dataId";
context.putUpdateParameter(FieldConstant.ENCRYPTED_DATA_KEY, encrypedDataKey);
Object dataId = "dataId00";
Object group = "group";
Object md5 = "md5";
@ -347,10 +347,11 @@ public class ConfigInfoMapperByDerbyTest {
MapperResult mapperResult = configInfoMapperByDerby.updateConfigInfoAtomicCas(context);
Assert.assertEquals(mapperResult.getSql(), "UPDATE config_info SET "
+ "content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?, app_name=?,c_desc=?,c_use=?,effect=?,type=?,c_schema=? "
+ "content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?, app_name=?,c_desc=?,c_use=?,"
+ "effect=?,type=?,c_schema=?,encrypted_data_key=? "
+ "WHERE data_id=? AND group_id=? AND tenant_id=? AND (md5=? OR md5 IS NULL OR md5='')");
Assert.assertArrayEquals(mapperResult.getParamList().toArray(),
new Object[] {newContent, newMD5, srcIp, srcUser, time, appNameTmp, desc, use, effect, type, schema,
dataId, group, tenantId, md5});
encrypedDataKey, dataId, group, tenantId, md5});
}
}

View File

@ -322,7 +322,7 @@ public class ConfigInfoMapperByMySqlTest {
Object effect = "effect";
Object type = "type";
Object schema = "schema";
String encryptedDataKey = "ey456789";
context.putUpdateParameter(FieldConstant.CONTENT, newContent);
context.putUpdateParameter(FieldConstant.MD5, newMD5);
context.putUpdateParameter(FieldConstant.SRC_IP, srcIp);
@ -334,7 +334,7 @@ public class ConfigInfoMapperByMySqlTest {
context.putUpdateParameter(FieldConstant.EFFECT, effect);
context.putUpdateParameter(FieldConstant.TYPE, type);
context.putUpdateParameter(FieldConstant.C_SCHEMA, schema);
context.putUpdateParameter(FieldConstant.ENCRYPTED_DATA_KEY, encryptedDataKey);
Object dataId = "dataId";
Object group = "group";
Object md5 = "md5";
@ -346,10 +346,11 @@ public class ConfigInfoMapperByMySqlTest {
MapperResult mapperResult = configInfoMapperByMySql.updateConfigInfoAtomicCas(context);
Assert.assertEquals(mapperResult.getSql(), "UPDATE config_info SET "
+ "content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?, app_name=?,c_desc=?,c_use=?,effect=?,type=?,c_schema=? "
+ "content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?, app_name=?,c_desc=?,"
+ "c_use=?,effect=?,type=?,c_schema=?,encrypted_data_key=? "
+ "WHERE data_id=? AND group_id=? AND tenant_id=? AND (md5=? OR md5 IS NULL OR md5='')");
Assert.assertArrayEquals(mapperResult.getParamList().toArray(),
new Object[] {newContent, newMD5, srcIp, srcUser, time, appNameTmp, desc, use, effect, type, schema,
dataId, group, tenantId, md5});
encryptedDataKey, dataId, group, tenantId, md5});
}
}

View File

@ -103,11 +103,11 @@ public class EnvUtil {
private static final String NACOS_TEMP_DIR_1 = "data";
private static final String NACOS_TEMP_DIR_2 = "tmp";
private static final String NACOS_CUSTOM_ENVIRONMENT_ENABLED = "nacos.custom.environment.enabled";
private static final String NACOS_CUSTOM_CONFIG_NAME = "customFirstNacosConfig";
@JustForTest
private static String confPath = "";
@ -115,7 +115,7 @@ public class EnvUtil {
private static String nacosHomePath = null;
private static ConfigurableEnvironment environment;
/**
* customEnvironment.
*/
@ -127,12 +127,13 @@ public class EnvUtil {
for (String key : propertyKeys) {
sourcePropertyMap.put(key, getProperty(key, Object.class));
}
Map<String, Object> targetMap = CustomEnvironmentPluginManager.getInstance().getCustomValues(sourcePropertyMap);
Map<String, Object> targetMap = CustomEnvironmentPluginManager.getInstance()
.getCustomValues(sourcePropertyMap);
MutablePropertySources propertySources = environment.getPropertySources();
propertySources.addFirst(new MapPropertySource(NACOS_CUSTOM_CONFIG_NAME, targetMap));
}
}
public static ConfigurableEnvironment getEnvironment() {
return environment;
}
@ -203,6 +204,10 @@ public class EnvUtil {
EnvUtil.localAddress = localAddress;
}
public static void systemExit() {
System.exit(0);
}
public static int getPort() {
if (port == -1) {
port = getProperty(SERVER_PORT_PROPERTY, Integer.class, DEFAULT_SERVER_PORT);
@ -302,8 +307,8 @@ public class EnvUtil {
}
public static float getMem() {
return (float) (1 - OperatingSystemBeanManager.getFreePhysicalMem() / OperatingSystemBeanManager
.getTotalPhysicalMem());
return (float) (1
- OperatingSystemBeanManager.getFreePhysicalMem() / OperatingSystemBeanManager.getTotalPhysicalMem());
}
public static String getConfPath() {