* [ISSUE #11253]To fix the triggering of the listener upon failover content changes. * [ISSUE #11255]Update style. * [ISSUE #11255]Fix UdpConnectorTest#testContainAck.
This commit is contained in:
parent
0e6b5faf4f
commit
086be0386b
@ -93,7 +93,7 @@ public class CacheData {
|
||||
LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot);
|
||||
}
|
||||
|
||||
private final String envName;
|
||||
public final String envName;
|
||||
|
||||
private final ConfigFilterChainManager configFilterChainManager;
|
||||
|
||||
@ -124,13 +124,13 @@ public class CacheData {
|
||||
/**
|
||||
* local cache change timestamp.
|
||||
*/
|
||||
private volatile AtomicLong lastModifiedTs = new AtomicLong(0);
|
||||
private final AtomicLong lastModifiedTs = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* notify change flag,for notify&sync concurrent control. 1.reset to false if starting to sync with server. 2.update
|
||||
* to true if receive config change notification.
|
||||
*/
|
||||
private volatile AtomicBoolean receiveNotifyChanged = new AtomicBoolean(false);
|
||||
private final AtomicBoolean receiveNotifyChanged = new AtomicBoolean(false);
|
||||
|
||||
private int taskId;
|
||||
|
||||
@ -139,7 +139,7 @@ public class CacheData {
|
||||
/**
|
||||
* if is cache data md5 sync with the server.
|
||||
*/
|
||||
private volatile AtomicBoolean isConsistentWithServer = new AtomicBoolean();
|
||||
private final AtomicBoolean isConsistentWithServer = new AtomicBoolean();
|
||||
|
||||
/**
|
||||
* if is cache data is discard,need to remove.
|
||||
|
@ -36,8 +36,6 @@ import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.remote.RemoteConstants;
|
||||
import com.alibaba.nacos.api.remote.request.Request;
|
||||
import com.alibaba.nacos.api.remote.response.Response;
|
||||
import com.alibaba.nacos.common.remote.client.Connection;
|
||||
import com.alibaba.nacos.plugin.auth.api.RequestResource;
|
||||
import com.alibaba.nacos.client.config.common.GroupKey;
|
||||
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
|
||||
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
|
||||
@ -55,6 +53,7 @@ import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.notify.listener.Subscriber;
|
||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||
import com.alibaba.nacos.common.remote.client.Connection;
|
||||
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClient;
|
||||
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
|
||||
@ -66,10 +65,12 @@ import com.alibaba.nacos.common.utils.MD5Utils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||
import com.alibaba.nacos.common.utils.VersionUtils;
|
||||
import com.alibaba.nacos.plugin.auth.api.RequestResource;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonObject;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
@ -122,26 +123,26 @@ public class ClientWorker implements Closeable {
|
||||
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<>(new HashMap<>());
|
||||
|
||||
private final ConfigFilterChainManager configFilterChainManager;
|
||||
|
||||
private String uuid = UUID.randomUUID().toString();
|
||||
|
||||
private final String uuid = UUID.randomUUID().toString();
|
||||
|
||||
private long timeout;
|
||||
|
||||
private ConfigRpcTransportClient agent;
|
||||
private final ConfigRpcTransportClient agent;
|
||||
|
||||
private int taskPenaltyTime;
|
||||
|
||||
private boolean enableRemoteSyncConfig = false;
|
||||
|
||||
|
||||
private static final int MIN_THREAD_NUM = 2;
|
||||
|
||||
|
||||
private static final int THREAD_MULTIPLE = 1;
|
||||
|
||||
|
||||
/**
|
||||
* index(taskId)-> total cache count for this taskId.
|
||||
*/
|
||||
private final List<AtomicInteger> taskIdCacheCountList = new ArrayList<>();
|
||||
|
||||
|
||||
/**
|
||||
* Add listeners for data.
|
||||
*
|
||||
@ -404,11 +405,11 @@ public class ClientWorker implements Closeable {
|
||||
private void increaseTaskIdCount(int taskId) {
|
||||
taskIdCacheCountList.get(taskId).incrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
private void decreaseTaskIdCount(int taskId) {
|
||||
taskIdCacheCountList.get(taskId).decrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
private int calculateTaskId() {
|
||||
int perTaskSize = (int) ParamUtil.getPerTaskConfigSize();
|
||||
for (int index = 0; index < taskIdCacheCountList.size(); index++) {
|
||||
@ -419,7 +420,7 @@ public class ClientWorker implements Closeable {
|
||||
taskIdCacheCountList.add(new AtomicInteger(0));
|
||||
return taskIdCacheCountList.size() - 1;
|
||||
}
|
||||
|
||||
|
||||
public CacheData getCache(String dataId, String group) {
|
||||
return getCache(dataId, group, TenantUtil.getUserTenantForAcm());
|
||||
}
|
||||
@ -567,15 +568,15 @@ public class ClientWorker implements Closeable {
|
||||
public class ConfigRpcTransportClient extends ConfigTransportClient {
|
||||
|
||||
Map<String, ExecutorService> multiTaskExecutor = new HashMap<>();
|
||||
|
||||
|
||||
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);
|
||||
|
||||
private Object bellItem = new Object();
|
||||
|
||||
private final Object bellItem = new Object();
|
||||
|
||||
private long lastAllSyncTime = System.currentTimeMillis();
|
||||
|
||||
Subscriber subscriber = null;
|
||||
|
||||
|
||||
/**
|
||||
* 3 minutes to check all listen cache keys.
|
||||
*/
|
||||
@ -779,37 +780,30 @@ public class ClientWorker implements Closeable {
|
||||
for (CacheData cache : cacheMap.get().values()) {
|
||||
|
||||
synchronized (cache) {
|
||||
|
||||
//check local listeners consistent.
|
||||
|
||||
checkLocalConfig(cache);
|
||||
|
||||
// check local listeners consistent.
|
||||
if (cache.isConsistentWithServer()) {
|
||||
cache.checkListenerMd5();
|
||||
if (!needAllSync) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If local configuration information is used, then skip the processing directly.
|
||||
if (cache.isUseLocalConfigInfo()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!cache.isDiscard()) {
|
||||
//get listen config
|
||||
if (!cache.isUseLocalConfigInfo()) {
|
||||
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
|
||||
if (cacheDatas == null) {
|
||||
cacheDatas = new LinkedList<>();
|
||||
listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
|
||||
}
|
||||
cacheDatas.add(cache);
|
||||
|
||||
}
|
||||
} else if (cache.isDiscard() && CollectionUtils.isEmpty(cache.getListeners())) {
|
||||
|
||||
if (!cache.isUseLocalConfigInfo()) {
|
||||
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
|
||||
if (cacheDatas == null) {
|
||||
cacheDatas = new LinkedList<>();
|
||||
removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
|
||||
}
|
||||
cacheDatas.add(cache);
|
||||
|
||||
}
|
||||
List<CacheData> cacheDatas = listenCachesMap.computeIfAbsent(String.valueOf(cache.getTaskId()),
|
||||
k -> new LinkedList<>());
|
||||
cacheDatas.add(cache);
|
||||
} else {
|
||||
List<CacheData> cacheDatas = removeListenCachesMap.computeIfAbsent(
|
||||
String.valueOf(cache.getTaskId()), k -> new LinkedList<>());
|
||||
cacheDatas.add(cache);
|
||||
}
|
||||
}
|
||||
|
||||
@ -820,7 +814,7 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
//execute check remove listen.
|
||||
checkRemoveListenCache(removeListenCachesMap);
|
||||
|
||||
|
||||
if (needAllSync) {
|
||||
lastAllSyncTime = now;
|
||||
}
|
||||
@ -828,9 +822,59 @@ public class ClientWorker implements Closeable {
|
||||
if (hasChangedKeys) {
|
||||
notifyListenConfig();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Checks and handles local configuration for a given CacheData object. This method evaluates the use of
|
||||
* failover files for local configuration storage and updates the CacheData accordingly.
|
||||
*
|
||||
* @param cacheData The CacheData object to be processed.
|
||||
*/
|
||||
public void checkLocalConfig(CacheData cacheData) {
|
||||
final String dataId = cacheData.dataId;
|
||||
final String group = cacheData.group;
|
||||
final String tenant = cacheData.tenant;
|
||||
final String envName = cacheData.envName;
|
||||
|
||||
// Check if a failover file exists for the specified dataId, group, and tenant.
|
||||
File file = LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant);
|
||||
|
||||
// If not using local config info and a failover file exists, load and use it.
|
||||
if (!cacheData.isUseLocalConfigInfo() && file.exists()) {
|
||||
String content = LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant);
|
||||
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
|
||||
cacheData.setUseLocalConfigInfo(true);
|
||||
cacheData.setLocalConfigInfoVersion(file.lastModified());
|
||||
cacheData.setContent(content);
|
||||
LOGGER.warn(
|
||||
"[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
|
||||
envName, dataId, group, tenant, md5, ContentUtils.truncateContent(content));
|
||||
return;
|
||||
}
|
||||
|
||||
// If use local config info, but the failover file is deleted, switch back to server config.
|
||||
if (cacheData.isUseLocalConfigInfo() && !file.exists()) {
|
||||
cacheData.setUseLocalConfigInfo(false);
|
||||
LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", envName,
|
||||
dataId, group, tenant);
|
||||
return;
|
||||
}
|
||||
|
||||
// When the failover file content changes, indicating a change in local configuration.
|
||||
if (cacheData.isUseLocalConfigInfo() && file.exists()
|
||||
&& cacheData.getLocalConfigInfoVersion() != file.lastModified()) {
|
||||
String content = LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant);
|
||||
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
|
||||
cacheData.setUseLocalConfigInfo(true);
|
||||
cacheData.setLocalConfigInfoVersion(file.lastModified());
|
||||
cacheData.setContent(content);
|
||||
LOGGER.warn(
|
||||
"[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
|
||||
envName, dataId, group, tenant, md5, ContentUtils.truncateContent(content));
|
||||
}
|
||||
}
|
||||
|
||||
private ExecutorService ensureSyncExecutor(String taskId) {
|
||||
if (!multiTaskExecutor.containsKey(taskId)) {
|
||||
multiTaskExecutor.put(taskId,
|
||||
@ -842,14 +886,14 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
return multiTaskExecutor.get(taskId);
|
||||
}
|
||||
|
||||
|
||||
private void checkRemoveListenCache(Map<String, List<CacheData>> removeListenCachesMap) {
|
||||
if (!removeListenCachesMap.isEmpty()) {
|
||||
List<Future> listenFutures = new ArrayList<>();
|
||||
|
||||
|
||||
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
|
||||
String taskId = entry.getKey();
|
||||
|
||||
|
||||
ExecutorService executorService = ensureSyncExecutor(taskId);
|
||||
Future future = executorService.submit(() -> {
|
||||
List<CacheData> removeListenCaches = entry.getValue();
|
||||
@ -868,7 +912,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
} catch (Throwable e) {
|
||||
LOGGER.error("Async remove listen config change error ", e);
|
||||
try {
|
||||
@ -880,7 +924,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
});
|
||||
listenFutures.add(future);
|
||||
|
||||
|
||||
}
|
||||
for (Future future : listenFutures) {
|
||||
try {
|
||||
@ -891,9 +935,9 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) {
|
||||
|
||||
|
||||
final AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
|
||||
if (!listenCachesMap.isEmpty()) {
|
||||
List<Future> listenFutures = new ArrayList<>();
|
||||
@ -913,9 +957,9 @@ public class ClientWorker implements Closeable {
|
||||
ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(
|
||||
rpcClient, configChangeListenRequest);
|
||||
if (listenResponse != null && listenResponse.isSuccess()) {
|
||||
|
||||
|
||||
Set<String> changeKeys = new HashSet<String>();
|
||||
|
||||
|
||||
List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
|
||||
//handle changed keys,notify listener
|
||||
if (!CollectionUtils.isEmpty(changedConfigs)) {
|
||||
@ -927,9 +971,9 @@ public class ClientWorker implements Closeable {
|
||||
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
|
||||
refreshContentAndCheck(changeKey, !isInitializing);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
for (CacheData cacheData : listenCaches) {
|
||||
if (cacheData.getReceiveNotifyChanged().get()) {
|
||||
String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
|
||||
@ -940,7 +984,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//handler content configs
|
||||
for (CacheData cacheData : listenCaches) {
|
||||
cacheData.setInitializing(false);
|
||||
@ -954,7 +998,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOGGER.error("Execute listen config change error ", e);
|
||||
@ -967,7 +1011,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
});
|
||||
listenFutures.add(future);
|
||||
|
||||
|
||||
}
|
||||
for (Future future : listenFutures) {
|
||||
try {
|
||||
@ -976,7 +1020,7 @@ public class ClientWorker implements Closeable {
|
||||
LOGGER.error("Async listen config change error ", throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
return hasChangedKeys.get();
|
||||
}
|
||||
@ -999,7 +1043,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* build config string.
|
||||
*
|
||||
@ -1170,7 +1214,7 @@ public class ClientWorker implements Closeable {
|
||||
ConfigRemoveResponse response = (ConfigRemoveResponse) requestProxy(getOneRunningClient(), request);
|
||||
return response.isSuccess();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* check server is health.
|
||||
*
|
||||
@ -1180,7 +1224,7 @@ public class ClientWorker implements Closeable {
|
||||
try {
|
||||
return getOneRunningClient().isRunning();
|
||||
} catch (NacosException e) {
|
||||
LOGGER.warn("check server status failed. error={}", e);
|
||||
LOGGER.warn("check server status failed.", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ public class UdpConnectorTest {
|
||||
|
||||
@Test
|
||||
public void testContainAck() {
|
||||
when(ackMap.containsKey(Mockito.anyString())).thenReturn(true);
|
||||
when(ackMap.containsKey("1111")).thenReturn(true);
|
||||
Assert.assertTrue(udpConnector.containAck("1111"));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user