fix config test case and remove useless code (#11521)

* 修复客户端cmsgc导致服务端推送积压直接内存oom问题

* 修复ut及bug

* 修复ut及bug,checkstyle

* 修复ut及bug

* 修复ut

* 修复覆盖率

* 修复测试用例

* 修复测试用例,覆盖率

* 修复测试用例,覆盖率,删除无用代码

* 修复测试用例,覆盖率,pmd&checkstyle

* fix test case
This commit is contained in:
nov.lzf 2023-12-18 13:42:28 +08:00 committed by GitHub
parent 790bb197cf
commit ca9d55e264
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1282 additions and 7261 deletions

View File

@ -1,121 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.model.app;
import com.alibaba.nacos.sys.utils.InetUtils;
/**
* ApplicationInfo.
*
* @author Nacos
*/
public class ApplicationInfo {
private static final long LOCK_EXPIRE_DURATION = 30 * 1000L;
private static final long RECENTLY_DURATION = 24 * 60 * 60 * 1000L;
private String appName;
private boolean isDynamicCollectDisabled = false;
private long lastSubscribeInfoCollectedTime = 0L;
private String subInfoCollectLockOwner = null;
private long subInfoCollectLockExpireTime = 0L;
public ApplicationInfo(String appName) {
this.appName = appName;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public boolean isDynamicCollectDisabled() {
return isDynamicCollectDisabled;
}
public void setDynamicCollectDisabled(boolean isDynamicCollectDisabled) {
this.isDynamicCollectDisabled = isDynamicCollectDisabled;
}
public long getLastSubscribeInfoCollectedTime() {
return lastSubscribeInfoCollectedTime;
}
public void setLastSubscribeInfoCollectedTime(long lastSubscribeInfoCollectedTime) {
this.lastSubscribeInfoCollectedTime = lastSubscribeInfoCollectedTime;
}
public String getSubInfoCollectLockOwner() {
return subInfoCollectLockOwner;
}
public void setSubInfoCollectLockOwner(String subInfoCollectLockOwner) {
this.subInfoCollectLockOwner = subInfoCollectLockOwner;
}
public long getSubInfoCollectLockExpireTime() {
return subInfoCollectLockExpireTime;
}
public void setSubInfoCollectLockExpireTime(long subInfoCollectLockExpireTime) {
this.subInfoCollectLockExpireTime = subInfoCollectLockExpireTime;
}
/**
* Judge whether sub info collected recently or not.
*
* @return The result whether sub info collected recently or not.
*/
public boolean isSubInfoRecentlyCollected() {
if (System.currentTimeMillis() - this.lastSubscribeInfoCollectedTime < RECENTLY_DURATION) {
return true;
}
return false;
}
/**
* Judge whether current server own the lock or not.
*
* @return The result whether current server own the lock or not.
*/
public boolean canCurrentServerOwnTheLock() {
boolean currentOwnerIsMe =
subInfoCollectLockOwner == null || InetUtils.getSelfIP().equals(subInfoCollectLockOwner);
if (currentOwnerIsMe) {
return true;
}
if (System.currentTimeMillis() - this.subInfoCollectLockExpireTime > LOCK_EXPIRE_DURATION) {
return true;
}
return false;
}
public String currentServer() {
return InetUtils.getSelfIP();
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.model.app;
/**
* ApplicationPublishRecord.
*
* @author Nacos
*/
public class ApplicationPublishRecord {
private String appName;
private GroupKey configInfo;
public ApplicationPublishRecord(String appName, String dataId, String groupId) {
this.appName = appName;
this.configInfo = new GroupKey(dataId, groupId);
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public GroupKey getConfigInfo() {
return configInfo;
}
public void setConfigInfo(GroupKey configInfo) {
this.configInfo = configInfo;
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.model.app;
import com.alibaba.nacos.config.server.utils.GroupKey2;
/**
* GroupKey.
*
* @author Nacos
*/
public class GroupKey extends GroupKey2 {
private String dataId;
private String group;
public GroupKey(String dataId, String group) {
this.dataId = dataId;
this.group = group;
}
public GroupKey(String groupKeyString) {
String[] groupKeys = parseKey(groupKeyString);
this.dataId = groupKeys[0];
this.group = groupKeys[1];
}
public String getDataId() {
return dataId;
}
public void setDataId(String dataId) {
this.dataId = dataId;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
@Override
public String toString() {
return dataId + "+" + group;
}
public String getGroupkeyString() {
return getKey(dataId, group);
}
//TODO : equal as we use Set
}

View File

@ -1,163 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.model.app;
/**
* Created by qingliang on 2017/7/20.
*
* @author Nacos
*/
public class MonitorInfo {
/**
* Total memory can use.
*/
private long totalMemory;
/**
* Free memory.
*/
private long freeMemory;
/**
* Max memory can use.
*/
private volatile long maxMemory;
/**
* Cpu ratio.
*/
private double cpuRatio;
/**
* System load.
*/
private double load;
/**
* Young gc time counter.
*/
private int ygc;
/**
* Young gc time.
*/
private double ygct;
/**
* Full gc time counter.
*/
private int fgc;
/**
* Full gc time.
*/
private double fgct;
/**
* Gc time.
*/
private double gct;
public long getFreeMemory() {
return freeMemory;
}
public void setFreeMemory(long freeMemory) {
this.freeMemory = freeMemory;
}
public long getMaxMemory() {
return maxMemory;
}
public void setMaxMemory(long maxMemory) {
this.maxMemory = maxMemory;
}
public long getTotalMemory() {
return totalMemory;
}
public void setTotalMemory(long totalMemory) {
this.totalMemory = totalMemory;
}
public double getCpuRatio() {
return cpuRatio;
}
public void setCpuRatio(int cpuRatio) {
this.cpuRatio = cpuRatio;
}
public double getLoad() {
return load;
}
public void setLoad(int load) {
this.load = load;
}
public int getYgc() {
return ygc;
}
public void setYgc(int ygc) {
this.ygc = ygc;
}
public double getYgct() {
return ygct;
}
public void setYgct(int ygct) {
this.ygct = ygct;
}
public int getFgc() {
return fgc;
}
public void setFgc(int fgc) {
this.fgc = fgc;
}
public double getFgct() {
return fgct;
}
public void setFgct(int fgct) {
this.fgct = fgct;
}
public double getGct() {
return gct;
}
public void setGct(int gct) {
this.gct = gct;
}
@Override
public String toString() {
return "MonitorInfo{" + "totalMemory=" + totalMemory + ", freeMemory=" + freeMemory + ", maxMemory=" + maxMemory
+ ", cpuRatio=" + cpuRatio + ", load=" + load + ", ygc=" + ygc + ", ygct=" + ygct + ", fgc=" + fgc
+ ", fgct=" + fgct + ", gct=" + gct + '}';
}
}

View File

@ -20,7 +20,7 @@ import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSy
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.service.dump.DumpRequest;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.core.paramcheck.ExtractorManager;
import com.alibaba.nacos.core.paramcheck.impl.ConfigRequestParamExtractor;
@ -50,20 +50,13 @@ public class ConfigChangeClusterSyncRequestHandler
public ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest configChangeSyncRequest,
RequestMeta meta) throws NacosException {
if (configChangeSyncRequest.isBeta()) {
dumpService.dumpBeta(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
} else if (configChangeSyncRequest.isBatch()) {
dumpService.dumpBatch(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
} else if (StringUtils.isNotBlank(configChangeSyncRequest.getTag())) {
dumpService.dumpTag(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getTag(),
configChangeSyncRequest.getLastModified(), meta.getClientIp());
} else {
dumpService.dumpFormal(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
}
DumpRequest dumpRequest = DumpRequest.create(configChangeSyncRequest.getDataId(),
configChangeSyncRequest.getGroup(), configChangeSyncRequest.getTenant(),
configChangeSyncRequest.getLastModified(), meta.getClientIp());
dumpRequest.setBeta(configChangeSyncRequest.isBeta());
dumpRequest.setBatch(configChangeSyncRequest.isBatch());
dumpRequest.setTag(configChangeSyncRequest.getTag());
dumpService.dump(dumpRequest);
return new ConfigChangeClusterSyncResponse();
}

View File

@ -34,7 +34,6 @@ import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistServi
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ParamUtils;
import com.alibaba.nacos.config.server.utils.TimeUtils;
import com.alibaba.nacos.core.control.TpsControl;
import com.alibaba.nacos.core.paramcheck.ExtractorManager;
import com.alibaba.nacos.core.paramcheck.impl.ConfigRequestParamExtractor;
@ -44,7 +43,6 @@ import com.alibaba.nacos.plugin.auth.constant.ActionTypes;
import com.alibaba.nacos.plugin.auth.constant.SignType;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
@ -109,7 +107,6 @@ public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishReq
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setMd5(request.getCasMd5());
configInfo.setType(type);

View File

@ -28,7 +28,6 @@ import com.alibaba.nacos.config.server.model.ConfigInfoBase;
import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskServiceFactory;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.utils.DiskUtil;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.persistence.configuration.DatasourceConfiguration;
@ -36,10 +35,8 @@ import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.nacos.config.server.constant.Constants.ENCODE_UTF8;
@ -68,14 +65,14 @@ public class ConfigCacheService {
private static final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap<>();
private static ConfigInfoPersistService configInfoPersistService;
public static ConfigInfoPersistService getConfigInfoPersistService() {
if (configInfoPersistService == null) {
configInfoPersistService = ApplicationUtils.getBean(ConfigInfoPersistService.class);
}
return configInfoPersistService;
}
public static int groupCount() {
return CACHE.size();
}
@ -430,7 +427,8 @@ public class ConfigCacheService {
aggreds = config.getContent();
}
} else {
aggreds = DiskUtil.getConfig(AggrWhitelist.AGGRIDS_METADATA, "DEFAULT_GROUP", StringUtils.EMPTY);
aggreds = ConfigDiskServiceFactory.getInstance()
.getContent(AggrWhitelist.AGGRIDS_METADATA, "DEFAULT_GROUP", StringUtils.EMPTY);
}
if (aggreds != null) {
AggrWhitelist.load(aggreds);
@ -448,8 +446,8 @@ public class ConfigCacheService {
clientIpWhitelist = config.getContent();
}
} else {
clientIpWhitelist = DiskUtil.getConfig(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA, "DEFAULT_GROUP",
StringUtils.EMPTY);
clientIpWhitelist = ConfigDiskServiceFactory.getInstance()
.getContent(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA, "DEFAULT_GROUP", StringUtils.EMPTY);
}
if (clientIpWhitelist != null) {
ClientIpWhiteList.load(clientIpWhitelist);
@ -461,14 +459,14 @@ public class ConfigCacheService {
String switchContent = null;
try {
if (DatasourceConfiguration.isEmbeddedStorage()) {
ConfigInfoBase config = getConfigInfoPersistService().findConfigInfoBase(SwitchService.SWITCH_META_DATA_ID,
"DEFAULT_GROUP");
ConfigInfoBase config = getConfigInfoPersistService().findConfigInfoBase(
SwitchService.SWITCH_META_DATA_ID, "DEFAULT_GROUP");
if (config != null) {
switchContent = config.getContent();
}
} else {
switchContent = DiskUtil.getConfig(SwitchService.SWITCH_META_DATA_ID, "DEFAULT_GROUP",
StringUtils.EMPTY);
switchContent = ConfigDiskServiceFactory.getInstance()
.getContent(SwitchService.SWITCH_META_DATA_ID, "DEFAULT_GROUP", StringUtils.EMPTY);
}
if (switchContent != null) {
SwitchService.load(switchContent);
@ -478,36 +476,6 @@ public class ConfigCacheService {
}
}
/**
* Check md5.
*
* @return return diff result list.
*/
public static List<String> checkMd5() {
List<String> diffList = new ArrayList<>();
long startTime = System.currentTimeMillis();
for (Entry<String/* groupKey */, CacheItem> entry : CACHE.entrySet()) {
String groupKey = entry.getKey();
String[] dg = GroupKey2.parseKey(groupKey);
String dataId = dg[0];
String group = dg[1];
String tenant = dg[2];
try {
String localMd5 = ConfigDiskServiceFactory.getInstance()
.getLocalConfigMd5(dataId, group, tenant, ENCODE_UTF8);
if (!entry.getValue().getConfigCache().getMd5(ENCODE_UTF8).equals(localMd5)) {
DEFAULT_LOG.warn("[md5-different] dataId:{},group:{}", dataId, group);
diffList.add(groupKey);
}
} catch (IOException e) {
DEFAULT_LOG.error("getLocalConfigMd5 fail,dataId:{},group:{}", dataId, group);
}
}
long endTime = System.currentTimeMillis();
DEFAULT_LOG.warn("checkMd5 cost:{}; diffCount:{}", endTime - startTime, diffList.size());
return diffList;
}
/**
* Delete config file, and delete cache.
*

View File

@ -16,17 +16,21 @@
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.service.notify.NotifyService;
import com.alibaba.nacos.config.server.service.notify.HttpClientManager;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -89,8 +93,8 @@ public class ConfigSubService {
try {
completionService.submit(new Job(ip.getAddress(), url, params));
} catch (Exception e) { // Send request failed.
LogUtil.DEFAULT_LOG
.warn("Get client info from {} with exception: {} during submit job", ip, e.getMessage());
LogUtil.DEFAULT_LOG.warn("Get client info from {} with exception: {} during submit job", ip,
e.getMessage());
}
}
// Get and merge result.
@ -175,7 +179,7 @@ public class ConfigSubService {
}
String urlAll = getUrl(ip, url) + "?" + paramUrl;
RestResult<String> result = NotifyService.invokeURL(urlAll, null, Constants.ENCODE);
RestResult<String> result = invokeUrl(urlAll, null, Constants.ENCODE);
// Http code 200
if (result.ok()) {
@ -192,6 +196,24 @@ public class ConfigSubService {
}
}
/**
* invoke url.
*
* @param url url.
* @param headers headers.
* @param encoding encoding.
* @return result.
* @throws Exception Exception.
*/
private static RestResult<String> invokeUrl(String url, List<String> headers, String encoding) throws Exception {
Header header = Header.newInstance();
header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, encoding);
if (CollectionUtils.isNotEmpty(headers)) {
header.addAll(headers);
}
return HttpClientManager.getNacosRestTemplate().get(url, header, Query.EMPTY, String.class);
}
public SampleResult getCollectSampleResult(String dataId, String group, String tenant, int sampleTime)
throws Exception {
List<SampleResult> resultList = new ArrayList<>();

View File

@ -0,0 +1,124 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.dump;
/**
* dump request.
* @author shiyiyue
*/
public class DumpRequest {
String dataId;
String group;
String tenant;
private boolean isBeta;
private boolean isBatch;
private String tag;
private long lastModifiedTs;
private String sourceIp;
public String getDataId() {
return dataId;
}
public void setDataId(String dataId) {
this.dataId = dataId;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTenant() {
return tenant;
}
public void setTenant(String tenant) {
this.tenant = tenant;
}
public boolean isBeta() {
return isBeta;
}
public void setBeta(boolean beta) {
isBeta = beta;
}
public boolean isBatch() {
return isBatch;
}
public void setBatch(boolean batch) {
isBatch = batch;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public long getLastModifiedTs() {
return lastModifiedTs;
}
public void setLastModifiedTs(long lastModifiedTs) {
this.lastModifiedTs = lastModifiedTs;
}
public String getSourceIp() {
return sourceIp;
}
public void setSourceIp(String sourceIp) {
this.sourceIp = sourceIp;
}
/**
* create dump request.
* @param dataId dataId.
* @param group group.
* @param tenant tenant.
* @param lastModifiedTs lastModifiedTs.
* @param sourceIp sourceIp.
* @return
*/
public static DumpRequest create(String dataId, String group, String tenant, long lastModifiedTs, String sourceIp) {
DumpRequest dumpRequest = new DumpRequest();
dumpRequest.dataId = dataId;
dumpRequest.group = group;
dumpRequest.tenant = tenant;
dumpRequest.lastModifiedTs = lastModifiedTs;
dumpRequest.sourceIp = sourceIp;
return dumpRequest;
}
}

View File

@ -17,6 +17,10 @@
package com.alibaba.nacos.config.server.service.dump;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.constant.Constants;
@ -24,7 +28,9 @@ import com.alibaba.nacos.config.server.manager.TaskManager;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoAggr;
import com.alibaba.nacos.config.server.model.ConfigInfoChanged;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskServiceFactory;
import com.alibaba.nacos.config.server.service.dump.processor.DumpAllBetaProcessor;
import com.alibaba.nacos.config.server.service.dump.processor.DumpAllProcessor;
import com.alibaba.nacos.config.server.service.dump.processor.DumpAllTagProcessor;
@ -127,14 +133,10 @@ public abstract class DumpService {
int total = 0;
private static final String TRUE_STR = "true";
private static final String BETA_TABLE_NAME = "config_info_beta";
private static final String TAG_TABLE_NAME = "config_info_tag";
Boolean isQuickStart = false;
private int retentionDays = 30;
/**
@ -156,10 +158,11 @@ public abstract class DumpService {
this.configInfoBetaPersistService = configInfoBetaPersistService;
this.configInfoTagPersistService = configInfoTagPersistService;
this.memberManager = memberManager;
this.processor = new DumpProcessor(this);
this.dumpAllProcessor = new DumpAllProcessor(this);
this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
this.processor = new DumpProcessor(this.configInfoPersistService, this.configInfoBetaPersistService,
this.configInfoTagPersistService);
this.dumpAllProcessor = new DumpAllProcessor(this.configInfoPersistService);
this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this.configInfoBetaPersistService);
this.dumpAllTagProcessor = new DumpAllTagProcessor(this.configInfoTagPersistService);
this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
this.dumpTaskMgr.setDefaultTaskProcessor(processor);
@ -171,6 +174,29 @@ public abstract class DumpService {
this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
DynamicDataSource.getInstance().getDataSource();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
DumpRequest dumpRequest = DumpRequest.create(evt.dataId, evt.group, evt.tenant, evt.lastModifiedTs,
NetUtils.localIP());
dumpRequest.setBeta(evt.isBeta);
dumpRequest.setBatch(evt.isBatch);
dumpRequest.setTag(evt.tag);
DumpService.this.dump(dumpRequest);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
public ConfigInfoPersistService getConfigInfoPersistService() {
@ -189,10 +215,6 @@ public abstract class DumpService {
return historyConfigInfoPersistService;
}
public ServerMemberManager getMemberManager() {
return memberManager;
}
/**
* initialize.
*
@ -234,13 +256,13 @@ public abstract class DumpService {
// update Beta cache
LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
DiskUtil.clearAllBeta();
ConfigDiskServiceFactory.getInstance().clearAllBeta();
if (namespacePersistService.isExistTable(BETA_TABLE_NAME)) {
dumpAllBetaProcessor.process(new DumpAllBetaTask());
}
// update Tag cache
LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
DiskUtil.clearAllTag();
ConfigDiskServiceFactory.getInstance().clearAllTag();
if (namespacePersistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(new DumpAllTagTask());
}
@ -302,7 +324,7 @@ public abstract class DumpService {
try {
LogUtil.DEFAULT_LOG.info("start clear all config-info.");
DiskUtil.clearAll();
ConfigDiskServiceFactory.getInstance().clearAll();
dumpAllProcessor.process(new DumpAllTask());
} catch (Exception e) {
LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
@ -338,6 +360,27 @@ public abstract class DumpService {
return retentionDays;
}
/**
* dump operation.
*
* @param dumpRequest dumpRequest.
*/
public void dump(DumpRequest dumpRequest) {
if (dumpRequest.isBeta()) {
dumpBeta(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(),
dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp());
} else if (dumpRequest.isBatch()) {
dumpBatch(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(),
dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp());
} else if (StringUtils.isNotBlank(dumpRequest.getTag())) {
dumpTag(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(), dumpRequest.getTag(),
dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp());
} else {
dumpFormal(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(),
dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp());
}
}
/**
* dump formal config.
*
@ -347,7 +390,7 @@ public abstract class DumpService {
* @param lastModified lastModified.
* @param handleIp handleIp.
*/
public void dumpFormal(String dataId, String group, String tenant, long lastModified, String handleIp) {
private void dumpFormal(String dataId, String group, String tenant, long lastModified, String handleIp) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = dataId + group + tenant;
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, false, false, false, null, lastModified, handleIp));
@ -364,7 +407,7 @@ public abstract class DumpService {
* @param lastModified lastModified.
* @param handleIp handleIp.
*/
public void dumpBeta(String dataId, String group, String tenant, long lastModified, String handleIp) {
private void dumpBeta(String dataId, String group, String tenant, long lastModified, String handleIp) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = dataId + group + tenant + "+beta";
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, true, false, false, null, lastModified, handleIp));
@ -381,7 +424,7 @@ public abstract class DumpService {
* @param lastModified lastModified.
* @param handleIp handleIp.
*/
public void dumpBatch(String dataId, String group, String tenant, long lastModified, String handleIp) {
private void dumpBatch(String dataId, String group, String tenant, long lastModified, String handleIp) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = groupKey + "+batch";
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, false, true, false, null, lastModified, handleIp));
@ -398,7 +441,7 @@ public abstract class DumpService {
* @param lastModified lastModified.
* @param handleIp handleIp.
*/
public void dumpTag(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
private void dumpTag(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = groupKey + "+tag+" + tag;
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, false, false, true, tag, lastModified, handleIp));
@ -456,7 +499,6 @@ public abstract class DumpService {
}
}
final Timestamp time = TimeUtils.getCurrentTime();
// merge
if (datumList.size() > 0) {
ConfigInfo cf = MergeTaskProcessor.merge(dataId, group, tenant, datumList);

View File

@ -19,12 +19,11 @@ package com.alibaba.nacos.config.server.service.dump.processor;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper;
import com.alibaba.nacos.persistence.model.Page;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.persistence.model.Page;
import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
@ -37,9 +36,8 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
*/
public class DumpAllBetaProcessor implements NacosTaskProcessor {
public DumpAllBetaProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.configInfoBetaPersistService = dumpService.getConfigInfoBetaPersistService();
public DumpAllBetaProcessor(ConfigInfoBetaPersistService configInfoBetaPersistService) {
this.configInfoBetaPersistService = configInfoBetaPersistService;
}
@Override
@ -49,12 +47,12 @@ public class DumpAllBetaProcessor implements NacosTaskProcessor {
int actualRowCount = 0;
for (int pageNo = 1; pageNo <= pageCount; pageNo++) {
Page<ConfigInfoBetaWrapper> page = configInfoBetaPersistService.findAllConfigInfoBetaForDumpAll(pageNo, PAGE_SIZE);
Page<ConfigInfoBetaWrapper> page = configInfoBetaPersistService.findAllConfigInfoBetaForDumpAll(pageNo,
PAGE_SIZE);
if (page != null) {
for (ConfigInfoBetaWrapper cf : page.getPageItems()) {
boolean result = ConfigCacheService
.dumpBeta(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getBetaIps(), cf.getEncryptedDataKey());
boolean result = ConfigCacheService.dumpBeta(cf.getDataId(), cf.getGroup(), cf.getTenant(),
cf.getContent(), cf.getLastModified(), cf.getBetaIps(), cf.getEncryptedDataKey());
LogUtil.DUMP_LOG.info("[dump-all-beta-ok] result={}, {}, {}, length={}, md5={}", result,
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(),
cf.getContent().length(), cf.getMd5());
@ -69,7 +67,5 @@ public class DumpAllBetaProcessor implements NacosTaskProcessor {
static final int PAGE_SIZE = 1000;
final DumpService dumpService;
final ConfigInfoBetaPersistService configInfoBetaPersistService;
}

View File

@ -25,7 +25,6 @@ import com.alibaba.nacos.config.server.service.AggrWhitelist;
import com.alibaba.nacos.config.server.service.ClientIpWhiteList;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.SwitchService;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
@ -41,9 +40,8 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
*/
public class DumpAllProcessor implements NacosTaskProcessor {
public DumpAllProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.configInfoPersistService = dumpService.getConfigInfoPersistService();
public DumpAllProcessor(ConfigInfoPersistService configInfoPersistService) {
this.configInfoPersistService = configInfoPersistService;
}
@Override
@ -86,7 +84,5 @@ public class DumpAllProcessor implements NacosTaskProcessor {
static final int PAGE_SIZE = 1000;
final DumpService dumpService;
final ConfigInfoPersistService configInfoPersistService;
}

View File

@ -21,7 +21,6 @@ import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper;
import com.alibaba.nacos.persistence.model.Page;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
@ -36,9 +35,8 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
*/
public class DumpAllTagProcessor implements NacosTaskProcessor {
public DumpAllTagProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.configInfoTagPersistService = dumpService.getConfigInfoTagPersistService();
public DumpAllTagProcessor(ConfigInfoTagPersistService configInfoTagPersistService) {
this.configInfoTagPersistService = configInfoTagPersistService;
}
@Override
@ -68,7 +66,5 @@ public class DumpAllTagProcessor implements NacosTaskProcessor {
static final int PAGE_SIZE = 1000;
final DumpService dumpService;
final ConfigInfoTagPersistService configInfoTagPersistService;
}

View File

@ -24,7 +24,6 @@ import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.model.event.ConfigDumpEvent;
import com.alibaba.nacos.config.server.service.dump.DumpConfigHandler;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.dump.task.DumpTask;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
@ -42,19 +41,18 @@ import java.util.Objects;
*/
public class DumpProcessor implements NacosTaskProcessor {
final DumpService dumpService;
final ConfigInfoPersistService configInfoPersistService;
final ConfigInfoBetaPersistService configInfoBetaPersistService;
final ConfigInfoTagPersistService configInfoTagPersistService;
public DumpProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.configInfoPersistService = dumpService.getConfigInfoPersistService();
this.configInfoBetaPersistService = dumpService.getConfigInfoBetaPersistService();
this.configInfoTagPersistService = dumpService.getConfigInfoTagPersistService();
public DumpProcessor(ConfigInfoPersistService configInfoPersistService,
ConfigInfoBetaPersistService configInfoBetaPersistService,
ConfigInfoTagPersistService configInfoTagPersistService) {
this.configInfoPersistService = configInfoPersistService;
this.configInfoBetaPersistService = configInfoBetaPersistService;
this.configInfoTagPersistService = configInfoTagPersistService;
}
@Override
@ -77,7 +75,7 @@ public class DumpProcessor implements NacosTaskProcessor {
type = "tag-" + tag;
}
LogUtil.DUMP_LOG.info("[dump] process {} task. groupKey={}", type, dumpTask.getGroupKey());
if (isBeta) {
// if publish beta, then dump config, update beta cache
ConfigInfoBetaWrapper cf = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
@ -89,7 +87,7 @@ public class DumpProcessor implements NacosTaskProcessor {
build.lastModifiedTs(Objects.isNull(cf) ? lastModifiedOut : cf.getLastModified());
return DumpConfigHandler.configDump(build.build());
}
if (StringUtils.isNotBlank(tag)) {
ConfigInfoTagWrapper cf = configInfoTagPersistService.findConfigInfo4Tag(dataId, group, tenant, tag);
build.remove(Objects.isNull(cf));
@ -99,7 +97,7 @@ public class DumpProcessor implements NacosTaskProcessor {
build.lastModifiedTs(Objects.isNull(cf) ? lastModifiedOut : cf.getLastModified());
return DumpConfigHandler.configDump(build.build());
}
ConfigInfoWrapper cf = configInfoPersistService.findConfigInfo(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
@ -107,6 +105,6 @@ public class DumpProcessor implements NacosTaskProcessor {
build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
build.lastModifiedTs(Objects.isNull(cf) ? lastModifiedOut : cf.getLastModified());
return DumpConfigHandler.configDump(build.build());
}
}

View File

@ -19,15 +19,14 @@ package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
@ -64,9 +63,6 @@ public class AsyncNotifyService {
private static final int MAX_COUNT = 6;
@Autowired
private DumpService dumpService;
@Autowired
private ConfigClusterRpcClientProxy configClusterRpcClientProxy;
@ -101,7 +97,7 @@ public class AsyncNotifyService {
String tag = evt.tag;
MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
Collection<Member> ipList = memberManager.allMembers();
Collection<Member> ipList = memberManager.allMembersWithoutSelf();
// In fact, any type of queue here can be
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
@ -152,24 +148,8 @@ public class AsyncNotifyService {
syncRequest.setBatch(task.isBatch);
syncRequest.setTenant(task.getTenant());
Member member = task.member;
if (memberManager.getSelf().equals(member)) {
if (syncRequest.isBeta()) {
dumpService.dumpBeta(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
} else if (syncRequest.isBatch()) {
dumpService.dumpBatch(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
} else if (StringUtils.isNotBlank(syncRequest.getTag())) {
dumpService.dumpTag(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
} else {
dumpService.dumpFormal(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
}
continue;
}
String event = getNotifyEvent(task);
String event = getNotifyEvent(task);
if (memberManager.hasMember(member.getAddress())) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
boolean unHealthNeedDelay = isUnHealthy(member.getAddress());
@ -200,8 +180,18 @@ public class AsyncNotifyService {
}
}
static class NotifySingleRpcTask extends NotifyTask {
static class NotifySingleRpcTask extends AbstractDelayTask {
private String dataId;
private String group;
private String tenant;
private long lastModified;
private int failCount;
private Member member;
private boolean isBeta;
@ -212,7 +202,7 @@ public class AsyncNotifyService {
public NotifySingleRpcTask(String dataId, String group, String tenant, String tag, long lastModified,
boolean isBeta, Member member) {
super(dataId, group, tenant, lastModified);
this(dataId, group, tenant, lastModified);
this.member = member;
this.isBeta = isBeta;
this.tag = tag;
@ -220,13 +210,21 @@ public class AsyncNotifyService {
public NotifySingleRpcTask(String dataId, String group, String tenant, String tag, long lastModified,
boolean isBeta, boolean isBatch, Member member) {
super(dataId, group, tenant, lastModified);
this(dataId, group, tenant, lastModified);
this.member = member;
this.isBeta = isBeta;
this.tag = tag;
this.isBatch = isBatch;
}
private NotifySingleRpcTask(String dataId, String group, String tenant, long lastModified) {
this.dataId = dataId;
this.group = group;
this.setTenant(tenant);
this.lastModified = lastModified;
setTaskInterval(3000L);
}
public boolean isBeta() {
return isBeta;
}
@ -250,6 +248,52 @@ public class AsyncNotifyService {
public void setBatch(boolean batch) {
isBatch = batch;
}
public String getDataId() {
return dataId;
}
public void setDataId(String dataId) {
this.dataId = dataId;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public int getFailCount() {
return failCount;
}
public void setFailCount(int failCount) {
this.failCount = failCount;
}
public long getLastModified() {
return lastModified;
}
public void setLastModified(long lastModified) {
this.lastModified = lastModified;
}
@Override
public void merge(AbstractDelayTask task) {
// Perform merge, but do nothing, tasks with the same dataId and group, later will replace the previous
}
public String getTenant() {
return tenant;
}
public void setTenant(String tenant) {
this.tenant = tenant;
}
}
private void asyncTaskExecute(NotifySingleRpcTask task) {
@ -343,7 +387,7 @@ public class AsyncNotifyService {
* @param task notify task
* @return delay
*/
private static int getDelayTime(NotifyTask task) {
private static int getDelayTime(NotifySingleRpcTask task) {
int failCount = task.getFailCount();
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {

View File

@ -1,74 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.config.server.manager.TaskManager;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/**
* Service to notify other nodes to get the latest data. Monitor data change events and notify all servers.
*
* @author jiuRen
*/
public class NotifyService {
@Autowired
public NotifyService(ServerMemberManager memberManager) {
notifyTaskManager = new TaskManager("com.alibaba.nacos.NotifyTaskManager");
notifyTaskManager.setDefaultTaskProcessor(new NotifyTaskProcessor(memberManager));
}
protected NotifyService() {
}
/**
* In order to facilitate the system beta, without changing the notify.do interface, the new lastModifed parameter
* is passed through the Http header.
*/
public static final String NOTIFY_HEADER_LAST_MODIFIED = "lastModified";
public static final String NOTIFY_HEADER_OP_HANDLE_IP = "opHandleIp";
private TaskManager notifyTaskManager;
/**
* Invoke http get request.
*
* @param url url
* @param headers headers
* @param encoding encoding
* @return {@link com.alibaba.nacos.common.model.RestResult}
* @throws Exception throw Exception
*/
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public static RestResult<String> invokeURL(String url, List<String> headers, String encoding) throws Exception {
Header header = Header.newInstance();
header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, encoding);
if (CollectionUtils.isNotEmpty(headers)) {
header.addAll(headers);
}
return HttpClientManager.getNacosRestTemplate().get(url, header, Query.EMPTY, String.class);
}
}

View File

@ -1,94 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.utils.LogUtil;
import org.slf4j.Logger;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Notify Single server.
*
* @author Nacos
*/
public class NotifySingleService {
private static final Logger LOGGER = LogUtil.FATAL_LOG;
static class NotifyTaskProcessorWrapper extends NotifyTaskProcessor {
public NotifyTaskProcessorWrapper() {
/*
* serverListManager is useless here
*/
super(null);
}
@Override
public boolean process(NacosTask task) {
NotifySingleTask notifyTask = (NotifySingleTask) task;
return notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(),
notifyTask.getLastModified(), notifyTask.target);
}
}
static class NotifySingleTask extends NotifyTask implements Runnable {
private static final NotifyTaskProcessorWrapper PROCESSOR = new NotifyTaskProcessorWrapper();
private final Executor executor;
private final String target;
private boolean isSuccess = false;
public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target,
Executor executor) {
super(dataId, group, tenant, lastModified);
this.target = target;
this.executor = executor;
}
@Override
public void run() {
try {
this.isSuccess = PROCESSOR.process(this);
} catch (Exception e) { // never goes here, but in case (never interrupts this notification thread)
this.isSuccess = false;
LogUtil.NOTIFY_LOG
.error("[notify-exception] target:{} dataid:{} group:{} ts:{}", target, getDataId(), getGroup(),
getLastModified());
LogUtil.NOTIFY_LOG.debug("[notify-exception] target:{} dataid:{} group:{} ts:{}",
target, getDataId(), getGroup(), getLastModified(), e);
}
if (!this.isSuccess) {
LogUtil.NOTIFY_LOG
.error("[notify-retry] target:{} dataid:{} group:{} ts:{}", target, getDataId(), getGroup(),
getLastModified());
try {
((ScheduledThreadPoolExecutor) executor).schedule(this, 500L, TimeUnit.MILLISECONDS);
} catch (Exception e) { // The notification failed, but at the same time, the node was offline
LOGGER.warn("[notify-thread-pool] cluster remove node {}, current thread was tear down.", target, e);
}
}
}
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.common.task.AbstractDelayTask;
/**
* Notify task.
*
* @author Nacos
*/
public class NotifyTask extends AbstractDelayTask {
private String dataId;
private String group;
private String tenant;
private long lastModified;
private int failCount;
public NotifyTask(String dataId, String group, String tenant, long lastModified) {
this.dataId = dataId;
this.group = group;
this.setTenant(tenant);
this.lastModified = lastModified;
setTaskInterval(3000L);
}
public String getDataId() {
return dataId;
}
public void setDataId(String dataId) {
this.dataId = dataId;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public int getFailCount() {
return failCount;
}
public void setFailCount(int failCount) {
this.failCount = failCount;
}
public long getLastModified() {
return lastModified;
}
public void setLastModified(long lastModified) {
this.lastModified = lastModified;
}
@Override
public void merge(AbstractDelayTask task) {
// Perform merge, but do nothing, tasks with the same dataId and group, later will replace the previous
}
public String getTenant() {
return tenant;
}
public void setTenant(String tenant) {
this.tenant = tenant;
}
}

View File

@ -1,110 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.InetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Notification service. After the database changes, notify all servers, including themselves, to load new data.
*
* @author Nacos
*/
public class NotifyTaskProcessor implements NacosTaskProcessor {
static final Logger LOGGER = LoggerFactory.getLogger(NotifyTaskProcessor.class);
static final String URL_PATTERN =
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";
final ServerMemberManager memberManager;
public NotifyTaskProcessor(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
@Override
public boolean process(NacosTask task) {
NotifyTask notifyTask = (NotifyTask) task;
String dataId = notifyTask.getDataId();
String group = notifyTask.getGroup();
String tenant = notifyTask.getTenant();
long lastModified = notifyTask.getLastModified();
boolean isok = true;
for (Member ip : memberManager.allMembers()) {
isok = notifyToDump(dataId, group, tenant, lastModified, ip.getAddress()) && isok;
}
return isok;
}
/**
* Notify other servers.
*/
boolean notifyToDump(String dataId, String group, String tenant, long lastModified, String serverIp) {
long delayed = System.currentTimeMillis() - lastModified;
try {
/*
In order to facilitate the system beta, without changing the notify.do interface,
the new lastModifed parameter is passed through the Http header
*/
List<String> headers = Arrays.asList(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(lastModified), NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
String urlString = MessageFormat.format(URL_PATTERN, serverIp, EnvUtil.getContextPath(), dataId, group);
RestResult<String> result = NotifyService.invokeURL(urlString, headers, Constants.ENCODE);
if (result.ok()) {
ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIP(),
ConfigTraceService.NOTIFY_EVENT, ConfigTraceService.NOTIFY_TYPE_OK, delayed, serverIp);
MetricsMonitor.getNotifyRtTimer().record(delayed, TimeUnit.MILLISECONDS);
return true;
} else {
MetricsMonitor.getConfigNotifyException().increment();
LOGGER.error("[notify-error] {}, {}, to {}, result {}", dataId, group, serverIp, result.getCode());
ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIP(),
ConfigTraceService.NOTIFY_EVENT, ConfigTraceService.NOTIFY_TYPE_ERROR, delayed, serverIp);
return false;
}
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
String message = "[notify-exception] " + dataId + ", " + group + ", to " + serverIp + ", " + e.toString();
LOGGER.error(message);
LOGGER.debug(message, e);
ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIP(),
ConfigTraceService.NOTIFY_EVENT, ConfigTraceService.NOTIFY_TYPE_EXCEPTION, delayed, serverIp);
return false;
}
}
}

View File

@ -73,7 +73,7 @@ public class EmbeddedConfigInfoAggrPersistServiceImpl implements ConfigInfoAggrP
/**
* The constructor sets the dependency injection order.
*
* @param databaseOperate {@link EmbeddedStoragePersistServiceImpl}
* @param databaseOperate databaseOperate.
*/
public EmbeddedConfigInfoAggrPersistServiceImpl(DatabaseOperate databaseOperate) {
this.databaseOperate = databaseOperate;

View File

@ -73,7 +73,7 @@ public class EmbeddedConfigInfoBetaPersistServiceImpl implements ConfigInfoBetaP
/**
* The constructor sets the dependency injection order.
*
* @param databaseOperate {@link EmbeddedStoragePersistServiceImpl}
* @param databaseOperate databaseOperate.
*/
public EmbeddedConfigInfoBetaPersistServiceImpl(DatabaseOperate databaseOperate) {
this.databaseOperate = databaseOperate;

View File

@ -139,7 +139,7 @@ public class EmbeddedConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
/**
* The constructor sets the dependency injection order.
*
* @param databaseOperate {@link EmbeddedStoragePersistServiceImpl}
* @param databaseOperate databaseOperate.
* @param idGeneratorManager {@link IdGeneratorManager}
*/
public EmbeddedConfigInfoPersistServiceImpl(DatabaseOperate databaseOperate, IdGeneratorManager idGeneratorManager,

View File

@ -73,7 +73,7 @@ public class EmbeddedConfigInfoTagPersistServiceImpl implements ConfigInfoTagPer
/**
* The constructor sets the dependency injection order.
*
* @param databaseOperate {@link EmbeddedStoragePersistServiceImpl}
* @param databaseOperate databaseOperate.
*/
public EmbeddedConfigInfoTagPersistServiceImpl(DatabaseOperate databaseOperate) {
this.databaseOperate = databaseOperate;

View File

@ -75,7 +75,7 @@ public class EmbeddedHistoryConfigInfoPersistServiceImpl implements HistoryConfi
/**
* The constructor sets the dependency injection order.
*
* @param databaseOperate {@link EmbeddedStoragePersistServiceImpl}
* @param databaseOperate databaseOperate.
*/
public EmbeddedHistoryConfigInfoPersistServiceImpl(DatabaseOperate databaseOperate) {
this.databaseOperate = databaseOperate;

View File

@ -33,6 +33,7 @@ import com.alibaba.nacos.config.server.model.ConfigOperateResult;
import com.alibaba.nacos.config.server.model.SameConfigPolicy;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.repository.HistoryConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.sql.ExternalStorageUtils;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.ParamUtils;
import com.alibaba.nacos.persistence.configuration.condition.ConditionOnExternalStorage;
@ -62,7 +63,6 @@ import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
@ -229,7 +229,7 @@ public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
}
}
}
@Override
public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
final ConfigInfo configInfo, Map<String, Object> configAdvanceInfo) {
@ -246,7 +246,7 @@ public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistSe
final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
KeyHolder keyHolder = new GeneratedKeyHolder();
KeyHolder keyHolder = ExternalStorageUtils.createKeyHolder();
ConfigInfoMapper configInfoMapper = mapperManager
.findMapper(dataSourceService.getDataSourceType(), TableConstant.CONFIG_INFO);

View File

@ -0,0 +1,31 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.sql;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
/**
* external storage utils.
* @author shiyiyue
*/
public class ExternalStorageUtils {
public static KeyHolder createKeyHolder() {
return new GeneratedKeyHolder();
}
}

View File

@ -16,17 +16,11 @@
package com.alibaba.nacos.config.server.utils;
import com.alibaba.nacos.common.pathencoder.PathEncoderManager;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.apache.commons.io.FileUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
@ -36,210 +30,16 @@ import java.io.IOException;
*/
public class DiskUtil {
static final String BASE_DIR = File.separator + "data" + File.separator + "config-data";
static final String TENANT_BASE_DIR = File.separator + "data" + File.separator + "tenant-config-data";
static final String BETA_DIR = File.separator + "data" + File.separator + "beta-data";
static final String TENANT_BETA_DIR = File.separator + "data" + File.separator + "tenant-beta-data";
static final String TAG_DIR = File.separator + "data" + File.separator + "tag-data";
static final String TENANT_TAG_DIR = File.separator + "data" + File.separator + "tag-beta-data";
public static void saveHeartBeatToDisk(String heartBeatTime) throws IOException {
FileUtils.writeStringToFile(heartBeatFile(), heartBeatTime, Constants.ENCODE);
}
/**
* Save configuration information to disk.
*/
public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException {
File targetFile = targetFile(dataId, group, tenant);
FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
}
/**
* Save beta information to disk.
*/
public static void saveBetaToDisk(String dataId, String group, String tenant, String content) throws IOException {
File targetFile = targetBetaFile(dataId, group, tenant);
FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
}
/**
* Save tag information to disk.
*/
public static void saveTagToDisk(String dataId, String group, String tenant, String tag, String content)
throws IOException {
File targetFile = targetTagFile(dataId, group, tenant, tag);
FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
}
/**
* Deletes configuration files on disk.
*/
public static void removeConfigInfo(String dataId, String group, String tenant) {
FileUtils.deleteQuietly(targetFile(dataId, group, tenant));
}
/**
* Deletes beta configuration files on disk.
*/
public static void removeConfigInfo4Beta(String dataId, String group, String tenant) {
FileUtils.deleteQuietly(targetBetaFile(dataId, group, tenant));
}
/**
* Deletes tag configuration files on disk.
*/
public static void removeConfigInfo4Tag(String dataId, String group, String tenant, String tag) {
FileUtils.deleteQuietly(targetTagFile(dataId, group, tenant, tag));
}
public static void removeHeartHeat() {
FileUtils.deleteQuietly(heartBeatFile());
}
/**
* Returns the path of the server cache file.
*/
public static File targetFile(String dataId, String group, String tenant) {
// fix https://github.com/alibaba/nacos/issues/10067
dataId = PathEncoderManager.getInstance().encode(dataId);
group = PathEncoderManager.getInstance().encode(group);
tenant = PathEncoderManager.getInstance().encode(tenant);
File file;
if (StringUtils.isBlank(tenant)) {
file = new File(EnvUtil.getNacosHome(), BASE_DIR);
} else {
file = new File(EnvUtil.getNacosHome(), TENANT_BASE_DIR);
file = new File(file, tenant);
}
file = new File(file, group);
file = new File(file, dataId);
return file;
}
/**
* Returns the path of cache file in server.
*/
public static File targetBetaFile(String dataId, String group, String tenant) {
// fix https://github.com/alibaba/nacos/issues/10067
dataId = PathEncoderManager.getInstance().encode(dataId);
group = PathEncoderManager.getInstance().encode(group);
tenant = PathEncoderManager.getInstance().encode(tenant);
File file;
if (StringUtils.isBlank(tenant)) {
file = new File(EnvUtil.getNacosHome(), BETA_DIR);
} else {
file = new File(EnvUtil.getNacosHome(), TENANT_BETA_DIR);
file = new File(file, tenant);
}
file = new File(file, group);
file = new File(file, dataId);
return file;
}
/**
* Returns the path of the tag cache file in server.
*/
public static File targetTagFile(String dataId, String group, String tenant, String tag) {
File file;
// fix https://github.com/alibaba/nacos/issues/10067
dataId = PathEncoderManager.getInstance().encode(dataId);
group = PathEncoderManager.getInstance().encode(group);
tenant = PathEncoderManager.getInstance().encode(tenant);
if (StringUtils.isBlank(tenant)) {
file = new File(EnvUtil.getNacosHome(), TAG_DIR);
} else {
file = new File(EnvUtil.getNacosHome(), TENANT_TAG_DIR);
file = new File(file, tenant);
}
file = new File(file, group);
file = new File(file, dataId);
file = new File(file, tag);
return file;
}
public static String getConfig(String dataId, String group, String tenant) throws IOException {
File file = targetFile(dataId, group, tenant);
if (file.exists()) {
try (FileInputStream fis = new FileInputStream(file)) {
return IoUtils.toString(fis, Constants.ENCODE);
} catch (FileNotFoundException e) {
return StringUtils.EMPTY;
}
} else {
return StringUtils.EMPTY;
}
}
public static String getLocalConfigMd5(String dataId, String group, String tenant) throws IOException {
return MD5Utils.md5Hex(getConfig(dataId, group, tenant), Constants.ENCODE);
}
public static File heartBeatFile() {
return new File(EnvUtil.getNacosHome(), "status" + File.separator + "heartBeat.txt");
}
public static String relativePath(String dataId, String group) {
return BASE_DIR + "/" + dataId + "/" + group;
}
/**
* Clear all config file.
*/
public static void clearAll() {
File file = new File(EnvUtil.getNacosHome(), BASE_DIR);
if (FileUtils.deleteQuietly(file)) {
LogUtil.DEFAULT_LOG.info("clear all config-info success.");
} else {
LogUtil.DEFAULT_LOG.warn("clear all config-info failed.");
}
File fileTenant = new File(EnvUtil.getNacosHome(), TENANT_BASE_DIR);
if (FileUtils.deleteQuietly(fileTenant)) {
LogUtil.DEFAULT_LOG.info("clear all config-info-tenant success.");
} else {
LogUtil.DEFAULT_LOG.warn("clear all config-info-tenant failed.");
}
}
/**
* Clear all beta config file.
*/
public static void clearAllBeta() {
File file = new File(EnvUtil.getNacosHome(), BETA_DIR);
if (FileUtils.deleteQuietly(file)) {
LogUtil.DEFAULT_LOG.info("clear all config-info-beta success.");
} else {
LogUtil.DEFAULT_LOG.warn("clear all config-info-beta failed.");
}
File fileTenant = new File(EnvUtil.getNacosHome(), TENANT_BETA_DIR);
if (FileUtils.deleteQuietly(fileTenant)) {
LogUtil.DEFAULT_LOG.info("clear all config-info-beta-tenant success.");
} else {
LogUtil.DEFAULT_LOG.warn("clear all config-info-beta-tenant failed.");
}
}
/**
* Clear all tag config file.
*/
public static void clearAllTag() {
File file = new File(EnvUtil.getNacosHome(), TAG_DIR);
if (FileUtils.deleteQuietly(file)) {
LogUtil.DEFAULT_LOG.info("clear all config-info-tag success.");
} else {
LogUtil.DEFAULT_LOG.warn("clear all config-info-tag failed.");
}
File fileTenant = new File(EnvUtil.getNacosHome(), TENANT_TAG_DIR);
if (FileUtils.deleteQuietly(fileTenant)) {
LogUtil.DEFAULT_LOG.info("clear all config-info-tag-tenant success.");
} else {
LogUtil.DEFAULT_LOG.warn("clear all config-info-tag-tenant failed.");
}
}
}

View File

@ -28,7 +28,6 @@ import com.alibaba.nacos.config.server.service.dump.disk.ConfigRocksDbDiskServic
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.config.server.utils.DiskUtil;
import com.alibaba.nacos.config.server.utils.MD5Util;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.sys.env.EnvUtil;
@ -91,8 +90,6 @@ public class ConfigServletInnerTest {
MockedStatic<ConfigCacheService> configCacheServiceMockedStatic;
MockedStatic<DiskUtil> diskUtilMockedStatic;
MockedStatic<PropertyUtil> propertyUtilMockedStatic;
MockedStatic<MD5Util> md5UtilMockedStatic;
@ -102,7 +99,6 @@ public class ConfigServletInnerTest {
EnvUtil.setEnvironment(new StandardEnvironment());
ReflectionTestUtils.setField(configServletInner, "longPollingService", longPollingService);
configCacheServiceMockedStatic = Mockito.mockStatic(ConfigCacheService.class);
diskUtilMockedStatic = Mockito.mockStatic(DiskUtil.class);
propertyUtilMockedStatic = Mockito.mockStatic(PropertyUtil.class);
propertyUtilMockedStatic.when(PropertyUtil::getMaxContent).thenReturn(1024 * 1000);
md5UtilMockedStatic = Mockito.mockStatic(MD5Util.class);
@ -113,9 +109,7 @@ public class ConfigServletInnerTest {
@After
public void after() {
if (diskUtilMockedStatic != null) {
diskUtilMockedStatic.close();
}
if (configCacheServiceMockedStatic != null) {
configCacheServiceMockedStatic.close();
}
@ -192,8 +186,6 @@ public class ConfigServletInnerTest {
// if direct read is false
propertyUtilMockedStatic.when(PropertyUtil::isDirectRead).thenReturn(false);
File file = tempFolder.newFile("test.txt");
diskUtilMockedStatic.when(() -> DiskUtil.targetBetaFile(anyString(), anyString(), anyString()))
.thenReturn(file);
when(configRocksDbDiskService.getBetaContent("test", "test", "test")).thenReturn(
"isBeta:true, direct read: false");
response = new MockHttpServletResponse();

View File

@ -19,25 +19,39 @@ package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoStateWrapper;
import com.alibaba.nacos.config.server.model.ConfigOperateResult;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.service.AggrWhitelist;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.persistence.configuration.DatasourceConfiguration;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.core.env.StandardEnvironment;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@ -46,32 +60,515 @@ public class ConfigPublishRequestHandlerTest {
private ConfigPublishRequestHandler configPublishRequestHandler;
@Mock
private ConfigInfoPersistService configInfoPersistService;
ConfigInfoPersistService configInfoPersistService;
@Mock
private ConfigInfoTagPersistService configInfoTagPersistService;
ConfigInfoBetaPersistService configInfoBetaPersistService;
@Mock
private ConfigInfoBetaPersistService configInfoBetaPersistService;
ConfigInfoTagPersistService configInfoTagPersistService;
MockedStatic<AggrWhitelist> aggrWhitelistMockedStatic;
MockedStatic<EnvUtil> envUtilMockedStatic;
@Before
public void setUp() {
aggrWhitelistMockedStatic = Mockito.mockStatic(AggrWhitelist.class);
envUtilMockedStatic = Mockito.mockStatic(EnvUtil.class);
configPublishRequestHandler = new ConfigPublishRequestHandler(configInfoPersistService,
configInfoTagPersistService, configInfoBetaPersistService);
EnvUtil.setEnvironment(new StandardEnvironment());
DatasourceConfiguration.setEmbeddedStorage(false);
}
@After
public void after() {
aggrWhitelistMockedStatic.close();
envUtilMockedStatic.close();
}
/**
* publish a not-exist config. expect : 1.response return true 2. publish ConfigDataChangeEvent
*
* @throws Exception exception.
*/
@Test
public void testNormalPublishConfigNotCas() throws Exception {
String dataId = "testNormalPublishConfigNotCas";
String group = "group";
String tenant = "tenant";
String content = "content";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setTenant(tenant);
configPublishRequest.setContent(content);
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
configPublishRequest.setAdditionMap(keyMap);
RequestMeta requestMeta = new RequestMeta();
String clientIp = "127.0.0.1";
requestMeta.setClientIp(clientIp);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoPersistService.insertOrUpdate(eq(requestMeta.getClientIp()), eq(srcUser), any(ConfigInfo.class),
any(Map.class))).thenReturn(configOperateResult);
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.SUCCESS.getCode(), response.getResultCode());
Thread.sleep(500L);
Assert.assertTrue(reference.get() != null);
Assert.assertEquals(dataId, reference.get().dataId);
Assert.assertEquals(group, reference.get().group);
Assert.assertEquals(tenant, reference.get().tenant);
Assert.assertEquals(timestamp, reference.get().lastModifiedTs);
Assert.assertFalse(reference.get().isBatch);
Assert.assertFalse(reference.get().isBeta);
}
/**
* publish a exist config.
*
* @throws Exception exception.
*/
@Test
public void testNormalPublishConfigCas() throws Exception {
String dataId = "testNormalPublishConfigCas";
String group = "group";
String tenant = "tenant";
String content = "content";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setTenant(tenant);
configPublishRequest.setContent(content);
configPublishRequest.setCasMd5("12314532");
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
configPublishRequest.setAdditionMap(keyMap);
RequestMeta requestMeta = new RequestMeta();
String clientIp = "127.0.0.1";
requestMeta.setClientIp(clientIp);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoPersistService.insertOrUpdateCas(eq(requestMeta.getClientIp()), eq(srcUser),
any(ConfigInfo.class), any(Map.class))).thenReturn(configOperateResult);
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.SUCCESS.getCode(), response.getResultCode());
Thread.sleep(500L);
Assert.assertTrue(reference.get() != null);
Assert.assertEquals(dataId, reference.get().dataId);
Assert.assertEquals(group, reference.get().group);
Assert.assertEquals(tenant, reference.get().tenant);
Assert.assertEquals(timestamp, reference.get().lastModifiedTs);
Assert.assertFalse(reference.get().isBatch);
Assert.assertFalse(reference.get().isBeta);
}
/**
* publish a exist config.
*
* @throws Exception exception.
*/
@Test
public void testNormalPublishConfigCasError() throws Exception {
String dataId = "testNormalPublishConfigCasError";
String group = "group";
String tenant = "tenant";
String content = "content";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setTenant(tenant);
configPublishRequest.setContent(content);
configPublishRequest.setCasMd5("12314532");
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
configPublishRequest.setAdditionMap(keyMap);
RequestMeta requestMeta = new RequestMeta();
String clientIp = "127.0.0.1";
requestMeta.setClientIp(clientIp);
ConfigInfoStateWrapper configInfoStateWrapper = new ConfigInfoStateWrapper();
configInfoStateWrapper.setId(12345678);
long timeStamp = System.currentTimeMillis();
configInfoStateWrapper.setLastModified(timeStamp);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoPersistService.insertOrUpdateCas(eq(requestMeta.getClientIp()), eq(srcUser),
any(ConfigInfo.class), any(Map.class))).thenThrow(new NacosRuntimeException(502, "mock error"));
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.FAIL.getCode(), response.getResultCode());
Assert.assertTrue(response.getMessage().contains("mock error"));
Thread.sleep(500L);
Assert.assertTrue(reference.get() == null);
}
@Test
public void testHandle() throws NacosException {
public void testPublishAggrCheckFail() throws NacosException, InterruptedException {
RequestMeta requestMeta = new RequestMeta();
String clientIp = "127.0.0.1";
requestMeta.setClientIp(clientIp);
String dataId = "testPublishAggrCheckFail";
String group = "group";
String tenant = "tenant";
String content = "content";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId("dataId");
configPublishRequest.setGroup("group");
configPublishRequest.setContent("content");
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setTenant(tenant);
configPublishRequest.setContent(content);
when(AggrWhitelist.isAggrDataId(eq(dataId))).thenReturn(Boolean.TRUE);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.FAIL.getCode(), response.getResultCode());
Assert.assertTrue(response.getMessage().contains("is aggr"));
Thread.sleep(500L);
Assert.assertTrue(reference.get() == null);
}
@Test
public void testBetaPublishNotCas() throws NacosException, InterruptedException {
String dataId = "testBetaPublish";
String group = "group";
String tenant = "tenant";
String content = "content";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setTenant(tenant);
configPublishRequest.setContent(content);
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
String betaIps = "127.0.0.1,127.0.0.2";
keyMap.put("betaIps", betaIps);
configPublishRequest.setAdditionMap(keyMap);
RequestMeta requestMeta = new RequestMeta();
String clientIp = "127.0.0.1";
requestMeta.setClientIp(clientIp);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoBetaPersistService.insertOrUpdateBeta(any(ConfigInfo.class), eq(betaIps),
eq(requestMeta.getClientIp()), eq(srcUser))).thenReturn(configOperateResult);
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.SUCCESS.getCode(), response.getResultCode());
Thread.sleep(500L);
Assert.assertTrue(reference.get() != null);
Assert.assertEquals(dataId, reference.get().dataId);
Assert.assertEquals(group, reference.get().group);
Assert.assertEquals(tenant, reference.get().tenant);
Assert.assertEquals(timestamp, reference.get().lastModifiedTs);
Assert.assertFalse(reference.get().isBatch);
Assert.assertTrue(reference.get().isBeta);
}
@Test
public void testBetaPublishCas() throws NacosException, InterruptedException {
String dataId = "testBetaPublishCas";
String group = "group";
String tenant = "tenant";
String content = "content";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setTenant(tenant);
configPublishRequest.setContent(content);
configPublishRequest.setCasMd5("12314532");
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
String betaIps = "127.0.0.1,127.0.0.2";
keyMap.put("betaIps", betaIps);
configPublishRequest.setAdditionMap(keyMap);
RequestMeta requestMeta = new RequestMeta();
String clientIp = "127.0.0.1";
requestMeta.setClientIp(clientIp);
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoBetaPersistService.insertOrUpdateBetaCas(any(ConfigInfo.class), eq(betaIps),
eq(requestMeta.getClientIp()), eq(srcUser))).thenReturn(configOperateResult);
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.SUCCESS.getCode(), response.getResultCode());
Thread.sleep(500L);
Assert.assertTrue(reference.get() != null);
Assert.assertEquals(dataId, reference.get().dataId);
Assert.assertEquals(group, reference.get().group);
Assert.assertEquals(tenant, reference.get().tenant);
Assert.assertEquals(timestamp, reference.get().lastModifiedTs);
Assert.assertFalse(reference.get().isBatch);
Assert.assertTrue(reference.get().isBeta);
}
@Test
public void testTagPublishNotCas() throws NacosException, InterruptedException {
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
String dataId = "testTagPublishNotCas";
configPublishRequest.setDataId(dataId);
String group = "group";
configPublishRequest.setGroup(group);
String tenant = "tenant";
configPublishRequest.setTenant(tenant);
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
String tag = "testTag";
keyMap.put("tag", tag);
configPublishRequest.setAdditionMap(keyMap);
String content = "content";
configPublishRequest.setContent(content);
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp("127.0.0.1");
when(configInfoPersistService.insertOrUpdate(any(), any(), any(ConfigInfo.class), any(Map.class))).thenReturn(
new ConfigOperateResult(true));
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent event1 = (ConfigDataChangeEvent) event;
if (event1.dataId.equals(dataId)) {
reference.set((ConfigDataChangeEvent) event);
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoTagPersistService.insertOrUpdateTag(any(ConfigInfo.class), eq(tag),
eq(requestMeta.getClientIp()), eq(srcUser))).thenReturn(configOperateResult);
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.SUCCESS.getCode(), response.getResultCode());
Thread.sleep(500L);
Assert.assertTrue(reference.get() != null);
Assert.assertEquals(dataId, reference.get().dataId);
Assert.assertEquals(group, reference.get().group);
Assert.assertEquals(tenant, reference.get().tenant);
Assert.assertEquals(timestamp, reference.get().lastModifiedTs);
Assert.assertFalse(reference.get().isBatch);
Assert.assertFalse(reference.get().isBeta);
Assert.assertEquals(tag, reference.get().tag);
}
@Test
public void testTagPublishCas() throws NacosException, InterruptedException {
String dataId = "testTagPublishCas";
String group = "group";
ConfigPublishRequest configPublishRequest = new ConfigPublishRequest();
configPublishRequest.setDataId(dataId);
configPublishRequest.setGroup(group);
configPublishRequest.setCasMd5("casmd512");
Map<String, String> keyMap = new HashMap<>();
String srcUser = "src_user111";
keyMap.put("src_user", srcUser);
String tag = "testTag";
keyMap.put("tag", tag);
configPublishRequest.setAdditionMap(keyMap);
String tenant = "tenant";
configPublishRequest.setTenant(tenant);
String content = "content";
configPublishRequest.setContent(content);
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp("127.0.0.1");
AtomicReference<ConfigDataChangeEvent> reference = new AtomicReference<>();
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
reference.set((ConfigDataChangeEvent) event);
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
ConfigOperateResult configOperateResult = new ConfigOperateResult(true);
long timestamp = System.currentTimeMillis();
long id = timestamp / 1000;
configOperateResult.setId(id);
configOperateResult.setLastModified(timestamp);
when(configInfoTagPersistService.insertOrUpdateTagCas(any(ConfigInfo.class), eq(tag),
eq(requestMeta.getClientIp()), eq(srcUser))).thenReturn(configOperateResult);
ConfigPublishResponse response = configPublishRequestHandler.handle(configPublishRequest, requestMeta);
Assert.assertEquals(ResponseCode.SUCCESS.getCode(), response.getResultCode());
Thread.sleep(500L);
Assert.assertTrue(reference.get() != null);
Assert.assertEquals(dataId, reference.get().dataId);
Assert.assertEquals(group, reference.get().group);
Assert.assertEquals(tenant, reference.get().tenant);
Assert.assertEquals(timestamp, reference.get().lastModifiedTs);
Assert.assertFalse(reference.get().isBatch);
Assert.assertFalse(reference.get().isBeta);
Assert.assertEquals(tag, reference.get().tag);
}
}

View File

@ -0,0 +1,268 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.dump;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.model.CacheItem;
import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskService;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskServiceFactory;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigRocksDbDiskService;
import com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor;
import com.alibaba.nacos.config.server.service.dump.task.DumpTask;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.persistence.datasource.DataSourceService;
import com.alibaba.nacos.persistence.datasource.DynamicDataSource;
import com.alibaba.nacos.plugin.datasource.constants.CommonConstant;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DumpProcessorTest {
@Mock
DynamicDataSource dynamicDataSource;
@Mock
DataSourceService dataSourceService;
@Mock
ConfigInfoPersistService configInfoPersistService;
@Mock
ConfigInfoBetaPersistService configInfoBetaPersistService;
@Mock
ConfigInfoTagPersistService configInfoTagPersistService;
ExternalDumpService dumpService;
DumpProcessor dumpProcessor;
MockedStatic<DynamicDataSource> dynamicDataSourceMockedStatic;
MockedStatic<EnvUtil> envUtilMockedStatic;
@Before
public void init() throws Exception {
dynamicDataSourceMockedStatic = Mockito.mockStatic(DynamicDataSource.class);
envUtilMockedStatic = Mockito.mockStatic(EnvUtil.class);
when(EnvUtil.getNacosHome()).thenReturn(System.getProperty("user.home"));
when(EnvUtil.getProperty(eq(CommonConstant.NACOS_PLUGIN_DATASOURCE_LOG), eq(Boolean.class),
eq(false))).thenReturn(false);
dynamicDataSourceMockedStatic.when(DynamicDataSource::getInstance).thenReturn(dynamicDataSource);
when(dynamicDataSource.getDataSource()).thenReturn(dataSourceService);
dumpService = new ExternalDumpService(configInfoPersistService, null, null, null, configInfoBetaPersistService,
configInfoTagPersistService, null);
dumpProcessor = new DumpProcessor(configInfoPersistService, configInfoBetaPersistService,
configInfoTagPersistService);
Field[] declaredFields = ConfigDiskServiceFactory.class.getDeclaredFields();
for (Field filed : declaredFields) {
if (filed.getName().equals("configDiskService")) {
filed.setAccessible(true);
filed.set(null, createDiskService());
}
}
}
protected ConfigDiskService createDiskService() {
return new ConfigRocksDbDiskService();
}
@After
public void after() {
dynamicDataSourceMockedStatic.close();
envUtilMockedStatic.close();
ConfigDiskServiceFactory.getInstance().clearAll();
ConfigDiskServiceFactory.getInstance().clearAllBatch();
ConfigDiskServiceFactory.getInstance().clearAllBeta();
ConfigDiskServiceFactory.getInstance().clearAllTag();
}
@Test
public void testDumpNormalAndRemove() throws IOException {
String dataId = "testDataId";
String group = "testGroup";
String tenant = "testTenant";
String content = "testContent你好" + System.currentTimeMillis();
long time = System.currentTimeMillis();
ConfigInfoWrapper configInfoWrapper = new ConfigInfoWrapper();
configInfoWrapper.setDataId(dataId);
configInfoWrapper.setGroup(group);
configInfoWrapper.setTenant(tenant);
configInfoWrapper.setContent(content);
configInfoWrapper.setLastModified(time);
Mockito.when(configInfoPersistService.findConfigInfo(eq(dataId), eq(group), eq(tenant)))
.thenReturn(configInfoWrapper);
String handlerIp = "127.0.0.1";
long lastModified = System.currentTimeMillis();
DumpTask dumpTask = new DumpTask(GroupKey2.getKey(dataId, group, tenant), false, false, false, null,
lastModified, handlerIp);
boolean process = dumpProcessor.process(dumpTask);
Assert.assertTrue(process);
//Check cache
CacheItem contentCache = ConfigCacheService.getContentCache(GroupKey2.getKey(dataId, group, tenant));
Assert.assertEquals(MD5Utils.md5Hex(content, "UTF-8"), contentCache.getConfigCache().getMd5Utf8());
Assert.assertEquals(time, contentCache.getConfigCache().getLastModifiedTs());
//check disk
String contentFromDisk = ConfigDiskServiceFactory.getInstance().getContent(dataId, group, tenant);
Assert.assertEquals(content, contentFromDisk);
// remove
Mockito.when(configInfoPersistService.findConfigInfo(eq(dataId), eq(group), eq(tenant))).thenReturn(null);
boolean processRemove = dumpProcessor.process(dumpTask);
Assert.assertTrue(processRemove);
//Check cache
CacheItem contentCacheAfterRemove = ConfigCacheService.getContentCache(GroupKey2.getKey(dataId, group, tenant));
Assert.assertTrue(contentCacheAfterRemove == null);
//check disk
String contentFromDiskAfterRemove = ConfigDiskServiceFactory.getInstance().getContent(dataId, group, tenant);
Assert.assertNull(contentFromDiskAfterRemove);
}
@Test
public void testDumpBetaAndRemove() throws IOException {
String dataId = "testDataIdBeta";
String group = "testGroup";
String tenant = "testTenant";
String content = "testContentBeta你好" + System.currentTimeMillis();
long time = System.currentTimeMillis();
ConfigInfoBetaWrapper configInfoWrapper = new ConfigInfoBetaWrapper();
configInfoWrapper.setDataId(dataId);
configInfoWrapper.setGroup(group);
configInfoWrapper.setTenant(tenant);
configInfoWrapper.setContent(content);
configInfoWrapper.setLastModified(time);
String betaIps = "127.0.0.1123,127.0.0.11";
configInfoWrapper.setBetaIps(betaIps);
Mockito.when(configInfoBetaPersistService.findConfigInfo4Beta(eq(dataId), eq(group), eq(tenant)))
.thenReturn(configInfoWrapper);
String handlerIp = "127.0.0.1";
long lastModified = System.currentTimeMillis();
DumpTask dumpTask = new DumpTask(GroupKey2.getKey(dataId, group, tenant), true, false, false, null,
lastModified, handlerIp);
boolean process = dumpProcessor.process(dumpTask);
Assert.assertTrue(process);
//Check cache
CacheItem contentCache = ConfigCacheService.getContentCache(GroupKey2.getKey(dataId, group, tenant));
Assert.assertEquals(MD5Utils.md5Hex(content, "UTF-8"), contentCache.getConfigCacheBeta().getMd5Utf8());
Assert.assertEquals(time, contentCache.getConfigCacheBeta().getLastModifiedTs());
Assert.assertTrue(contentCache.ips4Beta.containsAll(Arrays.asList(betaIps.split(","))));
//check disk
String contentFromDisk = ConfigDiskServiceFactory.getInstance().getBetaContent(dataId, group, tenant);
Assert.assertEquals(content, contentFromDisk);
// remove
Mockito.when(configInfoBetaPersistService.findConfigInfo4Beta(eq(dataId), eq(group), eq(tenant)))
.thenReturn(null);
boolean processRemove = dumpProcessor.process(dumpTask);
Assert.assertTrue(processRemove);
//Check cache
CacheItem contentCacheAfterRemove = ConfigCacheService.getContentCache(GroupKey2.getKey(dataId, group, tenant));
Assert.assertTrue(contentCacheAfterRemove == null || contentCacheAfterRemove.getConfigCacheBeta() == null);
//check disk
String contentFromDiskAfterRemove = ConfigDiskServiceFactory.getInstance()
.getBetaContent(dataId, group, tenant);
Assert.assertNull(contentFromDiskAfterRemove);
}
@Test
public void testDumpTagAndRemove() throws IOException {
String dataId = "testDataIdBeta";
String group = "testGroup";
String tenant = "testTenant";
String tag = "testTag111";
String content = "testContentBeta你好" + System.currentTimeMillis();
long time = System.currentTimeMillis();
ConfigInfoTagWrapper configInfoWrapper = new ConfigInfoTagWrapper();
configInfoWrapper.setDataId(dataId);
configInfoWrapper.setGroup(group);
configInfoWrapper.setTenant(tenant);
configInfoWrapper.setContent(content);
configInfoWrapper.setLastModified(time);
configInfoWrapper.setTag(tag);
Mockito.when(configInfoTagPersistService.findConfigInfo4Tag(eq(dataId), eq(group), eq(tenant), eq(tag)))
.thenReturn(configInfoWrapper);
String handlerIp = "127.0.0.1";
long lastModified = System.currentTimeMillis();
DumpTask dumpTask = new DumpTask(GroupKey2.getKey(dataId, group, tenant), false, false, true, tag, lastModified,
handlerIp);
boolean process = dumpProcessor.process(dumpTask);
Assert.assertTrue(process);
//Check cache
CacheItem contentCache = ConfigCacheService.getContentCache(GroupKey2.getKey(dataId, group, tenant));
Assert.assertEquals(MD5Utils.md5Hex(content, "UTF-8"), contentCache.getConfigCacheTags().get(tag).getMd5Utf8());
Assert.assertEquals(time, contentCache.getConfigCacheTags().get(tag).getLastModifiedTs());
//check disk
String contentFromDisk = ConfigDiskServiceFactory.getInstance().getTagContent(dataId, group, tenant, tag);
Assert.assertEquals(content, contentFromDisk);
// remove
Mockito.when(configInfoTagPersistService.findConfigInfo4Tag(eq(dataId), eq(group), eq(tenant), eq(tag)))
.thenReturn(null);
boolean processRemove = dumpProcessor.process(dumpTask);
Assert.assertTrue(processRemove);
//Check cache
CacheItem contentCacheAfterRemove = ConfigCacheService.getContentCache(GroupKey2.getKey(dataId, group, tenant));
Assert.assertTrue(contentCacheAfterRemove == null || contentCache.getConfigCacheTags() == null
|| contentCache.getConfigCacheTags().get(tag) == null);
//check disk
String contentFromDiskAfterRemove = ConfigDiskServiceFactory.getInstance()
.getTagContent(dataId, group, tenant, tag);
Assert.assertNull(contentFromDiskAfterRemove);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service.dump;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigDiskService;
import com.alibaba.nacos.config.server.service.dump.disk.ConfigRawDiskService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
@RunWith(MockitoJUnitRunner.class)
public class DumpProcessorUserRwaDiskTest extends DumpProcessorTest {
@Before
public void init() throws Exception {
super.init();
}
@Override
protected ConfigDiskService createDiskService() {
return new ConfigRawDiskService();
}
@After
public void after() {
super.after();
}
@Test
public void testDumpNormalAndRemove() throws IOException {
super.testDumpNormalAndRemove();
}
@Test
public void testDumpBetaAndRemove() throws IOException {
super.testDumpBetaAndRemove();
}
@Test
public void testDumpTagAndRemove() throws IOException {
super.testDumpTagAndRemove();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.utils;
import org.mockito.Mockito;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionTemplate;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
public class TestCaseUtils {
/**
* create mocked transaction template with transact ability.
* @return
*/
public static TransactionTemplate createMockTransactionTemplate() {
JdbcTransactionManager transactionManager = Mockito.mock(JdbcTransactionManager.class);
when(transactionManager.getTransaction(any(TransactionDefinition.class))).thenReturn(
new DefaultTransactionStatus(null, true, true, false, false, null));
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
return transactionTemplate;
}
}

View File

@ -1,18 +1,17 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;

View File

@ -21,8 +21,8 @@ import com.alibaba.nacos.Nacos;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.persistence.datasource.DynamicDataSource;
import com.alibaba.nacos.config.server.service.repository.PersistService;
import com.alibaba.nacos.persistence.repository.embedded.operate.DatabaseOperate;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.alibaba.nacos.test.base.ConfigCleanUtils;
@ -82,7 +82,7 @@ public class ConfigDerbyImport_CITCase {
final String queryGroup = "DEFAULT_GROUP";
final String expectContent = "people.enable=true";
PersistService persistService = context.getBean(PersistService.class);
ConfigInfoPersistService persistService = context.getBean(ConfigInfoPersistService.class);
ConfigInfo configInfo = persistService.findConfigInfo(queryDataId, queryGroup, "");
System.out.println(configInfo);
Assert.assertNotNull(configInfo);

View File

@ -17,24 +17,23 @@
package com.alibaba.nacos.test.config;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.persistence.model.event.RaftDbErrorEvent;
import com.alibaba.nacos.config.server.model.event.RaftDbErrorRecoverEvent;
import com.alibaba.nacos.config.server.service.repository.embedded.EmbeddedStoragePersistServiceImpl;
import com.alibaba.nacos.config.server.service.repository.PersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.repository.embedded.EmbeddedConfigInfoPersistServiceImpl;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.core.distributed.id.IdGeneratorManager;
import com.alibaba.nacos.core.distributed.raft.utils.JRaftConstants;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.persistence.constants.PersistenceConstant;
import com.alibaba.nacos.persistence.model.event.RaftDbErrorEvent;
import com.alibaba.nacos.sys.utils.InetUtils;
import com.alibaba.nacos.test.base.BaseClusterTest;
import com.alibaba.nacos.test.base.ConfigCleanUtils;
import org.junit.Assert;
@ -76,10 +75,10 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest {
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate8 = context8.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate9 = context9.getBean(EmbeddedStoragePersistServiceImpl.class);
ConfigInfoPersistService operate7 = context7.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate8 = context8.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate9 = context9.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
@ -102,10 +101,10 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest {
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate8 = context8.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate9 = context9.getBean(EmbeddedStoragePersistServiceImpl.class);
ConfigInfoPersistService operate7 = context7.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate8 = context8.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate9 = context9.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
@ -127,10 +126,10 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest {
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate8 = context8.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate9 = context9.getBean(EmbeddedStoragePersistServiceImpl.class);
ConfigInfoPersistService operate7 = context7.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate8 = context8.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate9 = context9.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
@ -152,10 +151,10 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest {
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate8 = context8.getBean(EmbeddedStoragePersistServiceImpl.class);
PersistService operate9 = context9.getBean(EmbeddedStoragePersistServiceImpl.class);
ConfigInfoPersistService operate7 = context7.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate8 = context8.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
ConfigInfoPersistService operate9 = context9.getBean(EmbeddedConfigInfoPersistServiceImpl.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_1", "").getContent();