diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java index 4d6f5e13b..c194e3623 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java @@ -93,7 +93,7 @@ public class CacheData { LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot); } - private final String envName; + public final String envName; private final ConfigFilterChainManager configFilterChainManager; @@ -124,13 +124,13 @@ public class CacheData { /** * local cache change timestamp. */ - private volatile AtomicLong lastModifiedTs = new AtomicLong(0); + private final AtomicLong lastModifiedTs = new AtomicLong(0); /** * notify change flag,for notify&sync concurrent control. 1.reset to false if starting to sync with server. 2.update * to true if receive config change notification. */ - private volatile AtomicBoolean receiveNotifyChanged = new AtomicBoolean(false); + private final AtomicBoolean receiveNotifyChanged = new AtomicBoolean(false); private int taskId; @@ -139,7 +139,7 @@ public class CacheData { /** * if is cache data md5 sync with the server. */ - private volatile AtomicBoolean isConsistentWithServer = new AtomicBoolean(); + private final AtomicBoolean isConsistentWithServer = new AtomicBoolean(); /** * if is cache data is discard,need to remove. diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 86dd81d46..49f840b89 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -36,8 +36,6 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; -import com.alibaba.nacos.common.remote.client.Connection; -import com.alibaba.nacos.plugin.auth.api.RequestResource; import com.alibaba.nacos.client.config.common.GroupKey; import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager; import com.alibaba.nacos.client.config.filter.impl.ConfigResponse; @@ -55,6 +53,7 @@ import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.ConnectionEventListener; import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.RpcClientFactory; @@ -66,10 +65,12 @@ import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.VersionUtils; +import com.alibaba.nacos.plugin.auth.api.RequestResource; import com.google.gson.Gson; import com.google.gson.JsonObject; import org.slf4j.Logger; +import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -122,26 +123,26 @@ public class ClientWorker implements Closeable { private final AtomicReference> cacheMap = new AtomicReference<>(new HashMap<>()); private final ConfigFilterChainManager configFilterChainManager; - - private String uuid = UUID.randomUUID().toString(); + + private final String uuid = UUID.randomUUID().toString(); private long timeout; - private ConfigRpcTransportClient agent; + private final ConfigRpcTransportClient agent; private int taskPenaltyTime; private boolean enableRemoteSyncConfig = false; - + private static final int MIN_THREAD_NUM = 2; - + private static final int THREAD_MULTIPLE = 1; - + /** * index(taskId)-> total cache count for this taskId. */ private final List taskIdCacheCountList = new ArrayList<>(); - + /** * Add listeners for data. * @@ -404,11 +405,11 @@ public class ClientWorker implements Closeable { private void increaseTaskIdCount(int taskId) { taskIdCacheCountList.get(taskId).incrementAndGet(); } - + private void decreaseTaskIdCount(int taskId) { taskIdCacheCountList.get(taskId).decrementAndGet(); } - + private int calculateTaskId() { int perTaskSize = (int) ParamUtil.getPerTaskConfigSize(); for (int index = 0; index < taskIdCacheCountList.size(); index++) { @@ -419,7 +420,7 @@ public class ClientWorker implements Closeable { taskIdCacheCountList.add(new AtomicInteger(0)); return taskIdCacheCountList.size() - 1; } - + public CacheData getCache(String dataId, String group) { return getCache(dataId, group, TenantUtil.getUserTenantForAcm()); } @@ -567,15 +568,15 @@ public class ClientWorker implements Closeable { public class ConfigRpcTransportClient extends ConfigTransportClient { Map multiTaskExecutor = new HashMap<>(); - + private final BlockingQueue listenExecutebell = new ArrayBlockingQueue<>(1); - - private Object bellItem = new Object(); + + private final Object bellItem = new Object(); private long lastAllSyncTime = System.currentTimeMillis(); Subscriber subscriber = null; - + /** * 3 minutes to check all listen cache keys. */ @@ -779,37 +780,30 @@ public class ClientWorker implements Closeable { for (CacheData cache : cacheMap.get().values()) { synchronized (cache) { - - //check local listeners consistent. + + checkLocalConfig(cache); + + // check local listeners consistent. if (cache.isConsistentWithServer()) { cache.checkListenerMd5(); if (!needAllSync) { continue; } } - + + // If local configuration information is used, then skip the processing directly. + if (cache.isUseLocalConfigInfo()) { + continue; + } + if (!cache.isDiscard()) { - //get listen config - if (!cache.isUseLocalConfigInfo()) { - List cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); - if (cacheDatas == null) { - cacheDatas = new LinkedList<>(); - listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); - } - cacheDatas.add(cache); - - } - } else if (cache.isDiscard() && CollectionUtils.isEmpty(cache.getListeners())) { - - if (!cache.isUseLocalConfigInfo()) { - List cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); - if (cacheDatas == null) { - cacheDatas = new LinkedList<>(); - removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); - } - cacheDatas.add(cache); - - } + List cacheDatas = listenCachesMap.computeIfAbsent(String.valueOf(cache.getTaskId()), + k -> new LinkedList<>()); + cacheDatas.add(cache); + } else { + List cacheDatas = removeListenCachesMap.computeIfAbsent( + String.valueOf(cache.getTaskId()), k -> new LinkedList<>()); + cacheDatas.add(cache); } } @@ -820,7 +814,7 @@ public class ClientWorker implements Closeable { //execute check remove listen. checkRemoveListenCache(removeListenCachesMap); - + if (needAllSync) { lastAllSyncTime = now; } @@ -828,9 +822,59 @@ public class ClientWorker implements Closeable { if (hasChangedKeys) { notifyListenConfig(); } - + } - + + /** + * Checks and handles local configuration for a given CacheData object. This method evaluates the use of + * failover files for local configuration storage and updates the CacheData accordingly. + * + * @param cacheData The CacheData object to be processed. + */ + public void checkLocalConfig(CacheData cacheData) { + final String dataId = cacheData.dataId; + final String group = cacheData.group; + final String tenant = cacheData.tenant; + final String envName = cacheData.envName; + + // Check if a failover file exists for the specified dataId, group, and tenant. + File file = LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant); + + // If not using local config info and a failover file exists, load and use it. + if (!cacheData.isUseLocalConfigInfo() && file.exists()) { + String content = LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant); + final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); + cacheData.setUseLocalConfigInfo(true); + cacheData.setLocalConfigInfoVersion(file.lastModified()); + cacheData.setContent(content); + LOGGER.warn( + "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", + envName, dataId, group, tenant, md5, ContentUtils.truncateContent(content)); + return; + } + + // If use local config info, but the failover file is deleted, switch back to server config. + if (cacheData.isUseLocalConfigInfo() && !file.exists()) { + cacheData.setUseLocalConfigInfo(false); + LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", envName, + dataId, group, tenant); + return; + } + + // When the failover file content changes, indicating a change in local configuration. + if (cacheData.isUseLocalConfigInfo() && file.exists() + && cacheData.getLocalConfigInfoVersion() != file.lastModified()) { + String content = LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant); + final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); + cacheData.setUseLocalConfigInfo(true); + cacheData.setLocalConfigInfoVersion(file.lastModified()); + cacheData.setContent(content); + LOGGER.warn( + "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", + envName, dataId, group, tenant, md5, ContentUtils.truncateContent(content)); + } + } + private ExecutorService ensureSyncExecutor(String taskId) { if (!multiTaskExecutor.containsKey(taskId)) { multiTaskExecutor.put(taskId, @@ -842,14 +886,14 @@ public class ClientWorker implements Closeable { } return multiTaskExecutor.get(taskId); } - + private void checkRemoveListenCache(Map> removeListenCachesMap) { if (!removeListenCachesMap.isEmpty()) { List listenFutures = new ArrayList<>(); - + for (Map.Entry> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); - + ExecutorService executorService = ensureSyncExecutor(taskId); Future future = executorService.submit(() -> { List removeListenCaches = entry.getValue(); @@ -868,7 +912,7 @@ public class ClientWorker implements Closeable { } } } - + } catch (Throwable e) { LOGGER.error("Async remove listen config change error ", e); try { @@ -880,7 +924,7 @@ public class ClientWorker implements Closeable { } }); listenFutures.add(future); - + } for (Future future : listenFutures) { try { @@ -891,9 +935,9 @@ public class ClientWorker implements Closeable { } } } - + private boolean checkListenCache(Map> listenCachesMap) { - + final AtomicBoolean hasChangedKeys = new AtomicBoolean(false); if (!listenCachesMap.isEmpty()) { List listenFutures = new ArrayList<>(); @@ -913,9 +957,9 @@ public class ClientWorker implements Closeable { ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy( rpcClient, configChangeListenRequest); if (listenResponse != null && listenResponse.isSuccess()) { - + Set changeKeys = new HashSet(); - + List changedConfigs = listenResponse.getChangedConfigs(); //handle changed keys,notify listener if (!CollectionUtils.isEmpty(changedConfigs)) { @@ -927,9 +971,9 @@ public class ClientWorker implements Closeable { boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); refreshContentAndCheck(changeKey, !isInitializing); } - + } - + for (CacheData cacheData : listenCaches) { if (cacheData.getReceiveNotifyChanged().get()) { String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, @@ -940,7 +984,7 @@ public class ClientWorker implements Closeable { } } } - + //handler content configs for (CacheData cacheData : listenCaches) { cacheData.setInitializing(false); @@ -954,7 +998,7 @@ public class ClientWorker implements Closeable { } } } - + } } catch (Throwable e) { LOGGER.error("Execute listen config change error ", e); @@ -967,7 +1011,7 @@ public class ClientWorker implements Closeable { } }); listenFutures.add(future); - + } for (Future future : listenFutures) { try { @@ -976,7 +1020,7 @@ public class ClientWorker implements Closeable { LOGGER.error("Async listen config change error ", throwable); } } - + } return hasChangedKeys.get(); } @@ -999,7 +1043,7 @@ public class ClientWorker implements Closeable { } } - + /** * build config string. * @@ -1170,7 +1214,7 @@ public class ClientWorker implements Closeable { ConfigRemoveResponse response = (ConfigRemoveResponse) requestProxy(getOneRunningClient(), request); return response.isSuccess(); } - + /** * check server is health. * @@ -1180,7 +1224,7 @@ public class ClientWorker implements Closeable { try { return getOneRunningClient().isRunning(); } catch (NacosException e) { - LOGGER.warn("check server status failed. error={}", e); + LOGGER.warn("check server status failed.", e); return false; } } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/remote/udp/UdpConnectorTest.java b/naming/src/test/java/com/alibaba/nacos/naming/remote/udp/UdpConnectorTest.java index 11f961711..18469bc4d 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/remote/udp/UdpConnectorTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/remote/udp/UdpConnectorTest.java @@ -91,7 +91,7 @@ public class UdpConnectorTest { @Test public void testContainAck() { - when(ackMap.containsKey(Mockito.anyString())).thenReturn(true); + when(ackMap.containsKey("1111")).thenReturn(true); Assert.assertTrue(udpConnector.containAck("1111")); }