From 8f94646bee3fe49fc6dad70f92f6e89d1449a50b Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Wed, 27 Jan 2021 15:27:13 +0800 Subject: [PATCH] Split double write task --- .../alibaba/nacos/naming/core/Service.java | 2 + .../naming/core/v2/client/AbstractClient.java | 2 +- .../impl/ConnectionBasedClientManager.java | 2 +- .../impl/EphemeralIpPortClientManager.java | 2 +- .../impl/PersistentIpPortClientManager.java | 10 ++- .../v2/metadata/ServiceMetadataProcessor.java | 14 ++++ .../core/v2/upgrade/UpgradeJudgement.java | 15 +++++ .../doublewrite/RefreshStorageDataTask.java | 41 ++++++++++++ .../doublewrite/delay/DoubleWriteContent.java | 38 +++++++++++ .../delay/DoubleWriteEventListener.java | 33 +++++++++- .../delay/ServiceChangeV1Task.java | 64 +++++++++++++++---- .../delay/ServiceChangeV2Task.java | 49 ++++++++++++-- .../DoubleWriteInstanceChangeToV1Task.java | 7 ++ .../DoubleWriteInstanceChangeToV2Task.java | 4 +- .../DoubleWriteMetadataChangeToV1Task.java | 3 +- .../DoubleWriteMetadataChangeToV2Task.java | 3 +- .../core/v2/upgrade/UpgradeJudgementTest.java | 2 +- 17 files changed, 262 insertions(+), 29 deletions(-) create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/RefreshStorageDataTask.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteContent.java diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java index b74a818ce..6cb411fb2 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java @@ -521,6 +521,8 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement Loggers.SRV_LOG.info("cluster size, new: {}, old: {}", getClusterMap().size(), vDom.getClusterMap().size()); recalculateChecksum(); + ApplicationUtils.getBean(DoubleWriteEventListener.class) + .doubleWriteMetadataToV2(this, vDom.allIPs(false).isEmpty()); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java index 96d24307d..1fe0d617b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java @@ -61,8 +61,8 @@ public abstract class AbstractClient implements Client { public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { if (null == publishers.put(service, instancePublishInfo)) { MetricsMonitor.getIpCountMonitor().incrementAndGet(); - NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); } + NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId()); return true; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java index 8a1aa8051..c39bc9fdb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java @@ -61,7 +61,7 @@ public class ConnectionBasedClientManager extends ClientConnectionEventListener public boolean clientConnected(Client client) { Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId()); if (!clients.containsKey(client.getClientId())) { - clients.put(client.getClientId(), (ConnectionBasedClient) client); + clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client); } return true; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java index 1f0475df6..4784fa17b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java @@ -57,7 +57,7 @@ public class EphemeralIpPortClientManager implements ClientManager { public boolean clientConnected(Client client) { Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId()); if (!clients.containsKey(client.getClientId())) { - clients.put(client.getClientId(), (IpPortBasedClient) client); + clients.putIfAbsent(client.getClientId(), (IpPortBasedClient) client); } return true; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManager.java index 3f9519f31..9ba509b88 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/PersistentIpPortClientManager.java @@ -47,7 +47,7 @@ public class PersistentIpPortClientManager implements ClientManager { public boolean clientConnected(Client client) { Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId()); if (!clients.containsKey(client.getClientId())) { - clients.put(client.getClientId(), (IpPortBasedClient) client); + clients.putIfAbsent(client.getClientId(), (IpPortBasedClient) client); } return true; } @@ -79,8 +79,16 @@ public class PersistentIpPortClientManager implements ClientManager { return clients.containsKey(clientId); } + /** + * Compute and do operation new client when client not exist. + * + * @param clientId clientId + * @param supplier operation + * @return Client saved in manager + */ public Client computeIfAbsent(final String clientId, final Supplier supplier) { clients.computeIfAbsent(clientId, s -> supplier.get()); + Loggers.SRV_LOG.info("Client connection {} connect", clientId); return getClient(clientId); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java index 80e11ba3c..9d64959e6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java @@ -29,7 +29,9 @@ import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteEventListener; import com.alibaba.nacos.naming.utils.Constants; +import com.alibaba.nacos.sys.utils.ApplicationUtils; import org.apache.commons.lang3.reflect.TypeUtils; import org.springframework.stereotype.Component; @@ -119,6 +121,7 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP { Service singleton = ServiceManager.getInstance().getSingleton(service); namingMetadataManager.updateServiceMetadata(singleton, op.getMetadata()); } + doubleWriteMetadata(service); } private void updateServiceMetadata(MetadataOperation op) { @@ -133,6 +136,17 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP { Service singleton = ServiceManager.getInstance().getSingleton(service); namingMetadataManager.updateServiceMetadata(singleton, op.getMetadata()); } + doubleWriteMetadata(service); + } + + /** + * Only for downgrade to v1.x. + * + * @param service double write service + * @deprecated will remove in v2.1.x + */ + private void doubleWriteMetadata(Service service) { + ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteMetadataToV1(service); } /** diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java index 4b0014a11..8698aaa81 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java @@ -30,8 +30,11 @@ import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore; import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet; import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.RefreshStorageDataTask; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher; import com.alibaba.nacos.naming.monitor.MetricsMonitor; import com.alibaba.nacos.sys.env.EnvUtil; import org.codehaus.jackson.Version; @@ -185,6 +188,18 @@ public class UpgradeJudgement extends Subscriber { Loggers.SRV_LOG.info("Upgrade to 2.0.X"); useGrpcFeatures.compareAndSet(false, true); useJraftFeatures.set(true); + refreshPersistentServices(); + } + + private void refreshPersistentServices() { + for (String each : com.alibaba.nacos.naming.core.v2.ServiceManager.getInstance().getAllNamespaces()) { + for (Service service : com.alibaba.nacos.naming.core.v2.ServiceManager.getInstance().getSingletons(each)) { + if (!service.isEphemeral()) { + NamingExecuteTaskDispatcher.getInstance() + .dispatchAndExecuteTask(service, new RefreshStorageDataTask(service)); + } + } + } } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/RefreshStorageDataTask.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/RefreshStorageDataTask.java new file mode 100644 index 000000000..5987ac632 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/RefreshStorageDataTask.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite; + +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.sys.utils.ApplicationUtils; + +/** + * Refresh service storage cache data when upgrading. + * + * @author xiweng.yy + */ +public class RefreshStorageDataTask extends AbstractExecuteTask { + + private final Service service; + + public RefreshStorageDataTask(Service service) { + this.service = service; + } + + @Override + public void run() { + ApplicationUtils.getBean(ServiceStorage.class).getPushData(service); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteContent.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteContent.java new file mode 100644 index 000000000..9329df923 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteContent.java @@ -0,0 +1,38 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay; + +/** + * Double write content. + * + * @author xiweng.yy + */ +public enum DoubleWriteContent { + + /** + * Only write metadata. + */ + METADATA, + /** + * Only write instance. + */ + INSTANCE, + /** + * Both of content. + */ + BOTH; +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteEventListener.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteEventListener.java index e78a98819..06ac4aa3c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteEventListener.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteEventListener.java @@ -49,7 +49,7 @@ public class DoubleWriteEventListener extends Subscriber newInstance = service.allIPs(serviceTask.isEphemeral()); + ServiceInfo serviceInfo = serviceStorage.getPushData(transfer(service, ephemeral)); + List newInstance = service.allIPs(ephemeral); Set instances = new HashSet<>(); for (Instance each : newInstance) { instances.add(each.toIpAddr()); DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task( service.getNamespaceId(), service.getName(), each, true); - NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask( - IpPortBasedClient.getClientId(each.toIpAddr(), serviceTask.isEphemeral()), instanceTask); + NamingExecuteTaskDispatcher.getInstance() + .dispatchAndExecuteTask(IpPortBasedClient.getClientId(each.toIpAddr(), ephemeral), + instanceTask); } List oldInstance = serviceInfo.getHosts(); for (com.alibaba.nacos.api.naming.pojo.Instance each : oldInstance) { if (!instances.contains(each.toInetAddr())) { DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task( service.getNamespaceId(), service.getName(), each, false); - NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask( - IpPortBasedClient.getClientId(each.toInetAddr(), serviceTask.isEphemeral()), instanceTask); + NamingExecuteTaskDispatcher.getInstance() + .dispatchAndExecuteTask(IpPortBasedClient.getClientId(each.toInetAddr(), ephemeral), + instanceTask); } } - return true; } private com.alibaba.nacos.naming.core.v2.pojo.Service transfer(Service service, boolean ephemeral) { @@ -122,6 +153,13 @@ public class ServiceChangeV1Task extends AbstractDelayTask { NamingUtils.getServiceName(service.getName()), ephemeral); } + private void dispatchMetadataTask(Service service, boolean ephemeral) { + ServiceMetadata serviceMetadata = parseServiceMetadata(service, ephemeral); + DoubleWriteMetadataChangeToV2Task metadataTask = new DoubleWriteMetadataChangeToV2Task( + service.getNamespaceId(), service.getName(), ephemeral, serviceMetadata); + NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service.getName(), metadataTask); + } + private ServiceMetadata parseServiceMetadata(Service service, boolean ephemeral) { ServiceMetadata result = new ServiceMetadata(); result.setEphemeral(ephemeral); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV2Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV2Task.java index 2f01df171..55d2922b2 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV2Task.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV2Task.java @@ -34,16 +34,32 @@ public class ServiceChangeV2Task extends AbstractDelayTask { private final Service changedService; - public ServiceChangeV2Task(Service service) { + private DoubleWriteContent content; + + public ServiceChangeV2Task(Service service, DoubleWriteContent content) { changedService = service; + this.content = content; + setTaskInterval(1000L); + setLastProcessTime(System.currentTimeMillis()); } public Service getChangedService() { return changedService; } + public DoubleWriteContent getContent() { + return content; + } + @Override public void merge(AbstractDelayTask task) { + if (!(task instanceof ServiceChangeV2Task)) { + return; + } + ServiceChangeV2Task oldTask = (ServiceChangeV2Task) task; + if (!content.equals(oldTask.getContent())) { + content = DoubleWriteContent.BOTH; + } } public static String getKey(Service service) { @@ -55,15 +71,36 @@ public class ServiceChangeV2Task extends AbstractDelayTask { @Override public boolean process(NacosTask task) { ServiceChangeV2Task serviceTask = (ServiceChangeV2Task) task; - Loggers.SRV_LOG.info("double write for service {}", serviceTask.getChangedService()); Service changedService = serviceTask.getChangedService(); - DoubleWriteMetadataChangeToV1Task metadataTask = new DoubleWriteMetadataChangeToV1Task(changedService); - NamingExecuteTaskDispatcher.getInstance() - .dispatchAndExecuteTask(changedService.getGroupedServiceName(), metadataTask); + Loggers.SRV_LOG.info("double write for service {}, content {}", changedService, serviceTask.getContent()); + switch (serviceTask.getContent()) { + case INSTANCE: + dispatchInstanceChangeTask(changedService); + break; + case METADATA: + dispatchMetadataChangeTask(changedService); + break; + default: + dispatchAllTask(changedService); + } + return true; + } + + private void dispatchInstanceChangeTask(Service changedService) { DoubleWriteInstanceChangeToV1Task instanceTask = new DoubleWriteInstanceChangeToV1Task(changedService); NamingExecuteTaskDispatcher.getInstance() .dispatchAndExecuteTask(changedService.getGroupedServiceName(), instanceTask); - return true; + } + + private void dispatchMetadataChangeTask(Service changedService) { + DoubleWriteMetadataChangeToV1Task metadataTask = new DoubleWriteMetadataChangeToV1Task(changedService); + NamingExecuteTaskDispatcher.getInstance() + .dispatchAndExecuteTask(changedService.getGroupedServiceName(), metadataTask); + } + + private void dispatchAllTask(Service changedService) { + dispatchMetadataChangeTask(changedService); + dispatchInstanceChangeTask(changedService); } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV1Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV1Task.java index 82686a754..45dbe9c28 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV1Task.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV1Task.java @@ -24,6 +24,9 @@ import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.ServiceManager; import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV2Task; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.sys.utils.ApplicationUtils; @@ -60,6 +63,10 @@ public class DoubleWriteInstanceChangeToV1Task extends AbstractExecuteTask { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("Double write task for {} instance from 2 to 1 failed", service, e); } + ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service, DoubleWriteContent.INSTANCE); + retryTask.setTaskInterval(3000L); + String taskKey = ServiceChangeV2Task.getKey(service); + ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java index 16434ae6d..29a967ef1 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java @@ -19,6 +19,7 @@ package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.common.task.AbstractExecuteTask; import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task; import com.alibaba.nacos.naming.misc.Loggers; @@ -61,7 +62,8 @@ public class DoubleWriteInstanceChangeToV2Task extends AbstractExecuteTask { Loggers.SRV_LOG .debug("Double write task for {}#{} instance from 1 to 2 failed", namespace, serviceName, e); } - ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral()); + ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral(), + DoubleWriteContent.INSTANCE); retryTask.setTaskInterval(3000L); String taskKey = ServiceChangeV1Task.getKey(namespace, serviceName, instance.isEphemeral()); ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV1Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV1Task.java index 68aa19c63..4f93711b0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV1Task.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV1Task.java @@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata; import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager; import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV2Task; import com.alibaba.nacos.naming.misc.Loggers; @@ -59,7 +60,7 @@ public class DoubleWriteMetadataChangeToV1Task extends AbstractExecuteTask { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("Double write task for {} metadata from 2 to 1 failed", service, e); } - ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service); + ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service, DoubleWriteContent.METADATA); retryTask.setTaskInterval(3000L); String taskKey = ServiceChangeV2Task.getKey(service); ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java index bb78e6030..80dc356fc 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java @@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata; import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService; import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task; import com.alibaba.nacos.naming.misc.Loggers; @@ -67,7 +68,7 @@ public class DoubleWriteMetadataChangeToV2Task extends AbstractExecuteTask { Loggers.SRV_LOG.debug("Double write task for {} metadata from 2 to 1 failed", service, e); } ServiceChangeV1Task retryTask = new ServiceChangeV1Task(service.getNamespace(), - service.getGroupedServiceName(), service.isEphemeral()); + service.getGroupedServiceName(), service.isEphemeral(), DoubleWriteContent.METADATA); retryTask.setTaskInterval(3000L); String taskKey = ServiceChangeV1Task .getKey(service.getNamespace(), service.getGroupedServiceName(), service.isEphemeral()); diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java index 244e25496..f3a2e574e 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java @@ -76,7 +76,7 @@ public class UpgradeJudgementTest { public void setUp() throws Exception { EnvUtil.setEnvironment(environment); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, - doubleWriteDelayTaskEngine); + doubleWriteDelayTaskEngine, serviceStorage); } @After